You need to create the streaming context using an existing checkpoint for it to work.
See sample here.

On Sat, 27 Feb 2016, 20:28 Vinti Maheshwari, <vinti.u...@gmail.com> wrote:

> Hi All,
>
> I wrote a Spark Streaming program with a stateful transformation.
> It seems like my Spark Streaming application is doing the computation
> correctly with checkpointing.
> But when I terminate my program and start it again, it does not read the
> previous checkpoint data and starts from the beginning. Is this the
> expected behaviour?
>
> Do I need to change anything in my program so that it will remember the
> previous data and start the computation from there?
>
> Thanks in advance.
>
> For reference, my program:
>
> // Imports were not included in the original message; the code below
> // needs at least:
> // import org.apache.spark.{SparkConf, SparkContext}
> // import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
> // import org.apache.spark.streaming.dstream.DStream
>
> def main(args: Array[String]): Unit = {
>   val conf = new SparkConf().setAppName("HBaseStream")
>   val sc = new SparkContext(conf)
>   val ssc = new StreamingContext(sc, Seconds(5))
>   val inputStream = ssc.socketTextStream("ttsv-vccp-01.juniper.net", 9999)
>
>   ssc.checkpoint("hdfs://ttsv-lab-vmdb-01.englab.juniper.net:8020/user/spark/checkpoints_dir")
>   inputStream.print(1)
>
>   val parsedStream = inputStream
>     .map(line => {
>       val splitLines = line.split(",")
>       (splitLines(1),
>        splitLines.slice(2, splitLines.length).map(_.trim.toLong))
>     })
>
>   import breeze.linalg.{DenseVector => BDV}
>   import scala.util.Try
>
>   val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
>     (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
>       prev.map(_ +: current).orElse(Some(current))
>         .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
>     })
>   state.checkpoint(Duration(10000))
>   state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
>
>   // Start the computation
>   ssc.start()
>   // Wait for the computation to terminate
>   ssc.awaitTermination()
> }
> } // closing brace of the enclosing object, whose declaration was not quoted
>
> Regards,
>
> ~Vinti
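
In short: move all of the context and DStream setup into a factory function and create the context with StreamingContext.getOrCreate, so that on restart the driver recovers the context (including the DStream graph and the updateStateByKey state) from the checkpoint instead of building a fresh one. A minimal sketch, reusing the checkpoint path from your program; createContext is an illustrative name, not a Spark API:

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val checkpointDir =
    "hdfs://ttsv-lab-vmdb-01.englab.juniper.net:8020/user/spark/checkpoints_dir"

  def createContext(): StreamingContext = {
    // Only invoked when no checkpoint exists yet; all stream wiring
    // (socketTextStream, map, updateStateByKey, foreachRDD, ...) must
    // happen inside this function so it gets checkpointed with the context.
    val conf = new SparkConf().setAppName("HBaseStream")
    val ssc = new StreamingContext(new SparkContext(conf), Seconds(5))
    ssc.checkpoint(checkpointDir)
    // ... your inputStream / parsedStream / state wiring goes here ...
    ssc
  }

  // First run: calls createContext and starts from scratch.
  // Restart: ignores createContext and rebuilds everything from checkpointDir.
  val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
  ssc.start()
  ssc.awaitTermination()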