Can you clarify the scenario:

 val ssc = new StreamingContext(sparkConf, Seconds(10))
 ssc.checkpoint(checkpointDirectory)

 val stream = KafkaUtils.createStream(...)  // (key, message) pairs
 val lines = stream.map(_._2)
 val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1L))

 val wordDstream = wordCounts.updateStateByKey[Long](updateFunc)

 wordDstream.foreachRDD { rdd =>
   rdd.foreachPartition { partition =>
     // sync state to external store
   }
 }
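
For what it's worth, the usual way to make that foreachRDD write safe across recovery is to make it idempotent -- for example, keying the external write by batch time so a re-executed batch overwrites rather than duplicates. A sketch, where `ExternalStore` / `store.upsert` are hypothetical stand-ins for whatever client you actually use:

```scala
// Sketch only: ExternalStore / upsert are hypothetical. foreachRDD has an
// overload that also passes the batch Time, and that logical batch time is
// preserved across checkpoint recovery.
wordDstream.foreachRDD { (rdd, batchTime) =>
  rdd.foreachPartition { partition =>
    val store = ExternalStore.connect()   // hypothetical connection
    partition.foreach { case (word, count) =>
      // Keying on (word, batchTime) turns a replayed batch into an
      // overwrite instead of a duplicate insert.
      store.upsert((word, batchTime.milliseconds), count)
    }
    store.close()
  }
}
```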


My impression is that during recovery from a checkpoint, by the time you
reach the foreachRDD part, wordDstream would hold the state it had just
before the crash plus one batch interval's worth of data -- even if
re-creating the pre-crash RDD is really slow. So if your driver goes down at
10:20 and you restart at 10:30, I thought that at the time of the DB write
wordDstream would have exactly the state of 10:20 plus 10 seconds' worth of
aggregated stream data?
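
One way to check that empirically might be to log the batch time inside foreachRDD: on recovery, the replayed batches should report their original batch times rather than the wall-clock restart time. A sketch using the (RDD, Time) overload of foreachRDD:

```scala
// The (RDD, Time) overload exposes the logical batch time, which survives
// checkpoint recovery. A large lag against the wall clock indicates the
// batch is being replayed after a restart rather than processed live.
wordDstream.foreachRDD { (rdd, batchTime) =>
  val lagMs = System.currentTimeMillis() - batchTime.milliseconds
  println(s"batch=$batchTime lag=${lagMs}ms count=${rdd.count()}")
}
```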


I don't really understand what you mean by "Upon metadata checkpoint
recovery (before the data checkpoint occurs)" but it sounds like you're
observing the same DB write happening twice?

I don't have any advice for you, but I am interested in understanding better
what happens in the recovery scenario, so I'm just trying to clarify what
you observe.


On Thu, Aug 28, 2014 at 6:42 AM, GADV <giulio_devec...@yahoo.com> wrote:

> Not sure if this makes sense, but maybe it would be nice to have a kind of
> "flag" available within the code that tells me whether I'm running in a
> "normal" situation or during a recovery.
> To better explain this, consider the following scenario:
> I am processing data, say from a Kafka stream, and updating a database
> based on the computations. During recovery I don't want to update the
> database again (for many reasons; let's just assume that), but I want my
> system to end up in the same state as before. Thus I would like to know
> whether my code is running for the first time or during a recovery, so I
> can avoid updating the database again.
> More generally I want to know this in case I'm interacting with external
> entities.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-tp12568p13009.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>
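
One observation on the quoted "flag" idea (a sketch I haven't run, and driver-side only): StreamingContext.getOrCreate invokes the creation function only when there is no checkpoint to recover from, so a side effect in that function can act as a fresh-start flag:

```scala
// Sketch, untested: getOrCreate calls createContext() only when no
// checkpoint exists under checkpointDirectory. On recovery the function is
// skipped, so freshStart stays false. Caveat: this is a driver-side flag;
// closures restored from the checkpoint on executors will not see it.
@volatile var freshStart = false

def createContext(): StreamingContext = {
  freshStart = true                      // only reached on a cold start
  val ssc = new StreamingContext(sparkConf, Seconds(10))
  ssc.checkpoint(checkpointDirectory)
  // ... build the DStream graph here ...
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDirectory, createContext _)
```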
