Hi All,

I wrote a Spark Streaming program with a stateful transformation. The application seems to be computing correctly with checkpointing enabled, but when I terminate the program and start it again, it does not read the previous checkpoint data and starts from the beginning. Is this the expected behaviour?
Do I need to change anything in my program so that it will remember the previous data and continue the computation from there? Thanks in advance. For reference, my program:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("HBaseStream")
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(5))
  val inputStream = ssc.socketTextStream("ttsv-vccp-01.juniper.net", 9999)
  ssc.checkpoint("hdfs://ttsv-lab-vmdb-01.englab.juniper.net:8020/user/spark/checkpoints_dir")
  inputStream.print(1)

  val parsedStream = inputStream
    .map(line => {
      val splitLines = line.split(",")
      (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
    })

  import breeze.linalg.{DenseVector => BDV}
  import scala.util.Try

  val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
    (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
      prev.map(_ +: current).orElse(Some(current))
        .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
    })

  state.checkpoint(Duration(10000))
  state.foreachRDD(rdd => rdd.foreach(Blaher.blah))

  // Start the computation
  ssc.start()
  // Wait for the computation to terminate
  ssc.awaitTermination()
}

Regards,
~Vinti
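P.S. While reading the Spark Streaming programming guide I came across StreamingContext.getOrCreate, which recreates the context from the checkpoint directory on restart instead of always building a fresh one. Is this what I am missing? A sketch of how I think my main would need to change (all of the stream setup moves into a factory function; the name createContext is my own):

```
// Sketch only: the driver-restart recovery pattern from the Spark Streaming guide.
// All of the stream setup (socket source, updateStateByKey, foreachRDD) must move
// into this factory, which runs only when no checkpoint exists yet.
val checkpointDir =
  "hdfs://ttsv-lab-vmdb-01.englab.juniper.net:8020/user/spark/checkpoints_dir"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("HBaseStream")
  val ssc = new StreamingContext(conf, Seconds(5))
  ssc.checkpoint(checkpointDir)
  // ... define inputStream, parsedStream, state, foreachRDD here ...
  ssc
}

// On a clean start this calls createContext(); after a restart it rebuilds the
// context (including the DStream graph and the state) from the checkpoint data.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```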
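As a side note, the state-update logic itself can be checked outside Spark. Below is a plain-Scala mirror of the same merge function, with elementwise addition written by hand in place of breeze's DenseVector sum, purely so the snippet is dependency-free:

```scala
import scala.util.Try

// Plain-Scala mirror of the updateStateByKey function above:
// prepend the previous state (if any) to the current batch of arrays,
// then sum them elementwise; returns None if there is nothing to sum
// or the arrays disagree in length (where breeze's vector addition
// would throw and the Try would swallow the error).
object StateMerge {
  def merge(current: Seq[Array[Long]], prev: Option[Array[Long]]): Option[Array[Long]] = {
    prev.map(_ +: current).orElse(Some(current)).flatMap { as =>
      Try(as.reduce { (a, b) =>
        require(a.length == b.length, "length mismatch")
        a.indices.map(i => a(i) + b(i)).toArray
      }).toOption
    }
  }
}
```

This makes it easy to confirm, for example, that a previous state of [3, 4] merged with a new batch containing [1, 2] yields [4, 6], and that an empty batch with no previous state yields None.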