Re: [spark streaming] checkpoint location feature for batch processing
Replied inline:

On Sun, May 3, 2020 at 6:25 PM Magnus Nilsson wrote:

> Thank you, so that would mean spark gets the current latest offset(s) when
> the trigger fires and then process all available messages in the topic upto
> and including that offset as long as maxOffsetsPerTrigger is the default of
> None (or large enought to handle all available messages).

Yes, it starts from the offset of the latest batch. With Trigger.Once, `maxOffsetsPerTrigger` will be ignored starting from Spark 3.0.0, which means that in Spark 2.x it still applies even when Trigger.Once is used, I guess.

> I think the word micro-batch confused me (more like mega-batch in some
> cases). It makes sense though, this makes Trigger.Once a fixed interval
> trigger that's only fired once and not repeatedly.

"Micro" is relative. Although Spark by default processes all available input per batch, in most cases you'll want to keep the batch size (interval) as small as possible, since it determines the latency of the output. Trigger.Once is an unusual case for a streaming workload: it's more like a continuous execution of "batch". By "continuous" I mean picking up the latest context, which is the characteristic of a streaming query; hence a hybrid.

> On Sun, May 3, 2020 at 3:20 AM Jungtaek Lim wrote:
>
>> If I understand correctly, Trigger.once executes only one micro-batch and
>> terminates, that's all. Your understanding of structured streaming applies
>> there as well.
>>
>> It's like a hybrid approach as bringing incremental processing from
>> micro-batch but having processing interval as batch. That said, while it
>> enables to get both sides of benefits, it's basically structured streaming,
>> inheriting all the limitations on the structured streaming, compared to the
>> batch query.
>>
>> Spark 3.0.0 will bring some change on Trigger.once (SPARK-30669 [1]) -
>> Trigger.once will "ignore" the read limit per micro-batch on data source
>> (like maxOffsetsPerTrigger) and process all available input as possible.
>> (Data sources should migrate to the new API to take effect, but works for
>> built-in data sources like file and Kafka.)
>>
>> 1. https://issues.apache.org/jira/browse/SPARK-30669
>>
>> On Sat, May 2, 2020 at 5:35 PM, Magnus Nilsson wrote:
>>
>>> I've always had a question about Trigger.Once that I never got around to
>>> ask or test for myself. If you have a 24/7 stream to a Kafka topic.
>>>
>>> Will Trigger.Once get the last offset(s) when it starts and then quit
>>> once it hits this offset(s) or will the job run until no new messages is
>>> added to the topic for a particular amount of time?
>>>
>>> br,
>>>
>>> Magnus
>>>
>>> On Sat, May 2, 2020 at 1:22 AM Burak Yavuz wrote:
>>>
>>>> Hi Rishi,
>>>>
>>>> That is exactly why Trigger.Once was created for Structured Streaming.
>>>> The way we look at streaming is that it doesn't have to be always real
>>>> time, or 24-7 always on. We see streaming as a workflow that you have to
>>>> repeat indefinitely. See this blog post for more details!
>>>>
>>>> https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
>>>>
>>>> Best,
>>>> Burak
>>>>
>>>> On Fri, May 1, 2020 at 2:55 PM Rishi Shah wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I recently started playing with spark streaming, and checkpoint
>>>>> location feature looks very promising. I wonder if anyone has an opinion
>>>>> about using spark streaming with checkpoint location option as a slow
>>>>> batch processing solution. What would be the pros and cons of utilizing
>>>>> streaming with checkpoint location feature to achieve fault tolerance in
>>>>> batch processing application?
>>>>>
>>>>> --
>>>>> Regards,
>>>>>
>>>>> Rishi Shah
Re: [spark streaming] checkpoint location feature for batch processing
Thank you, so that would mean Spark gets the current latest offset(s) when the trigger fires and then processes all available messages in the topic up to and including that offset, as long as maxOffsetsPerTrigger is the default of None (or large enough to handle all available messages).

I think the word micro-batch confused me (more like mega-batch in some cases). It makes sense though: this makes Trigger.Once a fixed-interval trigger that's only fired once and not repeatedly.
Re: [spark streaming] checkpoint location feature for batch processing
If I understand correctly, Trigger.Once executes only one micro-batch and terminates, that's all. Your understanding of Structured Streaming applies there as well.

It's a hybrid approach: it brings incremental processing from micro-batches but keeps the processing interval of a batch job. That said, while it gives you the benefits of both sides, it's basically Structured Streaming, inheriting all the limitations of Structured Streaming compared to a batch query.

Spark 3.0.0 will bring a change to Trigger.Once (SPARK-30669 [1]): Trigger.Once will "ignore" the per-micro-batch read limit on the data source (like maxOffsetsPerTrigger) and process as much of the available input as possible. (Data sources need to migrate to the new API for this to take effect, but it works for the built-in data sources like file and Kafka.)

1. https://issues.apache.org/jira/browse/SPARK-30669
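The Spark 2.x vs 3.0.0 difference can be made concrete with a toy model in plain Scala (not Spark code; `TriggerOnceModel` and its parameters are hypothetical names). It computes where a single Trigger.Once run would stop reading one Kafka partition, assuming, as described in this thread, that the run ends at the latest offset seen when the trigger fires, that Spark 2.x still applies `maxOffsetsPerTrigger`, and that Spark 3.0.0 ignores it. Real Spark splits the limit proportionally across partitions; a single partition keeps the arithmetic simple.

```scala
// Toy model, not Spark code: TriggerOnceModel and its parameters are hypothetical.
object TriggerOnceModel {
  /** Exclusive end offset (Kafka-style "next offset to read") for ONE partition.
    *
    * @param committed       offset stored in the checkpoint by the previous run
    * @param latestAtTrigger latest offset observed when the trigger fires
    * @param maxPerTrigger   stands in for maxOffsetsPerTrigger (None = not set)
    * @param ignoreReadLimit true models Spark 3.0.0+ (SPARK-30669), false models 2.x
    */
  def endOffset(
      committed: Long,
      latestAtTrigger: Long,
      maxPerTrigger: Option[Long],
      ignoreReadLimit: Boolean): Long =
    maxPerTrigger match {
      case Some(limit) if !ignoreReadLimit => math.min(latestAtTrigger, committed + limit)
      case _                               => latestAtTrigger
    }

  def main(args: Array[String]): Unit = {
    // 1,000 records arrived since the last run; the limit is 100 per trigger.
    println(endOffset(5000L, 6000L, Some(100L), ignoreReadLimit = false)) // 5100
    println(endOffset(5000L, 6000L, Some(100L), ignoreReadLimit = true))  // 6000
    println(endOffset(5000L, 6000L, None, ignoreReadLimit = false))       // 6000
  }
}
```

With the 2.x-style setting, the remaining 900 records wait for the next scheduled run, which in the model is just another call starting from `committed = 5100`.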
Re: [spark streaming] checkpoint location feature for batch processing
I've always had a question about Trigger.Once that I never got around to asking or testing myself. Suppose you have a 24/7 stream to a Kafka topic.

Will Trigger.Once get the last offset(s) when it starts and then quit once it hits those offset(s), or will the job run until no new messages are added to the topic for a particular amount of time?

br,

Magnus
Re: [spark streaming] checkpoint location feature for batch processing
Thanks Burak! Appreciate it. This makes sense.

How would you suggest we make sure the resulting data doesn't end up as tiny files, if we are not on Databricks yet and cannot leverage Delta Lake features? Also, regarding the checkpointing feature, do you have a blog post or article I can look at to try out an example?

--
Regards,
Rishi Shah
Re: [spark streaming] checkpoint location feature for batch processing
Hi Rishi,

That is exactly why Trigger.Once was created for Structured Streaming. The way we look at streaming is that it doesn't have to be always real time, or 24-7 always on. We see streaming as a workflow that you have to repeat indefinitely. See this blog post for more details!

https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html

Best,
Burak
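The "workflow you repeat" idea can be sketched as a toy model in plain Scala (not Spark code; `IncrementalRuns` and `Checkpoint` are hypothetical names). Each scheduled run starts from the position stored in the checkpoint, processes everything up to the latest offset seen when it starts, saves the new position and exits, so a sequence of runs covers every record once with no overlap. In Spark itself this would be a query started with `.trigger(Trigger.Once())` and `.option("checkpointLocation", ...)` and launched on a schedule; the model deliberately ignores failures and sink semantics.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model, not Spark code: IncrementalRuns and Checkpoint are hypothetical names.
object IncrementalRuns {
  // What the checkpoint location remembers between runs: the next offset to read.
  final case class Checkpoint(nextOffset: Long)

  // One run: process [cp.nextOffset, latestAtStart), then advance the checkpoint.
  def runOnce(cp: Checkpoint, latestAtStart: Long): (Checkpoint, Seq[Long]) = {
    val end = math.max(cp.nextOffset, latestAtStart)
    (Checkpoint(end), cp.nextOffset until end)
  }

  // Runs the job once per entry of latestPerRun, starting from an empty checkpoint.
  // Returns the final checkpoint and every offset processed, in order.
  def runAll(latestPerRun: Seq[Long]): (Checkpoint, Seq[Long]) = {
    var cp = Checkpoint(0L)
    val processed = ArrayBuffer.empty[Long]
    for (latest <- latestPerRun) {
      val (next, batch) = runOnce(cp, latest)
      println(s"read ${batch.size} records, checkpoint now at ${next.nextOffset}")
      processed ++= batch
      cp = next
    }
    (cp, processed.toSeq)
  }

  def main(args: Array[String]): Unit = {
    // Latest offsets seen at the start of three daily runs (no new data before the third).
    runAll(Seq(120L, 300L, 300L))
  }
}
```

The three runs read 120, 180 and 0 records; the third run finds nothing new and simply leaves the checkpoint at 300.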
[spark streaming] checkpoint location feature for batch processing
Hi All,

I recently started playing with Spark Streaming, and the checkpoint location feature looks very promising. I wonder if anyone has an opinion about using Spark Streaming with the checkpoint location option as a slow batch processing solution. What would be the pros and cons of using streaming with the checkpoint location feature to achieve fault tolerance in a batch processing application?

--
Regards,
Rishi Shah
Spark Streaming: Checkpoint, Recovery and Idempotency
Hello,

I am trying to understand the content of a checkpoint and the corresponding recovery.

*My understanding of Spark checkpointing:*
If you have really long DAGs and your Spark cluster fails, checkpointing helps by persisting intermediate state, e.g. to HDFS. So a DAG of 50 transformations can be reduced to 4-5 transformations with the help of checkpointing. It breaks the DAG, though.

*Checkpointing in Streaming:*
My Spark Streaming job has a micro-batch interval of 5 seconds. As I understand it, a new job is submitted every 5 seconds on the EventLoop, which invokes the JobGenerator to generate the RDD DAG for the new micro-batch from the DStreamGraph, while the receiver in the meantime keeps collecting data for the next micro-batch. If I enable checkpointing, as I understand it, it will periodically checkpoint the "current state".

*Question:*
What is this "state"? Is it the combination of the base RDD and the state of the operators/transformations of the DAG for the present micro-batch only? So I have the following:

ubatch 0 at T=0  ---> SUCCESS
ubatch 1 at T=5  ---> SUCCESS
ubatch 2 at T=10 ---> SUCCESS
                 ---> Checkpointing kicks in now at T=12
ubatch 3 at T=15 ---> SUCCESS
ubatch 4 at T=20 ---> Spark cluster DOWN at T=23 => ubatch 4 FAILS!!!
...
                 ---> Spark cluster is restarted at T=100

What specifically goes and sits on the disk as a result of checkpointing at T=12? Will it just store the present state of the operators of the DAG for ubatch 2?

a. If yes, then during recovery at T=100, the last checkpoint available is at T=12. What happens to ubatch 3 at T=15, which was already processed successfully? Does the application reprocess ubatch 3 and handle idempotency here? If yes, do we go to the streaming source, e.g. Kafka, and rewind the offset to be able to replay the contents starting from ubatch 3?

b. If no, then what exactly goes into the checkpoint directory at T=12?
https://stackoverflow.com/questions/56362347/spark-checkpointing-content-recovery-and-idempotency

Regards

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
Spark Streaming checkpoint
Hi,

I have written a Spark Streaming job that uses checkpointing. I stopped the streaming job for 5 days and then restarted it today. I have encountered a weird issue where it shows zero records for all cycles up to today. Is this causing data loss?

Thanks,
Asmath
Spark 2.1.2 Spark Streaming checkpoint interval not respected
Hi,

In the following example using mapWithState, I set the checkpoint interval to 1 minute. From the log, Spark still writes to the checkpoint directory every second. I would appreciate it if someone could point out what I have done wrong.

import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.streaming.dstream.MapWithStateDStream

object MapWithStateDemo {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: MapWithStateDemo ")
      System.exit(1)
    }
    val sparkConf = new SparkConf().setAppName("MapWithStateDemo")
      .setIfMissing("spark.master", "local[*]")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // Initial state RDD for mapWithState operation
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
    // Create a ReceiverInputDStream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))
    // Update the cumulative count using mapWithState
    // This will give a DStream made of state (which is the cumulative count of the words)
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }
    val stateDstream: MapWithStateDStream[String, Int, Int, (String, Int)] =
      wordDstream.mapWithState(
        StateSpec.function(mappingFunc).timeout(Seconds(10)).initialState(initialRDD))
    stateDstream.checkpoint(Minutes(1L))
    stateDstream.print()
    val targetDir = new File(getClass.getResource("/").toURI).getParentFile.getParentFile
    val checkpointDir = targetDir + "/checkpoint"
    ssc.checkpoint(checkpointDir)
    ssc.start()
    ssc.awaitTermination()
  }
}

Thanks in advance for any assistance!

Shing
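One possible reading of this, offered as an assumption rather than a confirmed answer: Spark Streaming has two kinds of checkpoint. Metadata checkpoints (the `checkpoint-<time>` files written by CheckpointWriter, which the Kafka thread's log shows being written for every 30-second batch) appear to follow the batch interval, while `stateDstream.checkpoint(Minutes(1L))` only sets how often the state RDDs are checkpointed. The toy model below (plain Scala, not Spark code; `CheckpointCadence` and `schedule` are hypothetical names) lists which batches would write which kind under that assumption, for 1-second batches and a 1-minute state interval.

```scala
// Toy model, not Spark code: CheckpointCadence and schedule are hypothetical names.
// Assumption: metadata is checkpointed on every batch, while state RDDs are
// checkpointed only on batches whose time is a multiple of the state interval.
object CheckpointCadence {
  // Returns (batch time in ms since start, does this batch also checkpoint state?).
  def schedule(batchMs: Long, stateIntervalMs: Long, untilMs: Long): Seq[(Long, Boolean)] = {
    require(stateIntervalMs % batchMs == 0,
      "state checkpoint interval must be a multiple of the batch interval")
    (batchMs to untilMs by batchMs).map(t => (t, t % stateIntervalMs == 0))
  }

  def main(args: Array[String]): Unit = {
    // 1-second batches, stateDstream.checkpoint(Minutes(1L)), first three minutes.
    val s = schedule(batchMs = 1000L, stateIntervalMs = 60000L, untilMs = 180000L)
    println(s"metadata checkpoints: ${s.size}")     // 180, one per batch
    println(s"state checkpoints: ${s.count(_._2)}") // 3, once per minute
  }
}
```

If the assumption holds, a checkpoint directory that changes every second is expected, and the 1-minute setting shows up only in how often the state data is saved.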
Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream
In either case, an end-to-end exactly-once guarantee can only be ensured if the output sink is updated transactionally. The engine has to re-execute data on failure. Exactly-once means that the external storage is updated as if each data record was computed exactly once. That's why you need to update it transactionally to handle possible recomputations. This is true for both Spark Streaming and Structured Streaming.

Hope this helps.

On Jun 6, 2017 5:56 AM, "ALunar Beach" <alunarbe...@gmail.com> wrote:

> Thanks TD.
> In pre-structured streaming, exactly once guarantee on input is not
> guaranteed. is it?
>
> On Tue, Jun 6, 2017 at 4:30 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
>> This is the expected behavior. There are some confusing corner cases.
>> If you are starting to play with Spark Streaming, i highly recommend
>> learning Structured Streaming
>> <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html>
>> instead.
>>
>> On Mon, Jun 5, 2017 at 11:16 AM, anbucheeralan <alunarbe...@gmail.com> wrote:
>>
>>> I am using Spark Streaming Checkpoint and Kafka Direct Stream.
>>> It uses a 30 sec batch duration and normally the job is successful in
>>> 15-20 sec.
>>>
>>> If the spark application fails after the successful completion
>>> (1496684280000 ms in the log below) and restarts, it's duplicating the last
>>> batch again.
>>>
>>> Is this the expected behavior? I was expecting this to start a new batch
>>> window.
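The point about transactional updates can be illustrated with a toy model in plain Scala (not a Spark or Kafka API; `TransactionalSink` and `ReplayDemo` are hypothetical names). The sink records which batch it has already applied in the same atomic step as the data, so when the engine re-executes a batch after a failure, the second write is recognised and skipped, whereas a sink that blindly appends counts it twice. With a real store, the equivalent would be one transaction holding both the results and the batch id (or, for the Kafka direct stream, the offset ranges), which is the approach the Spark Streaming Kafka integration guide describes for data stores that support transactions.

```scala
import scala.collection.mutable

// Toy model, not Spark code: TransactionalSink and ReplayDemo are hypothetical.
// The batch id and its effect are applied in one synchronized step, standing in
// for a single transaction in a real external store.
final class TransactionalSink {
  private val appliedBatches = mutable.Set.empty[Long]
  private var total = 0L

  /** Applies `count` for `batchTime` once; returns false if the batch was already applied. */
  def commit(batchTime: Long, count: Long): Boolean = synchronized {
    if (appliedBatches.contains(batchTime)) false
    else {
      total += count
      appliedBatches += batchTime
      true
    }
  }

  def currentTotal: Long = synchronized(total)
}

object ReplayDemo {
  // Feeds the same writes to a naive appending sink and to TransactionalSink.
  def replay(writes: Seq[(Long, Long)]): (Long, Long) = {
    val sink = new TransactionalSink
    var naiveTotal = 0L
    for ((batchTime, count) <- writes) {
      naiveTotal += count // a sink that blindly appends every write
      sink.commit(batchTime, count)
    }
    (naiveTotal, sink.currentTotal)
  }

  def main(args: Array[String]): Unit = {
    // Batch 2 is re-executed after a restart, so its output is written twice.
    val (naive, transactional) = replay(Seq((1L, 10L), (2L, 5L), (2L, 5L)))
    println(s"naive: $naive, transactional: $transactional") // naive: 20, transactional: 15
  }
}
```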
Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream
Thanks TD. In pre-Structured Streaming, exactly-once on the input side is not guaranteed, is it?
Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream
This is the expected behavior. There are some confusing corner cases. If you are starting to play with Spark Streaming, I highly recommend learning Structured Streaming <http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html> instead.
Fwd: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream
I am using Spark Streaming Checkpoint and Kafka Direct Stream. It uses a 30 sec batch duration and normally the job is successful in 15-20 sec.

If the Spark application fails after the successful completion (1496684280000 ms in the log below) and restarts, it's duplicating the last batch again.

Is this the expected behavior? I was expecting this to start a new batch window.

Here are some logs:

Last successful run:
17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time 1496684280000 ms (execution: 0.029 s)
17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list
17/06/05 13:38:00 INFO BlockManager: Removing RDD 0
17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time 1496684280000 ms
17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time 1496684280000 ms
17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time 1496684280000 ms
17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time 1496684280000 ms to writer queue
17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time 1496684280000 ms to file 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-1496684280000'
17/06/05 13:38:00 INFO CheckpointWriter: Checkpoint for time 1496684280000 ms saved to file 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-1496684280000', took 4032 bytes and 9 ms
17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time 1496684280000 ms
17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time 1496684280000 ms

After the restart:
17/06/05 13:42:31 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 1496684280000 ms [(my_test,0,2000,2000)]
17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data
17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10 batches): 1496684280000 ms, 1496684310000 ms, 1496684340000 ms, 1496684370000 ms, 1496684400000 ms, 1496684430000 ms, 1496684460000 ms, 1496684490000 ms, 1496684520000 ms, 1496684550000 ms
17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0 batches):
17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 batches): 1496684280000 ms, 1496684310000 ms, 1496684340000 ms, 1496684370000 ms, 1496684400000 ms, 1496684430000 ms, 1496684460000 ms, 1496684490000 ms, 1496684520000 ms, 1496684550000 ms
17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 1496684280000 ms
17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job 1496684280000 ms.0 from job set of time 1496684280000 ms

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Spark-Streaming-Checkpoint-and-Exactly-Once-Guarantee-on-Kafka-Direct-Stream-tp28743.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
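The "Batches during down time" line can be reproduced with a toy model, offered as our reading of restart scheduling in Spark 2.x Spark Streaming rather than as its actual code (`RestartModel` and its methods are hypothetical names). It assumes the re-run batch times start at the checkpointed batch time itself (inclusive) and step by the batch interval up to, but not including, a restart time aligned to the next batch boundary. The restart at 13:42:31 is taken as 1496684551000 ms, i.e. 271 s after the 13:38:00 batch whose time the log gives as 1496684280000 ms. Under this model the already-completed checkpointed batch is the first one rescheduled, which matches the duplication described above.

```scala
// Toy model of our reading of restart scheduling in Spark 2.x Spark Streaming;
// RestartModel and its methods are hypothetical names, not Spark APIs.
object RestartModel {
  // First batch boundary strictly after nowMs, assuming boundaries are
  // multiples of the batch interval.
  def alignedRestartTime(nowMs: Long, batchMs: Long): Long =
    (nowMs / batchMs + 1) * batchMs

  // Batch times regenerated on restart: from the checkpointed batch time
  // (inclusive) up to the aligned restart time (exclusive).
  def downTimeBatches(checkpointMs: Long, nowMs: Long, batchMs: Long): Seq[Long] =
    checkpointMs until alignedRestartTime(nowMs, batchMs) by batchMs

  def main(args: Array[String]): Unit = {
    val batches = downTimeBatches(
      checkpointMs = 1496684280000L, // last checkpoint in the log (13:38:00)
      nowMs = 1496684551000L,        // restart at 13:42:31, 271 s later
      batchMs = 30000L)              // 30-second batches
    println(s"${batches.size} batches: ${batches.mkString(" ms, ")} ms")
  }
}
```

The model yields the same ten times the log lists, from 1496684280000 ms to 1496684550000 ms; a sink that is updated transactionally, as TD describes, is what keeps the repeated first batch harmless.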
Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream
I am using Spark Streaming checkpointing with the Kafka direct stream. The job uses a 30 sec batch duration and normally completes in 15-20 sec. If the Spark application fails after a successful completion (149668428ms in the log below) and restarts, it processes the last batch again. Is this the expected behavior? I was expecting it to start a new batch window. Here are some logs:

Last successful run:
17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time 149668428 ms (execution: 0.029 s)
17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list
17/06/05 13:38:00 INFO BlockManager: Removing RDD 0
17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time 149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time 149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time 149668428 ms
17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time 149668428 ms to writer queue
17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time 149668428 ms to file 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428'
17/06/05 13:38:00 INFO CheckpointWriter: Checkpoint for time 149668428 ms saved to file 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428', took 4032 bytes and 9 ms
17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time 149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time 149668428 ms

After the restart:
17/06/05 13:42:31 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 149668428 ms [(my_test,0,2000,2000)]
17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data
17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10 batches): 149668428 ms, 149668431 ms, 149668434 ms, 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms, 149668449 ms, 149668452 ms, 149668455 ms
17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0 batches):
17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 batches): 149668428 ms, 149668431 ms, 149668434 ms, 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms, 149668449 ms, 149668452 ms, 149668455 ms
17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms
17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job 149668428 ms.0 from job set of time 149668428 ms
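The restart log is consistent with the way the JobGenerator picks the batches to regenerate: the down-time range starts at the last checkpointed batch time itself, so a batch that had already completed is generated again. The sketch below models that computation in plain Scala, following the logic of `JobGenerator.restart` and `RecurringTimer.getRestartTime` (a reading of the Spark source, not a documented contract). The object name, method names and millisecond values are hypothetical; this is not Spark code.

```scala
object RescheduleSketch {
  /** Next batch boundary strictly after `nowMs`, aligned to the stream's `zeroTimeMs`
    * (modeled on RecurringTimer.getRestartTime). */
  def restartTime(zeroTimeMs: Long, nowMs: Long, batchMs: Long): Long = {
    val gap = nowMs - zeroTimeMs
    (Math.floorDiv(gap, batchMs) + 1) * batchMs + zeroTimeMs
  }

  /** Pending batches plus every batch time from the checkpoint time (inclusive)
    * up to the restart time (exclusive), de-duplicated and sorted. */
  def timesToReschedule(checkpointTimeMs: Long, pendingMs: Seq[Long],
                        restartMs: Long, batchMs: Long): Seq[Long] = {
    val downTimes = checkpointTimeMs until restartMs by batchMs // includes checkpointTimeMs itself
    (pendingMs ++ downTimes).filter(_ < restartMs).distinct.sorted
  }
}
```

Plugging in the wall-clock times from the log (checkpoint at 13:38:00, restart at 13:42:31, 30 s batches) with the checkpoint time taken as 0 ms gives a restart boundary of 300000 ms and ten batches, the first of which is the already-checkpointed one. That matches the "Batches to reschedule (10 batches)" line, and suggests the output operation would need to tolerate a replayed batch (for example by being idempotent).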
[Spark Streaming] Checkpoint backup (.bk) file purpose
Hello,

I'm studying the metadata checkpoint implementation in Spark Streaming and I was wondering about the purpose of the so-called "backup files".

CheckpointWriter snippet:
> // We will do checkpoint when generating a batch and completing a batch. When the processing
> // time of a batch is greater than the batch interval, checkpointing for completing an old
> // batch may run after checkpointing of a new batch. If this happens, checkpoint of an old
> // batch actually has the latest information, so we want to recovery from it. Therefore, we
> // also use the latest checkpoint time as the file name, so that we can recover from the
> // latest checkpoint file.
> //
> // Note: there is only one thread writing the checkpoint files, so we don't need to worry
> // about thread-safety.
> val checkpointFile = Checkpoint.checkpointFile(checkpointDir, latestCheckpointTime)
> val backupFile = Checkpoint.checkpointBackupFile(checkpointDir, latestCheckpointTime)
>
> // ... some lines further
> // If the checkpoint file exists, back it up
> // If the backup exists as well, just delete it, otherwise rename will fail
> if (fs.exists(checkpointFile)) {
>   fs.delete(backupFile, true) // just in case it exists
>   if (!fs.rename(checkpointFile, backupFile)) {
>     logWarning(s"Could not rename $checkpointFile to $backupFile")
>   }
> }

What is the role of this *backupFile*? I understand that it is generated if a checkpoint file for the given timestamp already exists. But how could that happen? Is it a protection against different Spark applications checkpointing to the same directory? Or is it meant for the case described above (an old batch completing after a new batch has started)?

Best regards,
Bartosz.
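Reading the quoted comment, the same file name can be written twice: a checkpoint taken when an old batch completes is named after `latestCheckpointTime`, which may already have been used by the checkpoint taken when a newer batch was generated. The sketch below reproduces only the rename-to-backup step on a local directory with `java.nio.file`; Spark's `CheckpointWriter` goes through the Hadoop `FileSystem` API instead. The object name, the `write` helper and the byte contents are hypothetical, while the `checkpoint-<time>` and `.bk` naming follows the file names seen in this thread.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

object CheckpointBackupSketch {
  def checkpointFile(dir: Path, time: Long): Path = dir.resolve(s"checkpoint-$time")
  def backupFile(dir: Path, time: Long): Path = dir.resolve(s"checkpoint-$time.bk")

  /** Writes `bytes` as the checkpoint for `latestCheckpointTime`,
    * moving any existing file for the same time to the `.bk` name first. */
  def write(dir: Path, latestCheckpointTime: Long, bytes: Array[Byte]): Unit = {
    val ckpt = checkpointFile(dir, latestCheckpointTime)
    val bk = backupFile(dir, latestCheckpointTime)
    if (Files.exists(ckpt)) {
      Files.deleteIfExists(bk) // a stale backup would make the move fail
      Files.move(ckpt, bk)     // keep the previous version instead of overwriting it
    }
    Files.write(ckpt, bytes)
  }
}
```

After two writes for the same time, the directory holds `checkpoint-1000` with the newer bytes and `checkpoint-1000.bk` with the older ones, presumably so that recovery still has an earlier copy to fall back on if the newest file turns out to be unreadable.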
Can Spark Streaming checkpoint only metadata ?
Hi, I wonder if it is possible to checkpoint only the metadata and not the data in RDDs and DataFrames. Thanks, Natu
Question about Spark Streaming checkpoint interval
Need some clarification about the documentation. According to the Spark docs, "the default interval is a multiple of the batch interval that is at least 10 seconds. It can be set by using dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try."

My question is: does the checkpoint interval apply only to data checkpointing, or does it also apply to metadata checkpointing? The API says dstream.checkpoint() is for "Enable periodic checkpointing of RDDs of this DStream", implying it is only for data checkpointing. My understanding is that metadata checkpointing is for driver failure. For example, in the Kafka direct API, the driver keeps track of the offset range of each partition. So if the metadata checkpoint is NOT done for each batch, some messages in Kafka are going to be replayed after a driver failure.

I cannot find anything in the documentation saying whether metadata checkpointing is done for each batch and whether the checkpointInterval setting applies to both types of checkpointing. Maybe I missed it. If anyone can point me to the right documentation, I would highly appreciate it.

Best Regards,
Lan
Re: Question about Spark Streaming checkpoint interval
You are right. "checkpointInterval" is only for data checkpointing. "Metadata checkpoint" is done for each batch. Feel free to send a PR to add the missing doc.

Best Regards,
Shixiong Zhu

2015-12-18 8:26 GMT-08:00 Lan Jiang:
> I do not find the answer in the document saying *whether metadata checkpointing is done for each batch* and whether checkpointinterval setting applies to both types of checkpointing.
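The rule described above can be modeled in a few lines. This is a plain-Scala sketch, not Spark code: the default follows the documentation quoted in the question ("a multiple of the batch interval that is at least 10 seconds", using the slide interval, which equals the batch interval for non-windowed DStreams), and the per-batch test mirrors the check that `DStream.getOrCompute` appears to make before marking an RDD for checkpointing. The object and method names are hypothetical.

```scala
object CheckpointIntervalSketch {
  /** Default data-checkpoint interval: the smallest multiple of the slide interval that is >= 10 s. */
  def defaultIntervalMs(slideMs: Long): Long =
    slideMs * math.ceil(10000.0 / slideMs).toLong

  /** Metadata is checkpointed for every batch; RDD data only when
    * (time - zeroTime) is a multiple of the data-checkpoint interval. */
  def isDataCheckpointBatch(timeMs: Long, zeroTimeMs: Long, intervalMs: Long): Boolean =
    (timeMs - zeroTimeMs) % intervalMs == 0
}
```

For example, with 3 s batches the default interval works out to 12 s, so over one minute (20 batches) the metadata would be checkpointed 20 times but the RDD data only at 12, 24, 36, 48 and 60 s.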
Re: Spark Streaming Checkpoint help failed application
Hi, I'm no expert, but:

Short answer: yes, after restarting, your application will reread the failed messages.

Longer answer: it seems like you're mixing several things together. Let me try to explain:
- The WAL is used to prevent your application from losing data: the executor first writes the data it receives from Kafka into the WAL, and only then updates the Kafka high-level consumer (which the receiver-based approach uses) to say that it actually received the data. This makes it at-least-once.
- Checkpoints are a mechanism that helps your *driver* recover from failures by saving driver information into HDFS (or S3 or whatever).

Now, the reason I explained these is this: you asked "... one bug caused the streaming application to fail and exit", so the failure you're trying to solve is in the driver. When you restart your application, your driver will fetch the information it last saved in the checkpoint (in HDFS) and instruct new executors (since the previous driver died, so did the executors) to continue consuming data.
Since your executors are using the receiver-based approach (as opposed to the direct Kafka stream) with the WAL, when they (the executors) start they will first process what was saved in the WAL and then read from the latest offsets saved in Kafka (ZooKeeper). In your case this means you won't lose data (the executors first save the data to the WAL, then advance their offsets in Kafka).

If you decide to go for the direct approach <https://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers>, then your driver will be the one (and only one) managing the Kafka offsets, which means that some of the data the driver saves in the checkpoint will be the Kafka offsets.

I hope this helps :)

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Checkpoint-help-failed-application-tp25347p25357.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
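The ordering described above (write to the WAL first, advance the offset second) is what turns a crash into duplicates rather than loss. The toy model below shows that in plain Scala; the class, its fields and `receiveNext` are hypothetical, a real receiver stores blocks of records rather than single strings, and the "restart" is simulated by keeping both the WAL and the committed offset.

```scala
import scala.collection.mutable.ArrayBuffer

/** Toy receiver: stores each record in a "WAL" first and only then advances the committed offset. */
final class WalReceiverSketch(source: IndexedSeq[String]) {
  val wal = ArrayBuffer.empty[String] // stands in for the write-ahead log on HDFS
  var committedOffset = 0             // stands in for the consumer offset kept in ZooKeeper

  /** Reads the record at the committed offset; `crashBeforeCommit` simulates
    * dying after the WAL write but before the offset is advanced. */
  def receiveNext(crashBeforeCommit: Boolean = false): Unit = {
    val record = source(committedOffset)
    wal += record                                // 1. make the data durable first
    if (!crashBeforeCommit) committedOffset += 1 // 2. only then acknowledge it
  }
}
```

If the receiver dies between the two steps, the record is already in the WAL but is read again from the old offset after the restart, so it shows up twice and is never lost: at-least-once.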
Re: Spark streaming checkpoint against s3
So as long as the jar is kept on S3 and is available across different runs, the S3 checkpoint works.
Spark streaming checkpoint against s3
Hi, I am trying to set the Spark Streaming checkpoint directory to S3. Here is basically what I did:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.StreamingContext

val checkpointDir = "s3://myBucket/checkpoint"
val ssc = StreamingContext.getOrCreate(
  checkpointDir,
  () => getStreamingContext(sparkJobName, batchDurationSec, classOf[MyClassKryoRegistrator], checkpointDir),
  getHadoopConfiguration)

def getHadoopConfiguration: Configuration = {
  val hadoopConf = new Configuration()
  hadoopConf.set("fs.defaultFS", "s3://" + myBucket + "/")
  hadoopConf.set("fs.s3.awsAccessKeyId", "myAccessKey")
  hadoopConf.set("fs.s3.awsSecretAccessKey", "mySecretKey")
  hadoopConf.set("fs.s3n.awsAccessKeyId", "myAccessKey")
  hadoopConf.set("fs.s3n.awsSecretAccessKey", "mySecretKey")
  hadoopConf
}

It works: I can see that it tries to retrieve the checkpoint from S3. However, it does more than I intended. I saw the following in the log:

15/10/14 19:58:47 ERROR spark.SparkContext: Jar not found at file:/media/ephemeral0/oncue/mesos-slave/slaves/20151007-172900-436893194-5050-2984-S9/frameworks/20150825-180042-604730890-5050-4268-0003/executors/tian-act-reg.47368a1a-71f9-11e5-ad61-de5fb3a867da/runs/dfc28a6c-48a0-464b-bdb1-d6dd057acd51/artifacts/rna-spark-streaming.jar

Now SparkContext is trying to look at the following path instead of the local one: file:/media/ephemeral0/oncue/mesos-slave/slaves/20151007-172900-436893194-5050-2984-S9/frameworks/20150825-180042-604730890-5050-4268-0003/executors/tian-act-reg.47368a1a-71f9-11e5-ad61-de5fb3a867da/runs/dfc28a6c-48a0-464b-bdb1-d6dd057acd51/artifacts/rna-spark-streaming.jar

How do I make SparkContext look at just /media/ephemeral0/oncue/mesos-slave/slaves/20151007-172900-436893194-5050-2984-S9/frameworks/20150825-180042-604730890-5050-4268-0003/executors/tian-act-reg.47368a1a-71f9-11e5-ad61-de5fb3a867da/runs/dfc28a6c-48a0-464b-bdb1-d6dd057acd51/artifacts/rna-spark-streaming.jar?
Re: Spark streaming checkpoint against s3
It looks like reconstructing the SparkContext from checkpoint data makes it look for the jar file of the previous failed run. It cannot find the jar file because our jar files are on local machines and were cleaned up after each failed run.
Re: Spark Streaming checkpoint recovery throws Stack Overflow Error
Hi Swetha,

The stack overflow happens because, when recovering from checkpoint data, Java deserializes objects recursively; if you have a large call stack, this recursion can easily overflow the stack. This is caused by the Java deserialization mechanism, so you need to increase the stack size.

Thanks
Saisai

On Fri, Sep 18, 2015 at 9:19 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> Which version of Java are you using ?
>
> And release of Spark, please.
>
> Thanks
>
> On Fri, Sep 18, 2015 at 9:15 AM, swetha <swethakasire...@gmail.com> wrote:
>> Hi,
>>
>> When I try to recover my Spark Streaming job from a checkpoint directory, I get a StackOverFlow Error as shown below. Any idea as to why this is happening?
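To illustrate what "increase the stack size" buys, the sketch below runs a deliberately deep, non-tail-recursive call on a thread created with an explicit stack size through the standard `Thread(ThreadGroup, Runnable, String, long)` constructor. It is a plain-Scala demonstration, not a Spark fix; `BigStackSketch`, `runWithStack` and `depth` are hypothetical names, and per the `Thread` Javadoc the requested size may be ignored on some platforms. For the Spark driver itself, the usual route would be passing a JVM option such as `-Xss` through `--driver-java-options` or `spark.driver.extraJavaOptions`.

```scala
object BigStackSketch {
  /** Runs `body` on a fresh thread that requests `stackBytes` of stack,
    * returning Left if it overflows and Right with the result otherwise. */
  def runWithStack[A](stackBytes: Long)(body: => A): Either[Throwable, A] = {
    var result: Either[Throwable, A] = Left(new IllegalStateException("not run"))
    val task: Runnable = () => {
      result = try Right(body) catch { case e: StackOverflowError => Left(e) }
    }
    val t = new Thread(null, task, "big-stack", stackBytes)
    t.start()
    t.join() // join() makes the thread's write to `result` visible here
    result
  }

  /** Not tail-recursive, so every level keeps a stack frame (like deep recursive deserialization). */
  def depth(n: Int): Int = if (n == 0) 0 else 1 + depth(n - 1)
}
```

With a default-sized thread stack, a recursion this deep would typically overflow; with a 256 MB request it completes.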
Spark Streaming checkpoint recovery throws Stack Overflow Error
Hi,

When I try to recover my Spark Streaming job from a checkpoint directory, I get a StackOverflowError as shown below. Any idea why this is happening?

15/09/18 09:02:20 ERROR streaming.StreamingContext: Error starting the context, marking it as stopped
java.lang.StackOverflowError
    at java.util.Date.getTimeImpl(Date.java:887)
    at java.util.Date.getTime(Date.java:883)
    at java.util.Calendar.setTime(Calendar.java:1106)
    at java.text.SimpleDateFormat.format(SimpleDateFormat.java:955)
    at java.text.SimpleDateFormat.format(SimpleDateFormat.java:948)
    at java.text.DateFormat.format(DateFormat.java:298)
    at java.text.Format.format(Format.java:157)
    at org.apache.spark.streaming.ui.UIUtils$.formatBatchTime(UIUtils.scala:113)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$makeScope$1.apply(DStream.scala:137)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$makeScope$1.apply(DStream.scala:136)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.streaming.dstream.DStream.makeScope(DStream.scala:136)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:394)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
    at org.apache.spark.streaming.dstream.StateDStream.compute(StateDStream.scala:67)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
    at org.apache.spark.streaming.dstream.StateDStream.compute(StateDStream.scala:67)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
    at scala.Option.orElse(Option.scala:257)
Re: Spark Streaming checkpoint recovery throws Stack Overflow Error
Which version of Java are you using? And which release of Spark, please?

Thanks

On Fri, Sep 18, 2015 at 9:15 AM, swetha <swethakasire...@gmail.com> wrote:
> Hi,
>
> When I try to recover my Spark Streaming job from a checkpoint directory, I get a StackOverFlow Error as shown below. Any idea as to why this is happening?
Retrieving offsets from previous spark streaming checkpoint
When deploying a Spark Streaming application, I want to be able to retrieve the latest Kafka offsets that were processed by the pipeline and create my Kafka direct streams from those offsets. Because the checkpoint directory isn't guaranteed to be compatible between job deployments, I don't want to reuse the checkpoint directory from the previous job deployment. I also don't want to have to reprocess everything in my Kafka queues. Is there any way to retrieve this information from the checkpoint directory, or has anyone else solved this problem already?

* I apologize if this is a duplicate message. I didn't see it go through earlier today, and I didn't see it in the archive.
Re: Retrieving offsets from previous spark streaming checkpoint
Access the offsets using HasOffsetRanges, save them in your datastore, and provide them as the fromOffsets argument when starting the stream.

See https://github.com/koeninger/kafka-exactly-once

On Thu, Aug 13, 2015 at 3:53 PM, Stephen Durfey sjdur...@gmail.com wrote:
> Is there any way to retrieve this information from the checkpoint directory, or has anyone else solved this problem already?
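A minimal sketch of that save-and-restore cycle, with the Kafka and Spark types replaced by plain Scala so it runs on its own. In the Kafka 0.8 direct API the ranges would come from `rdd.asInstanceOf[HasOffsetRanges].offsetRanges`, and the resulting map would be passed to the `KafkaUtils.createDirectStream` overload that takes `fromOffsets`. `OffsetRangeLite`, `InMemoryOffsetStore` and their methods are hypothetical stand-ins for those types and for "your datastore".

```scala
/** Plain stand-in for an offset range: (topic, partition, fromOffset, untilOffset). */
final case class OffsetRangeLite(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

/** Hypothetical in-memory stand-in for the datastore that holds committed offsets. */
final class InMemoryOffsetStore {
  private var committed = Map.empty[(String, Int), Long]

  /** Record the end of every range, to be called once the batch's output has been written. */
  def commit(ranges: Seq[OffsetRangeLite]): Unit =
    committed ++= ranges.map(r => (r.topic, r.partition) -> r.untilOffset)

  /** Offsets to start the next deployment from; partitions never seen fall back to `default`. */
  def fromOffsets(partitions: Seq[(String, Int)], default: Long): Map[(String, Int), Long] =
    partitions.map(tp => tp -> committed.getOrElse(tp, default)).toMap
}
```

Storing `untilOffset` means the next deployment starts exactly where the last committed batch ended. Committing only after the output succeeds (ideally atomically with it) keeps a crash from skipping data, at the cost of possibly reprocessing the last batch.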
broadcast variable and accumulators issue while spark streaming checkpoint recovery
Hi, I am using Spark Streaming 1.3 with checkpointing, but the job fails to recover from the checkpoint on restart.

1. For the broadcast variable it says:
WARN TaskSetManager: Lost task 15.0 in stage 7.0 (TID 1269, hostIP): java.lang.ClassCastException: [B cannot be cast to pkg.broadcastvariableclassname
(at the point where I call bcvariable.value() in the map function)
    at mapfunction..
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$3$1.apply(JavaDStreamLike.scala:184)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$3$1.apply(JavaDStreamLike.scala:184)
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
    at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

2. For the accumulator variable it says:
15/07/29 19:23:12 ERROR DAGScheduler: Failed to update accumulators for ResultTask(1, 16)
java.util.NoSuchElementException: key not found: 2
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:58)
    at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:894)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:893)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:893)

It's described in https://issues.apache.org/jira/browse/SPARK-5206. I can afford to reset the accumulator to 0 on stream restart. Is it possible to get this working?

Thanks
Re: broadcast variable and accumulators issue while spark streaming checkpoint recovery
Rather than using the accumulator directly, you can do something like this to lazily create an accumulator and use it (it will be lazily recreated if the driver restarts from a checkpoint):

dstream.transform { rdd =>
  val accum = SingletonObject.getOrCreateAccumulator() // singleton object method that creates an accumulator or returns the one already created
  rdd.map { x =>
    // use accum
    x
  }
}

On Wed, Jul 29, 2015 at 1:15 PM, Shushant Arora shushantaror...@gmail.com wrote:
> Hi I am using spark streaming 1.3 and using checkpointing. But job is failing to recover from checkpoint on restart.
> I can afford to reset the accumulator to 0 on stream restart . Is it possible to have it working ?
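The "singleton object" can be a double-checked lazy holder. The sketch below is plain Scala with an `AtomicLong` standing in for the accumulator so that it runs without Spark; in a real job the factory would call something like `rdd.sparkContext.accumulator(0L)` (Spark 1.x) or `longAccumulator(...)` (Spark 2.x), and the Spark Streaming programming guide shows the same shape for `broadcast(...)`. `LazySingleton` is a hypothetical name. The idea is that the holder lives in a top-level `object`, outside the checkpointed DStream graph, so a driver restarted from a checkpoint starts with an empty holder and creates a fresh instance on first use.

```scala
/** Creates its value on first use and then always returns the same instance. */
final class LazySingleton[A <: AnyRef](create: () => A) {
  @volatile private var instance: A = _

  def get(): A = {
    if (instance == null) {     // fast path without locking
      synchronized {
        if (instance == null) { // re-check while holding the lock
          instance = create()
        }
      }
    }
    instance
  }
}
```

Repeated calls to `get()` reuse the first instance, so updates from every batch accumulate in one object; after a restart the count simply begins again at 0, which matches the "reset the accumulator to 0 on stream restart" requirement.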
Re: broadcast variable and accumulators issue while spark streaming checkpoint recovery
1. How do I do it in Java?
2. Can broadcast objects also be created the same way after checkpointing?
3. Is it safe if I disable checkpointing, write the offsets to HDFS at the end of each batch in my code, and somehow tell my job to use these offsets when creating the Kafka stream the first time? How can I make the JavaStreamingContext use these offsets when creating the Kafka stream the first time only, and after that use the offsets from the previous batch interval?

On Thu, Jul 30, 2015 at 2:49 AM, Tathagata Das t...@databricks.com wrote:
Rather than using the accumulator directly, what you can do is something like this to lazily create an accumulator and use it (it will get lazily recreated if the driver restarts from a checkpoint):

dstream.transform { rdd =>
  // singleton object method that creates an accumulator or returns the already created one
  val accum = SingletonObject.getOrCreateAccumulator()
  rdd.map { x =>
    // use accum
  }
}

On Wed, Jul 29, 2015 at 1:15 PM, Shushant Arora shushantaror...@gmail.com wrote:
Hi, I am using Spark Streaming 1.3 with checkpointing, but the job fails to recover from the checkpoint on restart.
For the broadcast variable it says:
1. WARN TaskSetManager: Lost task 15.0 in stage 7.0 (TID 1269, hostIP): java.lang.ClassCastException: [B cannot be cast to pkg.broadcastvariableclassname
at the point where I call bcvariable.value() in the map function.
at mapfunction..
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$3$1.apply(JavaDStreamLike.scala:184)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$3$1.apply(JavaDStreamLike.scala:184)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2. For the accumulator variable it says:
15/07/29 19:23:12 ERROR DAGScheduler: Failed to update accumulators for ResultTask(1, 16)
java.util.NoSuchElementException: key not found: 2
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:894)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:893)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:893)
It's described in https://issues.apache.org/jira/browse/SPARK-5206
I can afford to reset the accumulator to 0 on stream restart. Is it possible to get it working?
Thanks
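Tathagata's suggestion is to create the accumulator (or broadcast variable) lazily through a singleton. The goal is for a restarted driver to create a fresh one on first use, rather than restore a stale one from the checkpoint.

Question 1 asks how to do this in Java. Below is a minimal, dependency-free sketch of such a holder using double-checked locking. `LazyHolder` and `ExcludeListHolder` are hypothetical names. In a real job, the factory lambda would create the broadcast variable or accumulator from the `JavaSparkContext`; that part is an assumption and is only indicated in a comment.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

/**
 * Creates one shared instance lazily, on first use, with double-checked
 * locking. A restarted driver is a new JVM, so a holder kept in a static
 * field starts out empty and the instance is rebuilt rather than restored.
 */
final class LazyHolder<T> {
    private final Supplier<T> factory;
    private volatile T instance; // volatile: publish the instance safely

    LazyHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    T getOrCreate() {
        T result = instance;
        if (result == null) {              // fast path: no locking once created
            synchronized (this) {
                result = instance;
                if (result == null) {      // re-check while holding the lock
                    result = factory.get();
                    instance = result;
                }
            }
        }
        return result;
    }
}

/** Hypothetical helper class holding the lazily created value. */
final class ExcludeListHolder {
    static int creations = 0; // counts factory calls, for illustration only

    static final LazyHolder<List<String>> INSTANCE = new LazyHolder<>(() -> {
        creations++;
        // In a Spark job this is where the broadcast variable or
        // accumulator would be created from the (Java)SparkContext.
        return Arrays.asList("a", "b", "c");
    });

    private ExcludeListHolder() {}
}
```

Inside `transform` or `foreachRDD`, the job would call `ExcludeListHolder.INSTANCE.getOrCreate()` instead of capturing a variable created before the checkpoint was taken.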
Re: broadcast variable and accumulators issue while spark streaming checkpoint recovery
1. The same way, using static fields in a class.
2. Yes, the same way.
3. Yes, you can do that. To differentiate the first run from a continuation, you have to build your own semantics. For example, if the HDFS location where you are supposed to store the offsets does not have any data, the job is probably running for the first time. Otherwise it is continuing, so read the offsets and continue from there.
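Answer 3 detects the first run by checking whether the offsets location is empty. That rule can be sketched as a small offset store.

This is a sketch under stated assumptions:
- A local directory stands in for HDFS; with HDFS you would use the Hadoop `FileSystem` API instead, which is not shown.
- The offsets are stored as `partition=offset` lines.
- `OffsetStore` is a hypothetical name.

Saving only after a batch's output has been written gives at-least-once behavior on restart. The loaded map would then be passed to whichever Kafka stream constructor accepts explicit starting offsets; that hand-off is not shown.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

/** Hypothetical offset store: a missing file means "first run". */
final class OffsetStore {
    private final Path file;

    OffsetStore(Path dir) {
        this.file = dir.resolve("offsets.txt");
    }

    /** Empty result: nothing stored yet, so this is probably the first run. */
    Optional<Map<Integer, Long>> load() throws IOException {
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        Map<Integer, Long> offsets = new TreeMap<>();
        for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
            if (line.isEmpty()) {
                continue;
            }
            String[] parts = line.split("=", 2); // "partition=offset"
            offsets.put(Integer.parseInt(parts[0]), Long.parseLong(parts[1]));
        }
        return Optional.of(offsets);
    }

    /** Call at the end of each batch, after its output has been written. */
    void save(Map<Integer, Long> offsets) throws IOException {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : new TreeMap<>(offsets).entrySet()) {
            lines.add(e.getKey() + "=" + e.getValue());
        }
        // Write a temp file, then rename it over the old one, so a crash
        // mid-write never leaves a half-written offsets file behind.
        Path tmp = file.resolveSibling("offsets.txt.tmp");
        Files.write(tmp, lines, StandardCharsets.UTF_8);
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING,
                StandardCopyOption.ATOMIC_MOVE);
    }
}
```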
Re: spark streaming - checkpoint
Using yarn-cluster, it works fine.

On Mon, Jun 29, 2015 at 12:07 PM, ram kumar ramkumarro...@gmail.com wrote:
SPARK_CLASSPATH=$CLASSPATH:/usr/hdp/2.2.0.0-2041/hadoop-mapreduce/* in spark-env.sh
I think I am facing the same issue: https://issues.apache.org/jira/browse/SPARK-6203
Re: spark streaming - checkpoint
SPARK_CLASSPATH=$CLASSPATH:/usr/hdp/2.2.0.0-2041/hadoop-mapreduce/* in spark-env.sh
I think I am facing the same issue: https://issues.apache.org/jira/browse/SPARK-6203

On Mon, Jun 29, 2015 at 11:38 AM, ram kumar ramkumarro...@gmail.com wrote:
I am using Spark 1.2.0.2.2.0.0-82 (git revision de12451) built for Hadoop 2.6.0.2.2.0.0-2041
1) SPARK_CLASSPATH not set
2) spark.executor.extraClassPath not set
Should I upgrade my version to 1.3 and check?
Re: spark streaming - checkpoint
Do you have SPARK_CLASSPATH set in both cases, before and after the checkpoint? If yes, then you should not be using SPARK_CLASSPATH; it has been deprecated since Spark 1.0 because of its ambiguity. Also, where do you have spark.executor.extraClassPath set? I don't see it in the spark-submit command.
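The deprecation Tathagata mentions lines up with the stack trace in the original post. There, the exception is thrown from `SparkConf.validateSettings` while `getOrCreate` builds a new `SparkContext` from the checkpoint.

Below is a simplified plain-Java model of that check. It rests on one assumption, taken from our reading of the Spark 1.x source and not stated in this thread: when SPARK_CLASSPATH is set, Spark copies its value into `spark.executor.extraClassPath` and `spark.driver.extraClassPath` as a work-around, and fails if either key is already present. If the conf saved in the checkpoint carries those copied keys, the second run fails even though the user never set `extraClassPath`. `ClasspathCheck` is a hypothetical name and this is not Spark's code.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of the SPARK_CLASSPATH check; not Spark's actual code. */
final class ClasspathCheck {
    static final String[] KEYS = {
        "spark.executor.extraClassPath", "spark.driver.extraClassPath"
    };

    /**
     * If SPARK_CLASSPATH is set, copy it into a fresh conf, or fail when the
     * conf already carries an extraClassPath setting (e.g. one restored
     * from a checkpoint written by an earlier run).
     */
    static void validate(Map<String, String> env, Map<String, String> conf) {
        String legacy = env.get("SPARK_CLASSPATH");
        if (legacy == null) {
            return; // deprecated variable not in use: nothing to check
        }
        for (String key : KEYS) {
            if (conf.containsKey(key)) {
                throw new IllegalStateException(
                        "Found both " + key + " and SPARK_CLASSPATH. Use only the former.");
            }
            conf.put(key, legacy); // work-around applied on the first run
        }
    }

    private ClasspathCheck() {}
}
```

The log message itself suggests the fix: drop SPARK_CLASSPATH from spark-env.sh, and use `--driver-class-path` plus `spark.executor.extraClassPath` instead.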
spark streaming - checkpoint
Hi,

JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1));
ssc.checkpoint(checkPointDir);

JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
  public JavaStreamingContext create() {
    return createContext(checkPointDir, outputDirectory);
  }
};
JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkPointDir, factory);

The first time I run this, it works fine. But the second time, it shows the following error. If I delete the checkpoint path, it works again.

[user@h7 ~]$ spark-submit --jars /home/user/examples-spark-jar.jar --conf spark.driver.allowMultipleContexts=true --class com.spark.Pick --master yarn-client --num-executors 10 --executor-cores 1 SNAPSHOT.jar
Spark assembly has been built with Hive, including Datanucleus jars on classpath
2015-06-26 12:43:42,981 WARN [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2015-06-26 12:43:44,246 WARN [main] shortcircuit.DomainSocketFactory (DomainSocketFactory.java:<init>(116)) - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
This is deprecated in Spark 1.0+.
Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath
Exception in thread "main" org.apache.spark.SparkException: Found both spark.executor.extraClassPath and SPARK_CLASSPATH. Use only the former.
at org.apache.spark.SparkConf$$anonfun$validateSettings$6$$anonfun$apply$7.apply(SparkConf.scala:334)
at org.apache.spark.SparkConf$$anonfun$validateSettings$6$$anonfun$apply$7.apply(SparkConf.scala:332)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.SparkConf$$anonfun$validateSettings$6.apply(SparkConf.scala:332)
at org.apache.spark.SparkConf$$anonfun$validateSettings$6.apply(SparkConf.scala:320)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkConf.validateSettings(SparkConf.scala:320)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:178)
at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:118)
at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:561)
at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:561)
at scala.Option.map(Option.scala:145)
at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:561)
at org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:566)
at org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
at com.orzota.kafka.kafka.TotalPicsWithScore.main(TotalPicsWithScore.java:159)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:360)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:76)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
[user@h7 ~]

Can anyone help me with it? Thanks
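The pattern reported here follows from which branch `getOrCreate` takes:
- The first run works.
- The second run fails.
- Deleting the checkpoint path makes it work again.

Below is a plain-Java model of that decision. It assumes that checkpoint files live in the directory under names starting with `checkpoint-`, which is our understanding of Spark 1.x rather than something stated in the thread. `GetOrCreateModel` and its parameters are hypothetical. `restore` stands in for rebuilding the context from checkpoint data, and `factory` stands in for the `JavaStreamingContextFactory`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

/** Plain-Java model of the getOrCreate decision; names are hypothetical. */
final class GetOrCreateModel {
    /** True if the directory holds at least one "checkpoint-*" file. */
    static boolean hasCheckpoint(Path dir) throws IOException {
        if (!Files.isDirectory(dir)) {
            return false;
        }
        try (Stream<Path> files = Files.list(dir)) {
            return files.anyMatch(p -> p.getFileName().toString().startsWith("checkpoint-"));
        }
    }

    /**
     * Restores from the checkpoint when one exists; only otherwise is the
     * factory called. A restored context never runs the factory's setup.
     */
    static <C> C getOrCreate(Path dir, Function<Path, C> restore, Supplier<C> factory)
            throws IOException {
        return hasCheckpoint(dir) ? restore.apply(dir) : factory.get();
    }

    private GetOrCreateModel() {}
}
```

In this model the restore branch never calls the factory. For that reason, the DStream setup and `ssc.checkpoint(...)` usually belong inside `create()`. A context constructed outside the factory, as in the first two lines of the snippet, is not the one a restart uses.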
Re: Spark Streaming checkpoint recovery causes IO re-execution
Hi Hannes,

Good to know I'm not alone in this boat. Sorry for not posting back; I haven't been on the user list in a while. Getting past this issue is on my agenda, as it will be very important for our recovery implementation. I have done an internal proof of concept, but without any conclusions so far.

The main approach is to have full control over offsets: after each processed batch we persist the last processed event (I'm using Kafka, by the way) and keep its offset somewhere, so that upon recovery we only start streaming from the last processed one. This somewhat conflicts with the new ReliableReceiver implementation, where that control is taken away from the processing layer...

When recovering Spark Streaming, we need to control the recovered batches so that only internal state gets updated and no IO gets executed. For this we need to make internal changes to Spark Streaming. I exposed a function that identifies how many batches are being recovered. Then I passed that info upfront to the workers, and with a counter they know how many batches were recomputed, thus avoiding IO re-execution. This is very much at an embryonic stage, so I can't actually help you much yet...

This is the function I've created inside the JobGenerator class to access the recovered batches:

def getDownTimes(): Seq[Time] = {
  if (ssc.isCheckpointPresent) {
    val batchDuration = ssc.graph.batchDuration
    // Batches when the master was down, that is,
    // between the checkpoint and current restart time
    val checkpointTime = ssc.initialCheckpoint.checkpointTime
    val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
    val downTimes = checkpointTime.until(restartTime, batchDuration)
    logInfo("Batches during down time (" + downTimes.size + " batches): " + downTimes.mkString(", "))
    downTimes
  } else Seq[Time]()
}

It has been a while since I last visited this issue, so I'm probably not able to give you many details right now. I expect to have a concrete solution that I could ultimately push as a proposal to the Spark dev team, and I will definitely notify people on this thread at least.

Tnks,
Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-tp12568p21265.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
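Below is a plain-Java model of what `getDownTimes` computes, plus the worker-side counter Rod describes, simulated in a single process. The model assumes that Spark's `Time.until(end, interval)` is end-exclusive, like Scala's `until ... by`. `DownTimes` and `ReplayAwareWriter` are hypothetical names; nothing here is Spark API.

```java
import java.util.ArrayList;
import java.util.List;

/** Model of getDownTimes: batch times in [checkpointMs, restartMs). */
final class DownTimes {
    static List<Long> downTimes(long checkpointMs, long restartMs, long batchMs) {
        if (batchMs <= 0) {
            throw new IllegalArgumentException("batch interval must be positive");
        }
        List<Long> times = new ArrayList<>();
        for (long t = checkpointMs; t < restartMs; t += batchMs) {
            times.add(t);
        }
        return times;
    }

    private DownTimes() {}
}

/** Skips external IO for the first N batches, treated as replays. */
final class ReplayAwareWriter {
    private int replaysLeft;
    private final List<Long> written = new ArrayList<>();

    ReplayAwareWriter(int recoveredBatches) {
        this.replaysLeft = recoveredBatches;
    }

    void onBatch(long batchTimeMs) {
        if (replaysLeft > 0) {
            replaysLeft--;        // state is rebuilt elsewhere; no IO here
            return;
        }
        written.add(batchTimeMs); // stands in for the real external write
    }

    List<Long> written() {
        return written;
    }
}
```

One trade-off is worth weighing before adopting this design. The down-time range also covers batches that were due while the driver was down and therefore never ran. Skipping their IO avoids duplicates but can leave gaps downstream.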
Re: Spark Streaming checkpoint recovery causes IO re-execution
Hi Yana,

You are correct. What needs to be added is that, besides RDDs being checkpointed, Spark Streaming also checkpoints metadata representing the execution of computations. Upon driver recovery, the last batches are recomputed: both the ones already executed and the ones that should have been executed while the driver was down. This is very good if we just want to recover state and no other component or data store depends on Spark's output. If we do have that requirement (which is my case), all the nodes will re-execute all that IO. This causes overall system inconsistency, as the outside systems were not expecting events from the past.

We need some way of making Spark aware of which computations are recomputations and which are not, so that Spark developers can introduce specific logic if they need to.

Let me know if any of this doesn't make sense.

tnks,
Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-tp12568p13205.html
Re: Spark Streaming checkpoint recovery causes IO re-execution
I understand that the DB writes are happening from the workers unless you collect. My confusion is that you believe workers recompute on recovery ("nodes computations which get redone upon recovery"). My understanding is that checkpointing dumps the RDD to disk and then cuts the RDD lineage. So I thought that on driver restart you'd get a set of new executor processes, but they would read the last known state of the RDD from the HDFS checkpoint. Am I off here?

So the only situation I can imagine where you end up recomputing is if you're checkpointing at a larger interval than your batch size (i.e. the RDD on disk does not reflect its last pre-crash state)?
Re: Spark Streaming checkpoint recovery causes IO re-execution
Not sure if this makes sense, but maybe it would be nice to have some kind of flag available within the code that tells me whether I'm running in a normal situation or during a recovery.

To better explain this, consider the following scenario. I am processing data, say from a Kafka stream, and updating a database based on the computations. During recovery I don't want to update the database again (for many reasons; let's just assume that), but I want my system to be in the same state as before. So I would like to know whether my code is running for the first time or during a recovery, so I can avoid updating the database again. More generally, I want to know this whenever I'm interacting with external entities.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-tp12568p13009.html
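An alternative to a "recovery" flag is to make the database update itself idempotent. The Spark Streaming programming guide describes this approach in its discussion of output-operation semantics. Each write is keyed by the batch time plus the partition index. A write whose key is already committed is skipped, so a replayed batch becomes harmless.

Below is an in-memory sketch of that idea. `IdempotentStore` is a hypothetical stand-in for the database. The sketch assumes that a replayed batch carries the same data as the original run for the same batch time.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * In-memory stand-in for an external store that remembers which
 * (batchTime, partition) pairs it has already committed.
 */
final class IdempotentStore {
    private final Set<String> committed = new HashSet<>();
    private final Map<String, Long> counts = new HashMap<>();

    /** Returns true if applied, false if this batch/partition was a replay. */
    synchronized boolean commit(long batchTimeMs, int partition, Map<String, Long> updates) {
        String id = batchTimeMs + "-" + partition;
        if (!committed.add(id)) {
            return false; // committed before the crash: skip the IO
        }
        // In a real database, the id and the updates would be written
        // together in one transaction.
        updates.forEach((key, value) -> counts.merge(key, value, Long::sum));
        return true;
    }

    synchronized long count(String key) {
        return counts.getOrDefault(key, 0L);
    }
}
```

In `foreachRDD`, the batch time is available alongside the RDD, and each partition knows its own index. That is enough to build the key without knowing whether the driver is recovering.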
Re: Spark Streaming checkpoint recovery causes IO re-execution
Can you clarify the scenario:

val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint(checkpointDirectory)
val stream = KafkaUtils.createStream(...)
val lines = stream.map(_._2)
val wordCounts = lines.flatMap(_.split(" ")).map(x => (x, 1L))
val wordDstream = wordCounts.updateStateByKey[Int](updateFunc)
wordDstream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // sync state to external store
  }
}

My impression is that during recovery from the checkpoint, by the time you get to the foreachRDD part, your wordDstream would be in the state it was in before the crash, plus one batch interval forward, even if re-creating the pre-crash RDD is really slow. So if your driver goes down at 10:20 and you restart at 10:30, I thought that at the time of the DB write wordDstream would have exactly the state of 10:20 plus 10 seconds' worth of aggregated stream data?

I don't really understand what you mean by "Upon metadata checkpoint recovery (before the data checkpoint occurs)", but it sounds like you're observing the same DB write happening twice? I don't have any advice for you, but I am interested in understanding better what happens in the recovery scenario, so I'm just trying to clarify what you observe.
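The snippet above passes `updateFunc` to `updateStateByKey` without defining it. A typical choice is a running count, which adds the current batch's values for a key to that key's previous state.

Below is a plain-Java model of such a function. `RunningCount` is a hypothetical name. It uses `Long` for both values and state, which is an assumption, since the snippet maps words to `1L` but declares `updateStateByKey[Int]`. `java.util.Optional` stands in for the Optional type used by Spark's Java API.

```java
import java.util.List;
import java.util.Optional;

/** Plain-Java model of a running-count update function. */
final class RunningCount {
    /**
     * newValues: the counts seen for one key in the current batch.
     * state: the key's total up to the previous batch (empty for a new key).
     */
    static Optional<Long> update(List<Long> newValues, Optional<Long> state) {
        long sum = state.orElse(0L);
        for (long v : newValues) {
            sum += v;
        }
        return Optional.of(sum);
    }

    private RunningCount() {}
}
```

The new state depends on the previous state, so recovery needs the checkpointed state RDD (or the lineage back to it) before later batches can be applied.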
Re: Spark Streaming checkpoint recovery causes IO re-execution
Hi Yana,

The fact is that the DB writing happens at the node level, not at the Spark level. One of the benefits of Spark's distributed computing nature is that it distributes IO as well. For example, it is much faster to have the nodes write to Cassandra than to collect everything at the driver and send the writes from there.

The problem is the node computations that get redone upon recovery. If these lambda functions send events to other systems, those events get resent upon re-computation, causing overall system instability. Hope this helps you understand the problem.

tnks,
Rod

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-checkpoint-recovery-causes-IO-re-execution-tp12568p13043.html
Re: Spark Streaming - What does Spark Streaming checkpoint?
The StreamingContext can indeed be recreated from a checkpoint file. Check out the following Spark Streaming source files for details: StreamingContext, Checkpoint, DStream, DStreamCheckpoint, and DStreamGraph.
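To our understanding, Spark's `Checkpoint` class is a serializable snapshot holding several things:
- the checkpoint time
- the DStream graph
- the batch times that were generated but not yet completed
- the SparkConf settings

Writing it out and reading it back is what allows the StreamingContext to be rebuilt. Below is a heavily simplified plain-Java model of that round trip. `MetadataSnapshot` is hypothetical, and it omits the DStream graph entirely.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for a metadata checkpoint. */
final class MetadataSnapshot implements Serializable {
    private static final long serialVersionUID = 1L;

    final long checkpointTimeMs;
    final Map<String, String> confPairs;  // copy of the configuration settings
    final List<Long> pendingBatchTimes;   // batches generated but not finished

    MetadataSnapshot(long checkpointTimeMs, Map<String, String> confPairs,
                     List<Long> pendingBatchTimes) {
        this.checkpointTimeMs = checkpointTimeMs;
        this.confPairs = new HashMap<>(confPairs);
        this.pendingBatchTimes = new ArrayList<>(pendingBatchTimes);
    }

    /** Serializes the snapshot, as a checkpoint writer would before saving it. */
    byte[] toBytes() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(this);
        }
        return bytes.toByteArray();
    }

    /** Reads a snapshot back, as a restarting driver would. */
    static MetadataSnapshot fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (MetadataSnapshot) in.readObject();
        }
    }
}
```

Because the configuration travels inside the snapshot, settings from the run that wrote the checkpoint come back on restart. The stateful data itself is checkpointed separately, as RDDs.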
Spark Streaming - What does Spark Streaming checkpoint?
Hi guys,

I am a little confused by checkpointing in Spark Streaming. It certainly checkpoints the intermediate data for stateful operations. Does it also checkpoint the information of the StreamingContext? It seems we can recreate the StreamingContext from the checkpoint in a driver node failure scenario, but when I looked at the checkpoint directory, I did not find many clues. Any help? Thank you very much.

Best,
Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108