Re: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-06 Thread ALunar Beach
Thanks TD.
In pre-Structured Streaming (the DStream API), an exactly-once guarantee on
the input is not provided, is it?
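
(A minimal sketch of the usual DStream-era pattern, assuming the
spark-streaming-kafka-0-10 integration; the topic, broker address, and the
saveIdempotently helper are placeholders. The direct stream tracks offsets
deterministically, so the input is replayable, but after a restart the same
batch can run again; end-to-end exactly-once therefore depends on the output
being idempotent or transactional.)

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object DirectStreamSketch {
  // Placeholder sink: exactly-once results require this write to be
  // idempotent (e.g. a keyed upsert) or transactional together with the offsets.
  def saveIdempotently(records: Iterator[ConsumerRecord[String, String]]): Unit = ()

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("direct-stream-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(30))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-example",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("my_test"), kafkaParams))

    stream.foreachRDD { rdd =>
      // Offsets are known per partition; writes must be idempotent so that a
      // re-run of the same batch after a restart does not produce duplicates.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition(saveIdempotently)
      // Commit offsets back to Kafka only after the output has succeeded.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}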

On Tue, Jun 6, 2017 at 4:30 AM, Tathagata Das wrote:

> This is the expected behavior. There are some confusing corner cases.
> If you are starting to play with Spark Streaming, I highly recommend
> learning Structured Streaming instead.
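
(For reference, a minimal Structured Streaming sketch of the same kind of
Kafka read, assuming the spark-sql-kafka-0-10 source; the topic, broker
address, and paths are placeholders. The source offsets are tracked in the
checkpointLocation, and recovery resumes from them rather than re-running a
DStream-style batch.)

import org.apache.spark.sql.SparkSession

object StructuredKafkaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("structured-kafka-sketch")
      .master("local[2]")
      .getOrCreate()

    // Kafka source: offsets are recorded in the checkpoint, so a restart
    // resumes from the last committed offsets.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "my_test")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    val query = df.writeStream
      .format("parquet")                                    // any fault-tolerant sink
      .option("path", "/tmp/structured-out")                // placeholder output path
      .option("checkpointLocation", "/tmp/structured-ckpt") // placeholder checkpoint path
      .start()

    query.awaitTermination()
  }
}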
>
> On Mon, Jun 5, 2017 at 11:16 AM, anbucheeralan wrote:
>
>> I am using Spark Streaming Checkpoint and Kafka Direct Stream.
>> It uses a 30 sec batch duration and normally the job is successful in
>> 15-20 sec.
>>
>> If the Spark application fails after the successful completion
>> (149668428 ms in the log below) and restarts, it duplicates the last
>> batch again.
>>
>> Is this the expected behavior? I was expecting this to start a new batch
>> window.
>>
>>
>> Here are some logs:
>>
>> Last successful run:
>> 17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time
>> 149668428 ms (execution: 0.029 s)
>> 17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list
>> 17/06/05 13:38:00 INFO BlockManager: Removing RDD 0
>> 17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time
>> 149668428 ms
>> 17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time
>> 149668428 ms
>> 17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time
>> 149668428 ms
>> 17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time
>> 149668428 ms to writer queue
>> 17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time
>> 149668428 ms to file
>> 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428'
>> 17/06/05 13:38:00 INFO CheckpointWriter: Checkpoint for time
>> 149668428 ms saved to file
>> 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428',
>> took 4032 bytes and 9 ms
>> 17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time
>> 149668428 ms
>> 17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time
>> 149668428 ms
>>
>> After the restart,
>>
>> 17/06/05 13:42:31 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData:
>> Restoring KafkaRDD for time 149668428 ms [(my_test,0,2000,2000)]
>> 17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data
>> 17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10
>> batches): 149668428 ms, 149668431 ms, 149668434 ms,
>> 149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms,
>> 149668449 ms, 149668452 ms, 149668455 ms
>> 17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0
>> batches):
>> 17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 batches):
>> 149668428 ms, 149668431 ms, 149668434 ms, 149668437 ms,
>> 149668440 ms, 149668443 ms, 149668446 ms, 149668449 ms,
>> 149668452 ms, 149668455 ms
>> 17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms
>> 17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job
>> 149668428 ms.0 from job set of time 149668428 ms
>>
>>
>>
>>
>
>


Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

2017-06-05 Thread ALunar Beach
I am using Spark Streaming Checkpoint and Kafka Direct Stream.
It uses a 30 sec batch duration and normally the job is successful in 15-20
sec.

If the Spark application fails after the successful completion
(149668428 ms in the log below) and restarts, it duplicates the last
batch again.

Is this the expected behavior? I was expecting this to start a new batch
window.
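
(For context, a minimal sketch of the checkpoint-based setup being described,
assuming the spark-streaming-kafka-0-10 integration; the checkpoint directory,
topic, and broker address are placeholders. On restart,
StreamingContext.getOrCreate rebuilds the context from the checkpoint and may
re-run the last generated batch, so delivery is effectively at-least-once
unless the output is idempotent.)

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object CheckpointedStreamSketch {
  val checkpointDir = "ckpt" // placeholder checkpoint directory

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("checkpointed-stream").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(30)) // 30 s batch interval, as above
    ssc.checkpoint(checkpointDir)

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-example",
      "enable.auto.commit" -> (false: java.lang.Boolean))

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("my_test"), kafkaParams))

    // Placeholder processing; a re-run of the last batch after a restart
    // writes the same records again unless this step is idempotent.
    stream.foreachRDD { rdd => rdd.foreach(r => println(r.value())) }
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Restores the DStream graph and last generated batches from the checkpoint
    // if one exists; otherwise builds a fresh context.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}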


Here are some logs:

Last successful run:
17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time
149668428 ms (execution: 0.029 s)
17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list
17/06/05 13:38:00 INFO BlockManager: Removing RDD 0
17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time
149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time
149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time
149668428 ms
17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time
149668428 ms to writer queue
17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time
149668428 ms to file
'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428'
17/06/05 13:38:00 INFO CheckpointWriter: Checkpoint for time 149668428
ms saved to file
'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-149668428',
took 4032 bytes and 9 ms
17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time
149668428 ms
17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time
149668428 ms

After the restart,

17/06/05 13:42:31 INFO
DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
KafkaRDD for time 149668428 ms [(my_test,0,2000,2000)]
17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data
17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10
batches): 149668428 ms, 149668431 ms, 149668434 ms,
149668437 ms, 149668440 ms, 149668443 ms, 149668446 ms,
149668449 ms, 149668452 ms, 149668455 ms
17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0
batches):
17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 batches):
149668428 ms, 149668431 ms, 149668434 ms, 149668437 ms,
149668440 ms, 149668443 ms, 149668446 ms, 149668449 ms,
149668452 ms, 149668455 ms
17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 149668428 ms
17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job
149668428 ms.0 from job set of time 149668428 ms