Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Akhil Das
This consumer pretty much covers all those scenarios you listed:
github.com/dibbhatt/kafka-spark-consumer. Give it a try.

Thanks
Best Regards



Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Krzysztof Zarzycki
Hi there,
I have a problem with fulfilling all my needs when using Spark Streaming on
Kafka. Let me enumerate my requirements:
1. I want to have at-least-once/exactly-once processing.
2. I want my application to be tolerant of faults and of simple stops. The Kafka
offsets need to be tracked between restarts.
3. I want to be able to upgrade code of my application without losing Kafka
offsets.

Now, what my requirements imply according to my knowledge:
1. implies using the new Kafka DirectStream.
2. implies using checkpointing. The Kafka DirectStream will write offsets to
the checkpoint as well.
3. implies that checkpoints can't be used between controlled restarts, so I
need to install a shutdown hook that calls ssc.stop(stopGracefully=true) (here is a
description of how:
https://metabroadcast.com/blog/stop-your-spark-streaming-application-gracefully
)
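
For reference, the shutdown hook I have in mind is roughly this (a simplified
sketch; ssc is my StreamingContext and the app name is just a placeholder):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("kafka-direct-app")  // placeholder name
    val ssc = new StreamingContext(conf, Seconds(10))
    // ... create the Kafka direct stream and output operations here ...

    sys.addShutdownHook {
      // Ask Spark Streaming to finish in-flight batches before tearing down.
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
    ssc.start()
    ssc.awaitTermination()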

Now my problems are:
1. If I cannot make checkpoints between code upgrades, does it mean that
Spark does not help me at all with keeping my Kafka offsets? Does it mean
that I have to implement my own storing/initialization of offsets to/from
Zookeeper?
2. When I set up the shutdownHook and any of my executors throws an exception, it
seems that the application does not fail but gets stuck in a running state. Is that
because stopGracefully deadlocks on exceptions? How can I overcome this
problem? Maybe I can avoid setting the shutdownHook and there is another way to
stop my app gracefully?

3. If I somehow overcome 2., is it enough to just stop my app gracefully to
be able to upgrade the code and not lose Kafka offsets?


Thank you a lot for your answers,
Krzysztof Zarzycki


Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Dmitry Goldenberg
>> checkpoints can't be used between controlled restarts

Is that true? If so, why? From my testing, checkpoints appear to be working
fine; we get the data we missed between the time the consumer went down
and the time we brought it back up.

>> If I cannot make checkpoints between code upgrades, does it mean that
Spark does not help me at all with keeping my Kafka offsets? Does it mean,
that I have to implement my own storing/initialization of offsets to/from
Zookeeper?

By code upgrades, do you mean code changes to the consumer program?

If that is the case, one idea we've been entertaining is that, if the
consumer changes, especially if its configuration parameters change, it
means that some older configuration may still be stuck in the
checkpointing.  What we'd do in this case is, prior to starting the
consumer, blow away the checkpointing directory and re-consume from Kafka
from the smallest offsets.  In our case, it's OK to re-process; I realize
that in many cases that may not be an option.  If that's the case then it
would seem to follow that you have to manage offsets in Zk...
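
Concretely, before starting the consumer we do something roughly like the
following (just a sketch; the checkpoint path and broker list are placeholders,
and it assumes direct-stream-style kafkaParams):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Blow away the old checkpointing directory (path is a placeholder).
    val checkpointDir = new Path("hdfs:///user/spark/checkpoints/my-consumer")
    val fs = FileSystem.get(new Configuration())
    if (fs.exists(checkpointDir)) fs.delete(checkpointDir, true)

    // With no checkpoint and no stored offsets, re-consume from the smallest offsets.
    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092,broker2:9092",  // placeholder brokers
      "auto.offset.reset"    -> "smallest")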

Another thing to consider would be to treat upgrades operationally. In
that, if an upgrade is to happen, consume the data up to a certain point
then bring the system down for an upgrade. Remove checkpointing. Restart
everything; the system would now be rebuilding the checkpointing and using
your upgraded consumers.  (Again, this may not be possible in some systems
where the data influx is constant and/or the data is mission critical)...

Perhaps this discussion implies that there is room for a new feature in Spark
that intelligently drops the checkpointing, or allows you to selectively
pluck out and drop some items prior to restarting...






Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Krzysztof Zarzycki
Thanks Akhil, seems like an interesting option to consider.
Do you know if the package is production-ready? Do you use it in production?

And do you know if it works with Spark 1.3.1 as well? The README mentions that
the package on spark-packages.org is built with Spark 1.4.1.


Anyway, it seems that core Spark Streaming does not support my case? Or
can anyone instruct me on how to do it? Let's say that I'm even fine (though
not happy) with using the private KafkaCluster class that is included
in Spark for manually managing ZK offsets. Has someone done it before? Does
anyone have public code examples of manually managing ZK offsets?
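
To make the question concrete, what I imagine is a small helper along these
lines (only a sketch using Apache Curator; the quorum address and znode layout
are my own choices, nothing provided by Spark):

    import kafka.common.TopicAndPartition
    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.retry.ExponentialBackoffRetry

    object ZkOffsets {
      // Placeholder ZooKeeper quorum.
      private val client = CuratorFrameworkFactory.newClient(
        "zk1:2181,zk2:2181", new ExponentialBackoffRetry(1000, 3))
      client.start()

      private def path(group: String, tp: TopicAndPartition) =
        s"/consumers/$group/offsets/${tp.topic}/${tp.partition}"

      def save(group: String, tp: TopicAndPartition, offset: Long): Unit = {
        val p = path(group, tp)
        val data = offset.toString.getBytes("UTF-8")
        if (client.checkExists().forPath(p) == null)
          client.create().creatingParentsIfNeeded().forPath(p, data)
        else
          client.setData().forPath(p, data)
      }

      def load(group: String, tp: TopicAndPartition): Option[Long] =
        Option(client.checkExists().forPath(path(group, tp))).map { _ =>
          new String(client.getData().forPath(path(group, tp)), "UTF-8").toLong
        }
    }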

Thanks,
Krzysztof



Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Dibyendu Bhattacharya
Hi,

This consumer is running in production at many organizations that have adopted
it as an alternative option. It will run with Spark 1.3.1.

It has been running in production at Pearson for some time.

It is part of Spark Packages, and you can see how to include it in your
Maven or SBT build:

http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

This consumer also comes with a built-in PID controller for back-pressure,
which you can use even if you are on Spark 1.3.1.


Regards,
Dibyendu




Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Cody Koeninger
The Kafka direct stream meets those requirements.  You don't need
checkpointing for exactly-once.  Indeed, unless your output operations are
idempotent, you can't get exactly-once if you're relying on checkpointing.
Instead, you need to store the offsets atomically in the same transaction
as your results.
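
For example, a rough sketch of mine (not code from the repo linked below; it
assumes stream is the DStream returned by createDirectStream with String keys
and values, and the JDBC URL and the results / kafka_offsets tables are
hypothetical):

    import java.sql.DriverManager
    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.kafka.HasOffsetRanges

    stream.foreachRDD { rdd =>
      // Direct-stream RDD partitions line up 1:1 with Kafka offset ranges.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { iter =>
        val osr = offsetRanges(TaskContext.get.partitionId)
        val conn = DriverManager.getConnection("jdbc:postgresql://db-host/mydb")
        conn.setAutoCommit(false)
        try {
          val insert = conn.prepareStatement("INSERT INTO results (value) VALUES (?)")
          iter.foreach { case (_, value) =>
            insert.setString(1, value)
            insert.executeUpdate()
          }
          val update = conn.prepareStatement(
            "UPDATE kafka_offsets SET until_offset = ? WHERE topic = ? AND part = ?")
          update.setLong(1, osr.untilOffset)
          update.setString(2, osr.topic)
          update.setInt(3, osr.partition)
          update.executeUpdate()
          conn.commit()  // results and offsets succeed or fail together
        } finally {
          conn.close()
        }
      }
    }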

See
https://github.com/koeninger/kafka-exactly-once
and the video / blog posts linked from it.

The dibbhatt consumer that Akhil linked is using ZooKeeper to store offsets,
so to the best of my knowledge, it cannot do exactly-once without
idempotent output operations.

Regarding the issues around code changes and checkpointing, the most
straightforward way to deal with this is to just start a new version of
your job before stopping the old one.  If you care about delivery semantics
and are using checkpointing, your output operation must be idempotent
anyway, so having 2 versions of the code running at the same time for a
brief period should not be a problem.





Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Cody Koeninger
You have to store offsets somewhere.

If you're going to store them in checkpoints, then you have to deal with
the fact that checkpoints aren't recoverable on code change.  Starting up
the new version helps because you don't start it from the same checkpoint
directory as the running one... it has your new code, and is storing to a
new checkpoint directory.  If you started the new one from the latest
offsets, you can shut down the old one as soon as it's caught up.

If you don't like the implications of storing offsets in checkpoints...
then sure, store them yourself.  A real database would be better, but if
you really want to store them in ZooKeeper you can.  In any case, just do
your offset saves in the same foreachPartition where your other output
operations occur, after they've successfully completed.
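
In outline it's just this shape (a sketch; saveResults and saveOffsetToZk are
hypothetical helpers you would supply for your own output store and ZooKeeper
layout, and stream is the direct stream):

    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { iter =>
        val osr: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        saveResults(iter)  // hypothetical: your actual output operation
        // Only after the output succeeded, record how far this partition got.
        saveOffsetToZk(osr.topic, osr.partition, osr.untilOffset)  // hypothetical helper
      }
    }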

If you don't care about the performance benefits of the direct stream and don't
need exactly-once semantics, sure, use the old stream.

Finally, hundreds of gigs just really isn't very much data.  Unless what
you're doing is really resource intensive, it shouldn't take much time to
process it all, especially if you can dynamically size a cluster for the
rare occasion that something is screwed up and you need to reprocess.



Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Krzysztof Zarzycki
Thanks guys for your answers. I put my answers in text, below.

Cheers,
Krzysztof Zarzycki

2015-09-10 15:39 GMT+02:00 Cody Koeninger :

> The kafka direct stream meets those requirements.  You don't need
> checkpointing for exactly-once.  Indeed, unless your output operations are
> idempotent, you can't get exactly-once if you're relying on checkpointing.
> Instead, you need to store the offsets atomically in the same transaction
> as your results.
>

To focus the discussion, let's assume my operations are idempotent and I'm
interested in at-least-once thanks to that (which is "idempotent
exactly-once" as named in your presentation). Did you say that I don't need
checkpointing for that? How then would the direct stream API store offsets
between restarts?


> See
> https://github.com/koeninger/kafka-exactly-once
> and the video / blog posts linked from it.
>
>
I did that, thank you. What I want is to achieve "idempotent exactly-once"
as named in your presentation.


> The dibhatt consumer that Akhil linked is using zookeeper to store
> offsets, so to the best of my knowledge, it cannot do exactly-once without
> idempotent output operations.
>
True, and I totally accept it if what I get is at-least-once.


>
>
Regarding the issues around code changes and checkpointing, the most
> straightforward way to deal with this is to just start a new version of
> your job before stopping the old one.  If you care about delivery semantics
> and are using checkpointing, your output operation must be idempotent
> anyway, so having 2 versions of the code running at the same time for a
> brief period should not be a problem.
>

How does starting the new version before stopping the old one help? Could you
please explain the mechanics of that a bit?
Anyway, it seems definitely cumbersome. Plus, I can imagine plenty of
situations where it will be just inappropriate to run the old one, when, let's
say, we discovered a bug and don't want to run it anymore.


So... To sum it up correctly, if I want at-least-once with simple code
upgrades, I need to:
-  store offsets in external storage (I would choose ZK for that).
-  read them on application restart and pass the TopicAndPartition->offset
map to createDirectStream.
-  And I don't need to use checkpoints at all then.
Could you confirm that? (A rough sketch of what I mean follows below.)
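
Just a sketch: readOffsetsFromZk is a hypothetical loader for whatever store I
end up using, ssc is the StreamingContext, and the broker list is a placeholder.

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Hypothetical: load previously saved offsets, e.g. from ZooKeeper.
    val fromOffsets: Map[TopicAndPartition, Long] =
      readOffsetsFromZk("my-group", "my-topic")

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
        StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets,
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))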

The question is where I should actually commit the ZK offsets. The easiest
would be to do it at the end of every batch. Do you think I can use
org.apache.spark.streaming.scheduler.StreamingListener and its
onBatchCompleted method for that? I don't think so, because we probably don't
have access to the finished offsets in it...
So maybe each executor can commit the offsets?

An alternative solution I just realized is to stay with the old Kafka
receiver (createStream API) and just enable Write Ahead Logs. This way we
don't lose any data when the application is killed, so we have "idempotent
exactly-once" semantics, offsets are stored in ZK for us, and we don't need to
use checkpoints... Seems like a viable option! Do you agree?
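
For that variant I picture something like this (a sketch; as far as I understand,
the WAL still needs a checkpoint directory to write its logs to, even if I don't
rely on the checkpoint for offsets; paths and names are placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf()
      .setAppName("kafka-receiver-wal")                              // placeholder name
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")  // turn on the WAL

    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("hdfs:///user/spark/wal-checkpoints")  // WAL files live under this dir

    // Old receiver-based API: offsets are tracked in ZooKeeper by the high-level consumer.
    val stream = KafkaUtils.createStream(
      ssc, "zk1:2181,zk2:2181", "my-group", Map("my-topic" -> 2),
      StorageLevel.MEMORY_AND_DISK_SER)  // replication not needed once the WAL is on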






>
>
>
> On Thu, Sep 10, 2015 at 8:02 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> >> checkpoints can't be used between controlled restarts
>>
>> Is that true? If so, why? From my testing, checkpoints appear to be
>> working fine, we get the data we've missed between the time the consumer
>> went down and the time we brought it back up.
>>
> I'm sorry I simplified the case. I meant "checkpoints can't be used
between controlled restarts if you want to upgrade code in between".

>
>> >> If I cannot make checkpoints between code upgrades, does it mean that
>> Spark does not help me at all with keeping my Kafka offsets? Does it mean,
>> that I have to implement my own storing/initialization of offsets to/from
>> Zookeeper?
>>
>> By code upgrades, are code changes to the consumer program meant?
>>
> Exactly.

>
>> If that is the case, one idea we've been entertaining is that, if the
>> consumer changes, especially if its configuration parameters change, it
>> means that some older configuration may still be stuck in the
>> checkpointing.  What we'd do in this case is, prior to starting the
>> consumer, blow away the checkpointing directory and re-consume from Kafka
>> from the smallest offsets.  In our case, it's OK to re-process; I realize
>> that in many cases that may not be an option.  If that's the case then it
>> would seem to follow that you have to manage offsets in Zk...
>>
>> Another thing to consider would be to treat upgrades operationally. In
>> that, if an upgrade is to happen, consume the data up to a certain point
>> then bring the system down for an upgrade. Remove checkpointing. Restart
>> everything; the system would now be rebuilding the checkpointing and using
>> your upgraded consumers.  (Again, this may not be possible in some systems
>> where the data influx is constant and/or the data is mission critical)...
>>
> Thanks for your idea, but in my case it is indeed impossible to re-consume the
Kafka topic from the beginning. We keep 

Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Dibyendu Bhattacharya
Hi,

Just to clarify one point which may not be clear to many: if someone
decides to use the receiver-based approach, the best option at this point is
to use https://github.com/dibbhatt/kafka-spark-consumer. This will also
work with WAL like any other receiver-based consumer. The major issue with
KafkaUtils.createStream is that it uses the Kafka high-level API, which has
serious issues with consumer re-balance, whereas dibbhatt/kafka-spark-consumer
uses the low-level Kafka consumer API, which does not have any such issue. I am
not aware of a publicly available performance benchmark comparing this consumer
with the DirectStream, so I cannot comment on the performance benefits of one
over the other, but in the benchmarks we have done,
dibbhatt/kafka-spark-consumer stands out.

Regards,
Dibyendu


Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Krzysztof Zarzycki
Thanks, Cody, for your answers. This discussion helped me a lot.
Though I still feel that offset management could be better handled by
Spark, if it really wants to be an easy streaming framework. If it won't help
users do so, I'm afraid it will be superseded for many by other
frameworks that might make it more convenient (Samza, Flink, or the
just-being-designed Kafka Streams).

Thanks, Dibyendu, for your note; I will strongly consider it if I fall
back to the receiver-based approach.

Cheers,
Krzysztof Zarzycki


Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Cody Koeninger
There is no free lunch.  (TD, when do we get shirts made that say that?)

If you want exactly-once delivery semantics for arbitrary workflows into
arbitrary datastores, you're going to have to do some of your own work.

If someone is telling you otherwise, they're probably lying to you.

I think writing your own couple of lines of code to "select offsets from"
and "update offsets where", for whatever your data store may be, is worth it
for the flexibility of doing exactly what you need.

