Hi All,

I wrote a Spark Streaming program with a stateful transformation
(updateStateByKey). While it is running, the application seems to compute
the state correctly with checkpointing enabled.
But when I terminate the program and start it again, it does not read the
previous checkpoint data and starts from the beginning. Is this the
expected behaviour?

Do I need to change anything in my program so that it remembers the
previous data and resumes the computation from there?
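
For context on this question: the Spark Streaming programming guide describes restart recovery as going through StreamingContext.getOrCreate, which rebuilds the context from checkpoint data when it exists and otherwise calls a setup function. The program below constructs a new StreamingContext directly in main, so that may be the relevant difference. A minimal sketch of the pattern, not a tested fix for this program; the object name, createContext, the checkpoint path, the socket host and the simple counting state are all placeholders:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoverySketch {
  // Placeholder checkpoint location; substitute the real HDFS directory.
  val checkpointDir = "hdfs:///tmp/checkpoints_dir"

  // Invoked only when no usable checkpoint exists in checkpointDir.
  // All DStream setup lives here, so a recovered context already contains it.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointRecoverySketch")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)

    // Placeholder source and state: a running count per distinct line.
    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines
      .map(line => (line, 1L))
      .updateStateByKey((current: Seq[Long], prev: Option[Long]) =>
        Some(prev.getOrElse(0L) + current.sum))
    counts.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Recover from the checkpoint if present, otherwise build a fresh context.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this shape, the first run builds the context through createContext, and later runs that point at the same checkpoint directory are expected to restore the DStream graph and state from it rather than starting empty.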

Thanks in advance.

For reference, my program:


  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("HBaseStream")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    val inputStream = ssc.socketTextStream("ttsv-vccp-01.juniper.net", 9999)
    ssc.checkpoint("hdfs://ttsv-lab-vmdb-01.englab.juniper.net:8020/user/spark/checkpoints_dir")
    inputStream.print(1)
    val parsedStream = inputStream
      .map(line => {
        val splitLines = line.split(",")
        (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
      })
    import breeze.linalg.{DenseVector => BDV}
    import scala.util.Try

    val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
      (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
        prev.map(_ +: current).orElse(Some(current))
          .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
      })
    state.checkpoint(Duration(10000))
    state.foreachRDD(rdd => rdd.foreach(Blaher.blah))

    // Start the computation
    ssc.start()
    // Wait for the computation to terminate
    ssc.awaitTermination()

  }
}
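
To make explicit what state is expected to survive a restart: the update function above keeps, per key, a running element-wise sum of the Long arrays, and drops the key's state if the arrays cannot be added (for example when their lengths differ). A dependency-free sketch of the same logic, with breeze replaced by a plain zip; the names StateUpdateSketch, sumArrays and updateState are illustrative and not part of the program:

```scala
import scala.util.Try

object StateUpdateSketch {
  // Element-wise sum; fails on a length mismatch, standing in for the
  // breeze DenseVector addition used in the program.
  def sumArrays(a: Array[Long], b: Array[Long]): Array[Long] = {
    require(a.length == b.length, "arrays must have the same length")
    a.zip(b).map { case (x, y) => x + y }
  }

  // Same shape as the function passed to updateStateByKey:
  // fold the previous state (if any) together with the new batch values.
  def updateState(current: Seq[Array[Long]],
                  prev: Option[Array[Long]]): Option[Array[Long]] = {
    val all = prev.toSeq ++ current
    // Any failure (empty input, mismatched lengths) yields None,
    // which updateStateByKey treats as "remove this key's state".
    Try(all.reduce((a, b) => sumArrays(a, b))).toOption
  }
}
```

For example, a previous state of Array(10, 20) combined with a batch containing Array(1, 2) gives Array(11, 22); this per-key array is what the checkpoint would need to restore.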


Regards,

~Vinti
