Re: [spark streaming] checkpoint location feature for batch processing

2020-05-03 Thread Jungtaek Lim
Replied inline:

On Sun, May 3, 2020 at 6:25 PM Magnus Nilsson  wrote:

> Thank you, so that would mean Spark gets the current latest offset(s) when
> the trigger fires and then processes all available messages in the topic up to
> and including that offset, as long as maxOffsetsPerTrigger is the default of
> None (or large enough to handle all available messages).
>

Yes, it starts from the offset of the latest batch. `maxOffsetsPerTrigger` will
be ignored starting from Spark 3.0.0, which means for Spark 2.x it still
takes effect even when Trigger.Once is used, I guess.


>
> I think the word micro-batch confused me (more like mega-batch in some
> cases). It makes sense though; this makes Trigger.Once a fixed-interval
> trigger that's only fired once and not repeatedly.
>

"micro" is relative - though Spark by default processes all available
inputs per batch, in most cases you'll want to make the batch size
(interval) as small as possible, as it defines the latency of the output.
Trigger.Once is an unusual case in streaming workload - that's more alike
continuous execution of "batch". I refer "continuous" as picking up latest
context which is the characteristic of streaming query, hence hybrid one.
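
To make the semantics concrete, here is a rough sketch of such a hybrid job;
the broker address, topic and paths are placeholders, not from this thread:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object TriggerOnceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("TriggerOnceSketch").getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      // Read limit per micro-batch; honored on Spark 2.x, but ignored by
      // Trigger.Once from Spark 3.0.0 onwards (SPARK-30669).
      .option("maxOffsetsPerTrigger", 100000L)
      .load()

    df.writeStream
      .format("parquet")
      .option("path", "/data/out")                // placeholder
      .option("checkpointLocation", "/data/ckpt") // placeholder
      .trigger(Trigger.Once()) // process what is available, then stop; the
                               // checkpoint carries offsets to the next run
      .start()
      .awaitTermination()
  }
}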


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-03 Thread Magnus Nilsson
Thank you, so that would mean Spark gets the current latest offset(s) when
the trigger fires and then processes all available messages in the topic up to
and including that offset, as long as maxOffsetsPerTrigger is the default of
None (or large enough to handle all available messages).

I think the word micro-batch confused me (more like mega-batch in some
cases). It makes sense though; this makes Trigger.Once a fixed-interval
trigger that's only fired once and not repeatedly.


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-02 Thread Jungtaek Lim
If I understand correctly, Trigger.Once executes only one micro-batch and
terminates; that's all. Your understanding of Structured Streaming applies
there as well.

It's like a hybrid approach: it brings incremental processing from
micro-batch, but has a processing interval like batch. That said, while it
gets the benefits of both sides, it's basically Structured Streaming,
inheriting all the limitations of Structured Streaming compared to a
batch query.

Spark 3.0.0 will bring some change to Trigger.Once (SPARK-30669 [1]) -
Trigger.Once will "ignore" the read limit per micro-batch on the data source
(like maxOffsetsPerTrigger) and process as much available input as possible.
(Data sources should migrate to the new API to take effect, but this works for
built-in data sources like file and Kafka.)

1. https://issues.apache.org/jira/browse/SPARK-30669


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-02 Thread Magnus Nilsson
I've always had a question about Trigger.Once that I never got around to
asking or testing for myself. Say you have a 24/7 stream to a Kafka topic:

Will Trigger.Once get the last offset(s) when it starts and then quit once
it hits those offset(s), or will the job run until no new messages are added
to the topic for a particular amount of time?

br,

Magnus


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-01 Thread Rishi Shah
Thanks Burak! Appreciate it. This makes sense.

How do you suggest we make sure the resulting data doesn't produce tiny files
if we are not on Databricks yet and cannot leverage Delta Lake features?
Also, for the checkpointing feature, do you have an active blog/article I can
take a look at to try out an example?


-- 
Regards,

Rishi Shah


Re: [spark streaming] checkpoint location feature for batch processing

2020-05-01 Thread Burak Yavuz
Hi Rishi,

That is exactly why Trigger.Once was created for Structured Streaming. The
way we look at streaming is that it doesn't have to be real-time or 24/7
always-on. We see streaming as a workflow that you have to repeat
indefinitely. See this blog post for more details!
https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html

Best,
Burak


[spark streaming] checkpoint location feature for batch processing

2020-05-01 Thread Rishi Shah
Hi All,

I recently started playing with Spark Streaming, and the checkpoint location
feature looks very promising. I wonder if anyone has an opinion about using
Spark Streaming with the checkpoint location option as a slow batch processing
solution. What would be the pros and cons of utilizing streaming with the
checkpoint location feature to achieve fault tolerance in a batch processing
application?

-- 
Regards,

Rishi Shah


Spark Streaming: Checkpoint, Recovery and Idempotency

2019-05-29 Thread sheelstera
Hello,

I am trying to understand the content of a checkpoint and corresponding
recovery.

My understanding of Spark Checkpointing:
If you have really long DAGs and your Spark cluster fails, checkpointing
helps by persisting intermediate state, e.g. to HDFS. So, a DAG of 50
transformations can be reduced to 4-5 transformations with the help of
checkpointing. It breaks the DAG though.
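
A minimal sketch of what I mean (the directory and the chain of maps are made
up for illustration):

import org.apache.spark.{SparkConf, SparkContext}

object RddCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("RddCheckpointSketch")
      .setIfMissing("spark.master", "local[*]")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("hdfs:///tmp/ckpt") // placeholder directory

    // A long lineage: 50 chained transformations.
    var rdd = sc.parallelize(1 to 1000)
    for (_ <- 1 to 50) rdd = rdd.map(_ + 1)

    // Truncates the lineage here; the RDD is materialized to the
    // checkpoint directory on the next action.
    rdd.checkpoint()
    println(rdd.count())
  }
}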

Checkpointing in Streaming:
My Spark Streaming job has a microbatch of 5 seconds. As I understand it, a new
job is submitted every 5 secs on the EventLoop, which invokes the JobGenerator
to generate the RDD DAG for the new microbatch from the DStreamGraph, while
the receiver in the meantime keeps collecting the data for the next
microbatch for the next cycle. If I enable checkpointing, as I understand it,
it will periodically keep checkpointing the "current state".

Question:
What is this "state"? Is this the combination of the base RDD and the state
of the operators/transformations of the DAG for the present microbatch only?
So I have the following:

ubatch 0 at T=0  ---> SUCCESS
ubatch 1 at T=5  ---> SUCCESS
ubatch 2 at T=10 ---> SUCCESS
---> Checkpointing kicks in now at T=12
ubatch 3 at T=15 ---> SUCCESS
ubatch 4 at T=20
---> Spark Cluster DOWN at T=23 => ubatch 4 FAILS!!!
...
---> Spark Cluster is restarted at T=100

What specifically goes and sits on disk as a result of checkpointing at
T=12? Will it just store the present state of the operators of the DAG for
ubatch 2?

a. If yes, then during recovery at T=100, the last checkpoint available is
the one at T=12. What happens to ubatch 3 at T=15, which was already processed
successfully? Does the application reprocess ubatch 3 and handle idempotency
here? If yes, do we go to the streaming source, e.g. Kafka, and rewind the
offset to be able to replay the contents starting from ubatch 3?

b. If no, then what exactly goes into the checkpoint directory at T=12?

https://stackoverflow.com/questions/56362347/spark-checkpointing-content-recovery-and-idempotency
  

Regards






Spark Streaming checkpoint

2018-01-29 Thread KhajaAsmath Mohammed
Hi,

I have written a Spark Streaming job that uses checkpointing. I stopped
the streaming job for 5 days and then restarted it today.

I have encountered a weird issue where it shows zero records for all
cycles to date. Is it causing data loss?

[image: Inline image 1]


Thanks,
Asmath


Spark 2.1.2 Spark Streaming checkpoint interval not respected

2017-11-18 Thread Shing Hing Man
Hi,
In the following example using mapWithState, I set the checkpoint interval to 1
minute. From the log, Spark still writes to the checkpoint directory every
second. I would appreciate it if someone could point out what I have done wrong.
import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.streaming.dstream.MapWithStateDStream

object MapWithStateDemo {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: MapWithStateDemo <hostname> <port>")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("MapWithStateDemo")
      .setIfMissing("spark.master", "local[*]")

    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Initial state RDD for the mapWithState operation
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

    // Create a ReceiverInputDStream on target ip:port and count the
    // words in the input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using mapWithState
    // This will give a DStream made of state (which is the cumulative count of the words)
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }

    val stateDstream: MapWithStateDStream[String, Int, Int, (String, Int)] =
      wordDstream.mapWithState(
        StateSpec.function(mappingFunc).timeout(Seconds(10)).initialState(initialRDD))

    stateDstream.checkpoint(Minutes(1L))
    stateDstream.print()

    val targetDir = new File(getClass.getResource("/").toURI).getParentFile.getParentFile
    val checkpointDir = targetDir + "/checkpoint"
    ssc.checkpoint(checkpointDir)
    ssc.start()
    ssc.awaitTermination()
  }
}
Thanks in advance for any assistance!
Shing


Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread Tathagata Das
In either case, an end-to-end exactly-once guarantee can be ensured only
if the output sink is updated transactionally. The engine has to re-execute
data on failure. An exactly-once guarantee means that the external storage is
updated as if each data record was computed exactly once. That's why you
need to update it transactionally, to handle possible recomputations.

This is true for both Spark Streaming and Structured Streaming. Hope this
helps.
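
As an illustration only (not from the docs), a sketch of the idea for a
direct Kafka DStream writing to a JDBC store; the connection URL, the schema
and the "already committed" check are placeholders you would adapt:

import java.sql.DriverManager
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.HasOffsetRanges

def saveTransactionally(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
  stream.foreachRDD { rdd =>
    // Offsets of exactly the records in this micro-batch.
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    val counts = rdd.map(r => (r.value, 1L)).reduceByKey(_ + _).collect()

    val conn = DriverManager.getConnection("jdbc:postgresql://db/app") // placeholder
    try {
      conn.setAutoCommit(false)
      // A real version would first compare offsetRanges against the stored
      // offsets and skip the batch if it was already committed (elided here).
      val res = conn.prepareStatement("INSERT INTO word_counts(word, cnt) VALUES (?, ?)")
      for ((word, cnt) <- counts) {
        res.setString(1, word); res.setLong(2, cnt); res.addBatch()
      }
      res.executeBatch()
      val off = conn.prepareStatement(
        "UPDATE offsets SET until = ? WHERE topic = ? AND part = ?")
      for (o <- offsetRanges) {
        off.setLong(1, o.untilOffset); off.setString(2, o.topic)
        off.setInt(3, o.partition); off.addBatch()
      }
      off.executeBatch()
      conn.commit() // results and offsets become visible atomically
    } finally conn.close()
  }
}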


Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread ALunar Beach
Thanks TD.
In pre-structured streaming, the exactly-once guarantee on input is not
guaranteed, is it?


Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread Tathagata Das
This is the expected behavior. There are some confusing corner cases.
If you are starting to play with Spark Streaming, I highly recommend
learning Structured Streaming
<http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html>
instead.


Fwd: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-05 Thread anbucheeralan
I am using Spark Streaming Checkpoint and Kafka Direct Stream.
It uses a 30 sec batch duration and normally the job is successful in 15-20
sec.

If the Spark application fails after a successful completion
(149668428 ms in the log below) and restarts, it duplicates the last
batch again.

Is this the expected behavior? I was expecting this to start a new batch
window.


Here are some logs:

Last successful run:
17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time
149668428 ms (execution: 0.029 s)
17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list
17/06/05 13:38:00 INFO BlockManager: Removing RDD 0
17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time
149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time
149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time
149668428 ms
17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time
149668428 ms to writer queue
17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time
149668428 ms to file 'file:/Users/anbucheeralan/
IdeaProjects/Spark2Example/ckpt/checkpoint-149668428'
17/06/05 13:38:00 INFO CheckpointWriter: Checkpoint for time 149668428
ms saved to file
'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428',
took 4032 bytes and 9 ms
17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time
149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time
149668428 ms

After the restart,

17/06/05 13:42:31 INFO
DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
KafkaRDD for time 149668428 ms [(my_test,0,2000,2000)]
17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data
17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10
batches): 149668428 ms, 149668431 ms, 149668434 ms,
149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms,
149668449 ms, 149668452 ms, 149668455 ms
17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0 batches):
17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10
batches): 149668428 ms, 149668431 ms, 149668434 ms, 149668437 ms,
149668440 ms, 149668443 ms, 149668446 ms, 149668449 ms,
149668452 ms, 149668455 ms
17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms
17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job
149668428 ms.0 from job set of time 149668428 ms






[Spark Streaming] Checkpoint backup (.bk) file purpose

2017-03-16 Thread Bartosz Konieczny
Hello,

Actually I'm studying the metadata checkpoint implementation in Spark Streaming
and I was wondering about the purpose of the so-called "backup files":

CheckpointWriter snippet:

> // We will do checkpoint when generating a batch and completing a batch.
> When the processing
> // time of a batch is greater than the batch interval, checkpointing for
> completing an old
> // batch may run after checkpointing of a new batch. If this happens,
> checkpoint of an old
> // batch actually has the latest information, so we want to recovery from
> it. Therefore, we
> // also use the latest checkpoint time as the file name, so that we can
> recover from the
> // latest checkpoint file.
> //
> // Note: there is only one thread writing the checkpoint files, so we
> don't need to worry
> // about thread-safety.
> val checkpointFile = Checkpoint.checkpointFile(checkpointDir,
> latestCheckpointTime)
> val backupFile = Checkpoint.checkpointBackupFile(checkpointDir,
> latestCheckpointTime)
>
> // ... some lines further
> // If the checkpoint file exists, back it up
> // If the backup exists as well, just delete it, otherwise rename will fail
> if (fs.exists(checkpointFile)) {
>   fs.delete(backupFile, true) // just in case it exists
>   if (!fs.rename(checkpointFile, backupFile)) {
> logWarning(s"Could not rename $checkpointFile to $backupFile")
>   }
> }
>

What is the role of this backupFile? I understand that it is generated if a
checkpoint file for the given timestamp already exists. But how could that
happen? Is it a protection against different Spark applications
checkpointing to the same directory? Or is it adapted to the case described
above (an old batch terminated after a new batch starts)?

Best regards,
Bartosz.


Can Spark Streaming checkpoint only metadata ?

2016-06-21 Thread Natu Lauchande
Hi,

I wonder if it is possible to checkpoint only the metadata and not the data in
RDDs and DataFrames.

Thanks,
Natu


Question about Spark Streaming checkpoint interval

2015-12-18 Thread Lan Jiang
Need some clarification about the documentation. According to the Spark doc:

"the default interval is a multiple of the batch interval that is at least 10
seconds. It can be set by using dstream.checkpoint(checkpointInterval).
Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a
good setting to try."

My question is: does the checkpointInterval apply only to data
checkpointing, or does it also apply to metadata checkpointing? The API says
dstream.checkpoint() is for "Enable periodic checkpointing of RDDs of this
DStream", implying it is only for data checkpointing. My understanding is that
metadata checkpointing is for driver failure. For example, with the Kafka
direct API, the driver keeps track of the offset range of each partition. So
if a metadata checkpoint is NOT done for each batch, on driver failure some
messages in Kafka are going to be replayed.

I cannot find an answer in the documentation on whether metadata checkpointing
is done for each batch and whether the checkpointInterval setting applies to
both types of checkpointing. Maybe I missed it. If anyone can point me to the
right documentation, I would highly appreciate it.

Best Regards,

Lan

Re: Question about Spark Streaming checkpoint interval

2015-12-18 Thread Shixiong Zhu
You are right. "checkpointInterval" is only for data checkpointing.
"metadata checkpoint" is done for each batch. Feel free to send a PR to add
the missing doc.
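
For example (a sketch; the socket source and the intervals are arbitrary):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("demo"), Seconds(5))
ssc.checkpoint("hdfs:///tmp/ckpt") // metadata checkpointing: written every batch

val counts = ssc.socketTextStream("localhost", 9999)
  .map((_, 1))
  .updateStateByKey((vs: Seq[Int], s: Option[Int]) => Some(vs.sum + s.getOrElse(0)))

// Data (RDD) checkpointing: this is what checkpointInterval controls.
counts.checkpoint(Seconds(50))
counts.print()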

Best Regards,
Shixiong Zhu


Re: Spark Streaming Checkpoint help failed application

2015-11-11 Thread Gideon
Hi,

I'm no expert but

Short answer: yes, after restarting, your application will reread the failed
messages.

Longer answer: it seems like you're mixing several things together.
Let me try and explain:
- The WAL is used to prevent your application from losing data by making the
executor first write the data it receives from Kafka into the WAL, and only
then updating the Kafka high-level consumer (which is what the receivers
approach uses) that it actually received the data (making it at-least-once)
- Checkpoints are a mechanism that helps your driver recover from failures
by saving driver information into HDFS (or S3 or whatever)

Now, the reason I explained these is this: you asked "... one bug caused the
streaming application to fail and exit" - so the failure you're trying to
solve is in the driver. When you restart your application, your driver will
go and fetch the information it last saved in the checkpoint (saved into
HDFS) and order new executors (since the previous driver died, so did the
executors) to continue consuming data. Since your executors are using the
receivers approach (as opposed to the directkafkastream) with the WAL, what
will happen is that when they (the executors) get started, they will first
execute what was saved in the WAL and then read from the latest offsets saved
in Kafka (Zookeeper), which in your case means you won't lose data (the
executors first save the data to the WAL, then advance their offsets on
Kafka).

If you decide to go for the direct approach
<https://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers>
then your driver will be the one (and only one) managing the offsets for
Kafka, which means that some of the data the driver saves in the checkpoint
will be the Kafka offsets.
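
A small sketch of the two mechanisms together for the receivers approach
(the paths are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("walDemo")
  // Executors write received blocks to the WAL before acknowledging them.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))
// Driver recovery information (and the WAL) live under this directory.
ssc.checkpoint("hdfs:///app/checkpoint")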

I hope this helps :)





Re: Spark streaming checkpoint against s3

2015-10-15 Thread Tian Zhang
So as long as the jar is kept on S3 and available across different runs, the
S3 checkpoint works.







Spark streaming checkpoint against s3

2015-10-14 Thread Tian Zhang
Hi, I am trying to set spark streaming checkpoint to s3, here is what I did
basically

val checkpoint = "s3://myBucket/checkpoint"
val ssc = StreamingContext.getOrCreate(checkpointDir,
   () =>
getStreamingContext(sparkJobName,

   
batchDurationSec),

  
classOf[MyClassKryoRegistrator],

  
checkpointDir),

  
getHadoopConfiguration) 
  
  def getHadoopConfiguration: Configuration = {
val hadoopConf = new Configuration()
hadoopConf.set("fs.defaultFS", "s3://"+myBucket+"/")
hadoopConf.set("fs.s3.awsAccessKeyId", "myAccessKey")
hadoopConf.set("fs.s3.awsSecretAccessKey", "mySecretKey")
hadoopConf.set("fs.s3n.awsAccessKeyId", "myAccessKey")
hadoopConf.set("fs.s3n.awsSecretAccessKey", "mySecretKey
hadoopConf
   }

It is working, as I can see that it tries to retrieve the checkpoint from S3.

However, it did more than what I intended. I saw the following in the log:
15/10/14 19:58:47 ERROR spark.SparkContext: Jar not found at
file:/media/ephemeral0/oncue/mesos-slave/slaves/20151007-172900-436893194-5050-2984-S9/frameworks/20150825-180042-604730890-5050-4268-0003/executors/tian-act-reg.47368a1a-71f9-11e5-ad61-de5fb3a867da/runs/dfc28a6c-48a0-464b-bdb1-d6dd057acd51/artifacts/rna-spark-streaming.jar

Now SparkContext is trying to look up the following path instead of the local one:

file:/media/ephemeral0/oncue/mesos-slave/slaves/20151007-172900-436893194-5050-2984-S9/frameworks/20150825-180042-604730890-5050-4268-0003/executors/tian-act-reg.47368a1a-71f9-11e5-ad61-de5fb3a867da/runs/dfc28a6c-48a0-464b-bdb1-d6dd057acd51/artifacts/rna-spark-streaming.jar

How do I get SparkContext to look at just
/media/ephemeral0/oncue/mesos-slave/slaves/20151007-172900-436893194-5050-2984-S9/frameworks/20150825-180042-604730890-5050-4268-0003/executors/tian-act-reg.47368a1a-71f9-11e5-ad61-de5fb3a867da/runs/dfc28a6c-48a0-464b-bdb1-d6dd057acd51/artifacts/rna-spark-streaming.jar?









Re: Spark streaming checkpoint against s3

2015-10-14 Thread Tian Zhang
It looks like the reconstruction of the SparkContext from checkpoint data is
trying to look for the jar file of previous failed runs. It cannot find the
jar files, as our jar files are on local machines and were cleaned up after
each failed run.











Re: Spark Streaming checkpoint recovery throws Stack Overflow Error

2015-09-18 Thread Saisai Shao
Hi Swetha,

The problem with the stack overflow is that when recovering from checkpoint
data, Java uses a recursive way to deserialize the object graph; if you have
a large call stack, this recursion can easily lead to a stack overflow.
This is caused by the Java deserialization mechanism; you need to increase
the stack size.
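
For example, at submit time (the -Xss value of 8m is just an illustration;
tune it to your call stack depth; checkpoint recovery runs in the driver, so
the driver option is the important one):

spark-submit \
  --driver-java-options "-Xss8m" \
  --conf spark.executor.extraJavaOptions=-Xss8m \
  ... (rest of your submit command)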


Thanks
Saisai


Spark Streaming checkpoint recovery throws Stack Overflow Error

2015-09-18 Thread swetha
Hi,

When I try to recover my Spark Streaming job from a checkpoint directory, I
get a StackOverflowError as shown below. Any idea as to why this is
happening?

15/09/18 09:02:20 ERROR streaming.StreamingContext: Error starting the
context, marking it as stopped
java.lang.StackOverflowError
at java.util.Date.getTimeImpl(Date.java:887)
at java.util.Date.getTime(Date.java:883)
at java.util.Calendar.setTime(Calendar.java:1106)
at java.text.SimpleDateFormat.format(SimpleDateFormat.java:955)
at java.text.SimpleDateFormat.format(SimpleDateFormat.java:948)
at java.text.DateFormat.format(DateFormat.java:298)
at java.text.Format.format(Format.java:157)
at
org.apache.spark.streaming.ui.UIUtils$.formatBatchTime(UIUtils.scala:113)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$makeScope$1.apply(DStream.scala:137)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$makeScope$1.apply(DStream.scala:136)
at scala.Option.map(Option.scala:145)
at 
org.apache.spark.streaming.dstream.DStream.makeScope(DStream.scala:136)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:394)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at
org.apache.spark.streaming.dstream.StateDStream.compute(StateDStream.scala:67)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at
org.apache.spark.streaming.dstream.StateDStream.compute(StateDStream.scala:67)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)







Re: Spark Streaming checkpoint recovery throws Stack Overflow Error

2015-09-18 Thread Ted Yu
Which version of Java are you using ?

And release of Spark, please.

Thanks

On Fri, Sep 18, 2015 at 9:15 AM, swetha <swethakasire...@gmail.com> wrote:

> Hi,
>
> When I try to recover my Spark Streaming job from a checkpoint directory, I
> get a StackOverFlow Error as shown below. Any idea as to why this is
> happening?
>
> 15/09/18 09:02:20 ERROR streaming.StreamingContext: Error starting the
> context, marking it as stopped
> java.lang.StackOverflowError
> at java.util.Date.getTimeImpl(Date.java:887)
> at java.util.Date.getTime(Date.java:883)
> at java.util.Calendar.setTime(Calendar.java:1106)
> at java.text.SimpleDateFormat.format(SimpleDateFormat.java:955)
> at java.text.SimpleDateFormat.format(SimpleDateFormat.java:948)
> at java.text.DateFormat.format(DateFormat.java:298)
> at java.text.Format.format(Format.java:157)
> at
> org.apache.spark.streaming.ui.UIUtils$.formatBatchTime(UIUtils.scala:113)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$makeScope$1.apply(DStream.scala:137)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$makeScope$1.apply(DStream.scala:136)
> at scala.Option.map(Option.scala:145)
> at
> org.apache.spark.streaming.dstream.DStream.makeScope(DStream.scala:136)
> at
>
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:394)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at
>
> org.apache.spark.streaming.dstream.StateDStream.compute(StateDStream.scala:67)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
>
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at
>
> org.apache.spark.streaming.dstream.StateDStream.compute(StateDStream.scala:67)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
> at
>
> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
>
>
>
>


Retrieving offsets from previous spark streaming checkpoint

2015-08-13 Thread Stephen Durfey
When deploying a spark streaming application I want to be able to retrieve
the latest kafka offsets that were processed by the pipeline, and create
my kafka direct streams from those offsets. Because the checkpoint
directory isn't guaranteed to be compatible between job deployments, I
don't want to re-use the checkpoint directory from the previous job
deployment. I also don't want to have to re-process everything in my kafka
queues. Is there any way to retrieve this information from the checkpoint
directory, or has anyone else solved this problem already?

* I apologize if this is a duplicate message. I didn't see it go through
earlier today, and I didn't see it in the archive.


Re: Retrieving offsets from previous spark streaming checkpoint

2015-08-13 Thread Cody Koeninger
Access the offsets using HasOffsetRanges, save them in your datastore,
provide them as the fromOffsets argument when starting the stream.

See https://github.com/koeninger/kafka-exactly-once
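
For reference, a minimal sketch of that pattern against the Kafka direct
stream API of that era (ssc and kafkaParams are assumed to exist already;
loadOffsets and saveOffset are placeholders for your own datastore code, not
Spark API):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

// Rebuild fromOffsets from your datastore on (re)deployment
val fromOffsets: Map[TopicAndPartition, Long] = loadOffsets()
val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

// After each batch, record how far this micro-batch read
stream.foreachRDD { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach(o => saveOffset(o.topic, o.partition, o.untilOffset))
}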




broadcast variable and accumulators issue while spark streaming checkpoint recovery

2015-07-29 Thread Shushant Arora
Hi

I am using Spark Streaming 1.3 and using checkpointing.
But the job is failing to recover from the checkpoint on restart.

1. For the broadcast variable it says:
WARN TaskSetManager: Lost task 15.0 in stage 7.0 (TID 1269, hostIP):
java.lang.ClassCastException: [B cannot be cast to
pkg.broadcastvariableclassname
at the point where I call bcvariable.value() in the map function.

 at  mapfunction..
at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$3$1.apply(JavaDStreamLike.scala:184)
at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$3$1.apply(JavaDStreamLike.scala:184)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:634)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

2. For the accumulator variable it says:
15/07/29 19:23:12 ERROR DAGScheduler: Failed to update accumulators for
ResultTask(1, 16)
java.util.NoSuchElementException: key not found: 2
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:894)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:893)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at
org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:893)

It's described in
https://issues.apache.org/jira/browse/SPARK-5206

I can afford to reset the accumulator to 0 on stream restart. Is it
possible to have it working?

Thanks


Re: broadcast variable and accumulators issue while spark streaming checkpoint recovery

2015-07-29 Thread Tathagata Das
Rather than using the accumulator directly, what you can do is something like
this to lazily create an accumulator and use it (it will get lazily recreated
if the driver restarts from the checkpoint):

dstream.transform { rdd =>
  // singleton object method to create an accumulator or get an already created one
  val accum = SingletonObject.getOrCreateAccumulator()
  rdd.map { x => /* use accum */ x }
}
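
For completeness, a sketch of what such a singleton object might look like
(SingletonObject is a placeholder name from the snippet above, not an
existing Spark class; this variant takes the SparkContext explicitly, so
inside transform you would call
SingletonObject.getOrCreateAccumulator(rdd.sparkContext)):

import org.apache.spark.{Accumulator, SparkContext}

object SingletonObject {
  @volatile private var accum: Accumulator[Long] = null

  // Create the accumulator on first use; after a driver restart from a
  // checkpoint the field is null again, so it gets lazily recreated.
  def getOrCreateAccumulator(sc: SparkContext): Accumulator[Long] = {
    if (accum == null) {
      synchronized {
        if (accum == null) {
          accum = sc.accumulator(0L, "recovered-counter")
        }
      }
    }
    accum
  }
}

The same lazy-singleton pattern applies to broadcast variables.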





Re: broadcast variable and accumulators issue while spark streaming checkpoint recovery

2015-07-29 Thread Shushant Arora
1. How to do it in Java?
2. Can broadcast objects also be created in the same way after checkpointing?
3. Is it safe if I disable checkpointing and write offsets at the end of each
batch to HDFS in my code, and somehow specify in my job to use these offsets
for creating the Kafka stream the first time? How can I specify the
JavaStreamingContext to use these offsets when creating the Kafka stream the
first time only, and after that use the previous batch interval's offsets?




Re: broadcast variable and accumulators issue while spark streaming checkpoint recovery

2015-07-29 Thread Tathagata Das
1. Same way, using static fields in a class.
2. Yes, same way.
3. Yes, you can do that. To differentiate first time vs. continuation, you
have to build your own semantics. For example, if the location in HDFS where
you are supposed to store the offsets does not have any data, that means it
is probably running the first time. Otherwise it is continuing, and so read
the offsets and continue from there.
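
A sketch of the first-run check described in point 3 (the offsets directory
location is an assumption):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

def isFirstRun(offsetsDir: String): Boolean = {
  val fs = FileSystem.get(new Configuration())
  val path = new Path(offsetsDir)
  // No stored offsets yet means this is probably the first deployment;
  // otherwise read the offsets back and resume from them.
  !fs.exists(path) || fs.listStatus(path).isEmpty
}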





Re: spark streaming - checkpoint

2015-06-29 Thread ram kumar
Using yarn-cluster, it works fine.




Re: spark streaming - checkpoint

2015-06-29 Thread ram kumar
SPARK_CLASSPATH=$CLASSPATH:/usr/hdp/2.2.0.0-2041/hadoop-mapreduce/*
in spark-env.sh

I think i am facing the same issue
https://issues.apache.org/jira/browse/SPARK-6203


On Mon, Jun 29, 2015 at 11:38 AM, ram kumar ramkumarro...@gmail.com wrote:

 I am using Spark 1.2.0.2.2.0.0-82 (git revision de12451) built for Hadoop
 2.6.0.2.2.0.0-2041

 1) SPARK_CLASSPATH not set
 2) spark.executor.extraClassPath not set

 Should I upgrade my version to 1.3 and check?



Re: spark streaming - checkpoint

2015-06-27 Thread Tathagata Das
Do you have SPARK_CLASSPATH set in both cases? Before and after checkpoint?
If yes, then you should not be using SPARK_CLASSPATH, it has been
deprecated since Spark 1.0 because of its ambiguity.
Also where do you have spark.executor.extraClassPath set? I don't see it in
the spark-submit command.
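
For example, the SPARK_CLASSPATH entry from spark-env.sh quoted earlier in
this thread could be expressed with the supported options instead (a sketch;
adjust the paths and flags to your deployment):

spark-submit \
  --driver-class-path "/usr/hdp/2.2.0.0-2041/hadoop-mapreduce/*" \
  --conf spark.executor.extraClassPath="/usr/hdp/2.2.0.0-2041/hadoop-mapreduce/*" \
  --class com.spark.Pick --master yarn-client SNAPSHOT.jar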



spark streaming - checkpoint

2015-06-26 Thread ram kumar
Hi,

-

JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1));
ssc.checkpoint(checkPointDir);

JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
    public JavaStreamingContext create() {
        return createContext(checkPointDir, outputDirectory);
    }
};
ssc = JavaStreamingContext.getOrCreate(checkPointDir, factory);



*The first time I run this, it works fine.*

*But the second time, it shows the following error.*
*If I delete the checkpoint path, it works again.*

---
[user@h7 ~]$ spark-submit --jars /home/user/examples-spark-jar.jar  --conf
spark.driver.allowMultipleContexts=true --class com.spark.Pick --master
yarn-client --num-executors 10 --executor-cores 1 SNAPSHOT.jar
Spark assembly has been built with Hive, including Datanucleus jars on
classpath
2015-06-26 12:43:42,981 WARN  [main] util.NativeCodeLoader
(NativeCodeLoader.java:clinit(62)) - Unable to load native-hadoop library
for your platform... using builtin-java classes where applicable
2015-06-26 12:43:44,246 WARN  [main] shortcircuit.DomainSocketFactory
(DomainSocketFactory.java:init(116)) - The short-circuit local reads
feature cannot be used because libhadoop cannot be loaded.

This is deprecated in Spark 1.0+.

Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath

Exception in thread main org.apache.spark.SparkException: Found both
spark.executor.extraClassPath and SPARK_CLASSPATH. Use only the former.
at
org.apache.spark.SparkConf$$anonfun$validateSettings$6$$anonfun$apply$7.apply(SparkConf.scala:334)
at
org.apache.spark.SparkConf$$anonfun$validateSettings$6$$anonfun$apply$7.apply(SparkConf.scala:332)
at scala.collection.immutable.List.foreach(List.scala:318)
at
org.apache.spark.SparkConf$$anonfun$validateSettings$6.apply(SparkConf.scala:332)
at
org.apache.spark.SparkConf$$anonfun$validateSettings$6.apply(SparkConf.scala:320)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkConf.validateSettings(SparkConf.scala:320)
at org.apache.spark.SparkContext.init(SparkContext.scala:178)
at
org.apache.spark.streaming.StreamingContext.init(StreamingContext.scala:118)
at
org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:561)
at
org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:561)
at scala.Option.map(Option.scala:145)
at
org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:561)
at
org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:566)
at
org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
at
com.orzota.kafka.kafka.TotalPicsWithScore.main(TotalPicsWithScore.java:159)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:360)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:76)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
[user@h7 ~]

--

*Can anyone help me with this?*

*Thanks*


Re: Spark Streaming checkpoint recovery causes IO re-execution

2015-01-20 Thread RodrigoB
Hi Hannes,

Good to know I'm not alone in the boat. Sorry about not posting back; I
haven't been on the user list in a while.

It's on my agenda to get over this issue. It will be very important for our
recovery implementation. I have done an internal proof of concept but
without any conclusions so far. 

The main approach is to have full control over offsets, meaning upon each
processed batch we will need to persist the last processed event (I'm using
Kafka btw) and keep the offset somewhere, so that upon recovery we only
start streaming from the last processed one. This somewhat conflicts with
the new ReliableReceiver implementation, where that control is taken away
from the processing layer...
When recovering Spark Streaming, we need to control the recovered batches so
that only internal state gets updated and no IO gets executed. For this we
need to make internal changes to Spark Streaming.

I exposed a function that identifies how many batches are being recovered.
Then I passed that info upfront to the workers, and with a counter they are
aware of how many batches were recomputed, thus avoiding IO re-execution.
This is very much at an embryonic stage, so I can't actually help you much
right now...
This is the function I've created inside the JobGenerator class to access the
recovered batches:

def getDownTimes(): Seq[Time] = {
  if (ssc.isCheckpointPresent) {
    val batchDuration = ssc.graph.batchDuration

    // Batches when the master was down, that is,
    // between the checkpoint and the current restart time
    val checkpointTime = ssc.initialCheckpoint.checkpointTime
    val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
    val downTimes = checkpointTime.until(restartTime, batchDuration)
    logInfo("Batches during down time (" + downTimes.size + " batches): "
      + downTimes.mkString(", "))

    downTimes
  } else {
    Seq[Time]()
  }
}
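
As one hedged sketch of how those down times could then be used on the
processing side (writeToExternalStore is a placeholder, and reaching the
modified JobGenerator from application code is part of this proof of
concept, not stock Spark):

// Collect the recovered batch times once on the driver; each batch then
// checks whether it is a recomputation before doing external IO.
val downTimes: Set[Long] = jobGenerator.getDownTimes().map(_.milliseconds).toSet

dstream.foreachRDD { (rdd, time) =>
  val isReplay = downTimes.contains(time.milliseconds)
  rdd.foreachPartition { iter =>
    // Skip external IO for replayed batches; state-only updates can still run.
    if (!isReplay) iter.foreach(writeToExternalStore)
  }
}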

It has been a while since I last visited this issue, so I'm probably not able
to give you too many details right now, but I expect to reach a concrete
solution which I could ultimately push as a proposal to the Spark dev team.

I will definitely notify people on this thread at least.

Tnks,
Rod







Re: Spark Streaming checkpoint recovery causes IO re-execution

2014-08-31 Thread RodrigoB
Hi Yana,

You are correct. What needs to be added is that besides the RDDs being
checkpointed, metadata representing the execution of computations is also
checkpointed in Spark Streaming.

Upon driver recovery, the last batches (the ones already executed and the
ones that should have been executed while shut down) are recomputed. This is
very good if we just want to recover state and if we don't have any other
component or data store depending on Spark's output. 
In the case we do have that requirement (which is my case), all the nodes
will re-execute all that IO, provoking overall system inconsistency, as the
outside systems were not expecting events from the past.

We need some way of making Spark aware of which computations are
recomputations and which are not, so we can empower Spark developers to
introduce specific logic if they need to.

Let me know if any of this doesn't make sense.

tnks,
Rod 






Re: Spark Streaming checkpoint recovery causes IO re-execution

2014-08-29 Thread Yana Kadiyska
I understand that the DB writes are happening from the workers unless you
collect. My confusion is that you believe workers recompute on recovery
("node computations which get redone upon recovery"). My understanding is that
checkpointing dumps the RDD to disk and then cuts the RDD lineage. So I
thought on driver restart you'll get a set of new executor processes, but
they would read the last known state of the RDD from the HDFS checkpoint. Am
I off here?

So the only situation I can imagine where you end up recomputing is if you're
checkpointing at a larger interval than your batch size (i.e. the RDD on
disk does not reflect its last pre-crash state)?




Re: Spark Streaming checkpoint recovery causes IO re-execution

2014-08-28 Thread GADV
Not sure if this makes sense, but maybe it would be nice to have a kind of
flag available within the code that tells me whether I'm running in a normal
situation or during a recovery.
To better explain this, let's consider the following scenario:
I am processing data, let's say from a Kafka stream, and I am updating a
database based on the computations. During the recovery I don't want to
update the database again (for many reasons, let's just assume that), but I
want my system to be in the same status as before. Thus I would like to know
if my code is running for the first time or during a recovery, so I can
avoid updating the database again.
More generally I want to know this in case I'm interacting with external
entities.







Re: Spark Streaming checkpoint recovery causes IO re-execution

2014-08-28 Thread Yana Kadiyska
Can you clarify the scenario:

val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint(checkpointDirectory)

val stream = KafkaUtils.createStream(...)
val wordCounts = stream.flatMap(_.split(" ")).map(x => (x, 1L))

val wordDstream = wordCounts.updateStateByKey[Long](updateFunc)

wordDstream.foreachRDD(
  rdd => rdd.foreachPartition( /* sync state to external store */ )
)
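
(For concreteness, a minimal updateFunc matching the snippet above could be
a running count per word; this is an assumption about what updateFunc does:)

val updateFunc = (values: Seq[Long], state: Option[Long]) =>
  Some(values.sum + state.getOrElse(0L))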


My impression is that during recovery from checkpoint, your wordDstream
would be in the state that it was before the crash +1 batch interval
forward when you get to the foreachRDD part -- even if re-creating the
pre-crash RDD is really slow. So if your driver goes down at 10:20 and you
restart at 10:30, I thought at the time of the DB write wordDstream would
have exactly the state of 10:20 plus 10 seconds' worth of aggregated stream data?


I don't really understand what you mean by "upon metadata checkpoint
recovery (before the data checkpoint occurs)", but it sounds like you're
observing the same DB write happening twice?

I don't have any advice for you but I am interested in understanding better
what happens in the recovery scenario so just trying to clarify what you
observe.





Re: Spark Streaming checkpoint recovery causes IO re-execution

2014-08-28 Thread RodrigoB
Hi Yana,

The fact is that the DB writing is happening at the node level and not at
the Spark level. One of the benefits of the distributed computing nature of
Spark is enabling IO distribution as well. For example, it's much faster to
have the nodes write to Cassandra instead of having everything collected at
the driver level and sending the writes from there.

The problem is the node computations that get redone upon recovery. If
these lambda functions send events to other systems, those events would get
resent upon re-computation, causing overall system instability.

Hope this helps you understand the problem.

tnks,
Rod 






Re: Spark Streaming - What does Spark Streaming checkpoint?

2014-08-21 Thread Chris Fregly
The StreamingContext can be recreated from a checkpoint file, indeed.

Check out the following Spark Streaming source files for details:
 StreamingContext, Checkpoint, DStream, DStreamCheckpoint, and DStreamGraph.
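
A small sketch of that recovery path, for concreteness (the checkpoint path
and app name are assumptions):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("checkpoint-demo")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint("hdfs:///app/checkpoints")
  // define the DStream graph here; it is serialized into the checkpoint
  ssc
}

// Rebuilds the StreamingContext (DStream graph plus metadata) from the
// checkpoint if one exists, otherwise calls createContext() for a fresh start.
val ssc = StreamingContext.getOrCreate("hdfs:///app/checkpoints", createContext _)
ssc.start()
ssc.awaitTermination()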






Spark Streaming - What does Spark Streaming checkpoint?

2014-07-09 Thread Yan Fang
Hi guys,

I am a little confused by the checkpointing in Spark Streaming. It
checkpoints the intermediate data for the stateful operations, for sure.
Does it also checkpoint the information of the StreamingContext? It seems
we can recreate the SC from the checkpoint in a driver node failure
scenario. When I looked at the checkpoint directory, I did not find much of
a clue. Any help? Thank you very much.

Best,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108