Hi,

Just to clarify one point that may not be clear to many: if someone
decides to use the receiver-based approach, the best option at this point is
https://github.com/dibbhatt/kafka-spark-consumer. It also works with the WAL
like any other receiver-based consumer. The major issue with
KafkaUtils.createStream is that it uses the Kafka high-level consumer API,
which has serious problems with consumer rebalance, whereas
dibbhatt/kafka-spark-consumer uses the low-level Kafka consumer API, which
does not have that issue. I am not aware of any publicly available benchmark
comparing it with the direct stream, so I cannot comment on the performance
of one over the other, but in the benchmarks we have done,
dibbhatt/kafka-spark-consumer stands out.

Regards,
Dibyendu

On Thu, Sep 10, 2015 at 8:08 PM, Cody Koeninger <c...@koeninger.org> wrote:

> You have to store offsets somewhere.
>
> If you're going to store them in checkpoints, then you have to deal with
> the fact that checkpoints aren't recoverable on code change.  Starting up
> the new version helps because you don't start it from the same checkpoint
> directory as the running one... it has your new code, and is storing to a
> new checkpoint directory.  If you started the new one from the latest
> offsets, you can shut down the old one as soon as it's caught up.
>
> If you don't like the implications of storing offsets in checkpoints...
> then sure, store them yourself.  A real database would be better, but if
> you really want to store them in zookeeper you can.  In any case, just do
> your offset saves in the same foreachPartition your other output operations
> are occurring in, after they've successfully completed.
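
A minimal sketch of what "offset saves in the same foreachPartition as the
output" could look like with the direct stream (Spark 1.3+ Scala API). Here
stream is assumed to be the DStream returned by createDirectStream, and
saveResults / saveOffset are hypothetical stand-ins for your own output and
offset-store code:

    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

    stream.foreachRDD { rdd =>
      // Capture the Kafka offset ranges on the driver, before any shuffle breaks
      // the 1:1 mapping between Kafka partitions and RDD partitions.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd.foreachPartition { records =>
        val range: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        saveResults(records)                                          // hypothetical output operation
        saveOffset(range.topic, range.partition, range.untilOffset)   // hypothetical offset store
      }
    }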
>
> If you don't care about performance benefits of the direct stream and
> don't want exactly once semantics, sure use the old stream.
>
> Finally, hundreds of gigs just really isn't very much data.  Unless what
> you're doing is really resource intensive, it shouldn't take much time to
> process it all, especially if you can dynamically size a cluster for the
> rare occasion that something is screwed up and you need to reprocess.
>
>
> On Thu, Sep 10, 2015 at 9:17 AM, Krzysztof Zarzycki <k.zarzy...@gmail.com>
> wrote:
>
>> Thanks guys for your answers. I put my answers in text, below.
>>
>> Cheers,
>> Krzysztof Zarzycki
>>
>> 2015-09-10 15:39 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>
>>> The kafka direct stream meets those requirements.  You don't need
>>> checkpointing for exactly-once.  Indeed, unless your output operations are
>>> idempotent, you can't get exactly-once if you're relying on checkpointing.
>>> Instead, you need to store the offsets atomically in the same transaction
>>> as your results.
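
For the transactional variant described just above (offsets stored atomically
in the same transaction as the results), a rough sketch over plain JDBC might
look like the following; the connection string and the results / kafka_offsets
tables are assumptions, not anything prescribed by Spark:

    import java.sql.DriverManager
    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.kafka.HasOffsetRanges

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { records =>
        val range = offsetRanges(TaskContext.get.partitionId)
        val conn = DriverManager.getConnection("jdbc:postgresql://dbhost/mydb")  // assumed DB
        conn.setAutoCommit(false)
        try {
          // 1) write this partition's results
          val ins = conn.prepareStatement("insert into results (value) values (?)")
          records.foreach { r => ins.setString(1, r.toString); ins.executeUpdate() }
          // 2) record the end of this partition's offset range in the same transaction
          val off = conn.prepareStatement(
            "update kafka_offsets set until_offset = ? where topic = ? and part = ?")
          off.setLong(1, range.untilOffset)
          off.setString(2, range.topic)
          off.setInt(3, range.partition)
          off.executeUpdate()
          conn.commit()   // results and offsets become visible together, or not at all
        } finally {
          conn.close()
        }
      }
    }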
>>>
>>
>> To focus the discussion, let's assume my operations are idempotent and
>> that, thanks to that, I'm interested in at-least-once (which is the
>> "idempotent exactly-once" named in your presentation). Did you say that I
>> don't need checkpointing for that? How would the direct stream API then
>> store offsets between restarts?
>>
>>
>>> See
>>> https://github.com/koeninger/kafka-exactly-once
>>> and the video / blog posts linked from it.
>>>
>>>
>> I did that, thank you. What I want is to achieve "idempotent
>> exactly-once" as named in your presentation.
>>
>>
>>> The dibbhatt consumer that Akhil linked is using zookeeper to store
>>> offsets, so to the best of my knowledge, it cannot do exactly-once without
>>> idempotent output operations.
>>>
>> True, and I totally accept it if what I get is at-least-once.
>>
>>
>>>
>>>
>>> Regarding the issues around code changes and checkpointing, the most
>>> straightforward way to deal with this is to just start a new version of
>>> your job before stopping the old one.  If you care about delivery semantics
>>> and are using checkpointing, your output operation must be idempotent
>>> anyway, so having 2 versions of the code running at the same time for a
>>> brief period should not be a problem.
>>>
>>
>> How does starting the new version before stopping the old one help? Could
>> you please explain the mechanics of that a bit?
>> Anyway, it seems definitely cumbersome. Plus, I can imagine plenty of
>> situations where it would simply be inappropriate to keep the old one
>> running; when, let's say, we discovered a bug and don't want to run it
>> anymore.
>>
>>
>> So... to sum it up correctly: if I want at-least-once with simple code
>> upgrades, I need to:
>> - store offsets in external storage (I would choose ZK for that),
>> - read them on application restart and pass the
>> TopicAndPartition->offset map to createDirectStream,
>> - and then I don't need to use checkpoints at all.
>> Could you confirm that?
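
For reference, a minimal sketch of that restart path using the Spark 1.3+
Scala API; ssc and readOffsetsFromZk are hypothetical (the latter returns
whatever offsets you stored), and the broker list is an assumption:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // assumed broker list

    // Hypothetical helper: load the offsets you last committed, e.g. from ZooKeeper.
    val fromOffsets: Map[TopicAndPartition, Long] = readOffsetsFromZk()

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
        StringDecoder, (String, String)](
      ssc,
      kafkaParams,
      fromOffsets,
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))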
>>
>> The question is where I should actually commit the ZK offsets. The
>> easiest would be to do it at the end of every batch. Do you think I can use
>> org.apache.spark.streaming.scheduler.StreamingListener and its
>> onBatchCompleted method for that? I don't think so, because we probably
>> don't have access to the finished offsets in it...
>> So maybe each executor can commit the offsets?
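
One hedged sketch of the "commit at the end of every batch" idea, done on the
driver inside foreachRDD (after the batch's outputs) rather than in a
StreamingListener; the ZooKeeper connection string and path layout here are
assumptions:

    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.retry.ExponentialBackoffRetry
    import org.apache.spark.streaming.kafka.HasOffsetRanges

    val zk = CuratorFrameworkFactory.newClient("zk1:2181", new ExponentialBackoffRetry(1000, 3))
    zk.start()

    stream.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // ... run the batch's output operations on rdd first ...

      // Then record the end of each range in ZooKeeper, once per batch, on the driver.
      ranges.foreach { r =>
        val path = s"/myapp/offsets/${r.topic}/${r.partition}"   // assumed path layout
        val data = r.untilOffset.toString.getBytes("UTF-8")
        if (zk.checkExists().forPath(path) == null)
          zk.create().creatingParentsIfNeeded().forPath(path, data)
        else
          zk.setData().forPath(path, data)
      }
    }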
>>
>> An alternative to that solution, I just realized, is to stay with the old
>> Kafka receiver (the createStream API) and just enable write-ahead logs. This
>> way we don't lose any data when the application is killed, so we get
>> "idempotent exactly-once" semantics, offsets are stored in ZK for us, and we
>> don't need to use checkpoints... Seems like a viable option! Do you agree?
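
If you go that route, here is a sketch of what enabling the WAL with the old
receiver could look like; note the WAL still needs a checkpoint directory to
write its log segments to, even if you don't rely on checkpoint recovery for
offsets. The topic, group, and paths below are assumptions:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf()
      .setAppName("kafka-receiver-with-wal")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("hdfs:///tmp/my-app-checkpoint")   // assumed path; the WAL files live here

    // With the WAL enabled, replicated in-memory storage is redundant; serialize to disk instead.
    val stream = KafkaUtils.createStream(ssc, "zk1:2181", "my-consumer-group",
      Map("mytopic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)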
>>
>>
>>
>>
>>
>>
>>>
>>>
>>>
>>> On Thu, Sep 10, 2015 at 8:02 AM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
>>>> >> checkpoints can't be used between controlled restarts
>>>>
>>>> Is that true? If so, why? From my testing, checkpoints appear to be
>>>> working fine; we get the data we've missed between the time the consumer
>>>> went down and the time we brought it back up.
>>>>
>> I'm sorry, I simplified the case. I meant "checkpoints can't be used
>> between controlled restarts if you want to upgrade code in between".
>>
>>>
>>>> >> If I cannot make checkpoints between code upgrades, does it mean
>>>> that Spark does not help me at all with keeping my Kafka offsets? Does it
>>>> mean that I have to implement my own storing to / initialization of
>>>> offsets from Zookeeper?
>>>>
>>>> By code upgrades, are code changes to the consumer program meant?
>>>>
>> Exactly.
>>
>>>
>>>> If that is the case, one idea we've been entertaining is that, if the
>>>> consumer changes, especially if its configuration parameters change, it
>>>> means that some older configuration may still be stuck in the
>>>> checkpointing.  What we'd do in this case is, prior to starting the
>>>> consumer, blow away the checkpointing directory and re-consume from Kafka
>>>> from the smallest offsets.  In our case, it's OK to re-process; I realize
>>>> that in many cases that may not be an option.  If that's the case then it
>>>> would seem to follow that you have to manage offsets in Zk...
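
A small sketch of that reset procedure, assuming the direct stream, a
checkpoint directory on HDFS, and 0.8-era consumer properties; checkpointDir
and the broker list are hypothetical values:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val checkpointDir = "hdfs:///tmp/my-app-checkpoint"           // hypothetical path
    val fs = FileSystem.get(new java.net.URI(checkpointDir), new Configuration())
    fs.delete(new Path(checkpointDir), true)                      // recursive delete before restart

    // Then start the consumer with no prior offsets and let it read from the beginning:
    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092",                   // assumed broker list
      "auto.offset.reset"    -> "smallest")                       // re-consume from earliest offsets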
>>>>
>>>> Another thing to consider would be to treat upgrades operationally: if
>>>> an upgrade is to happen, consume the data up to a certain point, then
>>>> bring the system down for the upgrade. Remove the checkpointing. Restart
>>>> everything; the system would then rebuild the checkpointing and use your
>>>> upgraded consumers. (Again, this may not be possible in some systems
>>>> where the data influx is constant and/or the data is mission critical)...
>>>>
>> Thanks for your idea, but it is indeed impossible in my case to replay the
>> Kafka topic from the beginning. We keep months of data, even hundreds of
>> gigs, in the topic, so I definitely don't want to process it all.
>>
>>
>>>
>>>> Perhaps this discussion implies that there may be a new feature in
>>>> Spark where it intelligently drops the checkpointing or allows you to
>>>> selectively pluck out and drop some items prior to restarting...
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Sep 10, 2015 at 6:22 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>>> wrote:
>>>>
>>>>> This consumer pretty much covers all the scenarios you listed:
>>>>> github.com/dibbhatt/kafka-spark-consumer. Give it a try.
>>>>>
>>>>> Thanks
>>>>> Best Regards
>>>>>
>>>>> On Thu, Sep 10, 2015 at 3:32 PM, Krzysztof Zarzycki <
>>>>> k.zarzy...@gmail.com> wrote:
>>>>>
>>>>>> Hi there,
>>>>>> I have a problem fulfilling all my needs when using Spark Streaming
>>>>>> with Kafka. Let me enumerate my requirements:
>>>>>> 1. I want to have at-least-once/exactly-once processing.
>>>>>> 2. I want my application to be tolerant of failures and simple stops.
>>>>>> The Kafka offsets need to be tracked between restarts.
>>>>>> 3. I want to be able to upgrade the code of my application without
>>>>>> losing Kafka offsets.
>>>>>>
>>>>>> Now, what my requirements imply, to the best of my knowledge:
>>>>>> 1. implies using the new Kafka direct stream.
>>>>>> 2. implies using checkpointing; the Kafka direct stream will write
>>>>>> offsets to the checkpoint as well.
>>>>>> 3. implies that checkpoints can't be used between controlled restarts,
>>>>>> so I need to install a shutdown hook that calls
>>>>>> ssc.stop(stopGracefully=true); a sketch of such a hook follows below
>>>>>> (here is a description of how:
>>>>>> https://metabroadcast.com/blog/stop-your-spark-streaming-application-gracefully
>>>>>> )
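
A minimal sketch of such a graceful-shutdown hook, assuming ssc is the running
StreamingContext:

    sys.addShutdownHook {
      // Let batches that have already been received finish before everything stops.
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }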
>>>>>>
>>>>>> Now my problems are:
>>>>>> 1. If I cannot make checkpoints between code upgrades, does it mean
>>>>>> that Spark does not help me at all with keeping my Kafka offsets? Does
>>>>>> it mean that I have to implement my own storing to / initialization of
>>>>>> offsets from Zookeeper?
>>>>>> 2. When I set up the shutdown hook and any executor throws an
>>>>>> exception, it seems that the application does not fail but gets stuck
>>>>>> in a running state. Is that because stopGracefully deadlocks on
>>>>>> exceptions? How can I overcome this problem? Maybe I can avoid setting
>>>>>> a shutdown hook and there is another way to stop the app gracefully?
>>>>>>
>>>>>> 3. If I somehow overcome 2., is it enough to just stop my app
>>>>>> gracefully to be able to upgrade the code and not lose Kafka offsets?
>>>>>>
>>>>>>
>>>>>> Thank you a lot for your answers,
>>>>>> Krzysztof Zarzycki
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
