The Kafka direct stream meets those requirements. You don't need checkpointing for exactly-once. In fact, unless your output operations are idempotent, you can't get exactly-once if you're relying on checkpointing. Instead, you need to store the offsets atomically in the same transaction as your results.
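To make that concrete, here is a minimal sketch of the "offsets in the same transaction as the results" pattern with the direct stream (the repo linked below has a fuller treatment). The JDBC URL, the `results` and `kafka_offsets` tables, and the toy aggregation are hypothetical placeholders, not part of any particular project:

```scala
// Sketch only: store each batch's results and its Kafka offsets in one DB transaction,
// so a failure rolls back both. JDBC URL and table names are hypothetical placeholders.
import java.sql.DriverManager

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object TransactionalOffsetsSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("transactional-offsets"), Seconds(5))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder broker
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("mytopic"))
    val jdbcUrl = "jdbc:postgresql://dbhost/mydb" // placeholder

    stream.foreachRDD { rdd =>
      // Offset ranges are only available on the RDD produced directly by the direct stream.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // Toy aggregation standing in for the real processing; collect() brings the
      // results to the driver so one driver-side transaction can cover them.
      val counts = rdd.map { case (_, value) => (value, 1L) }.reduceByKey(_ + _).collect()

      // One transaction: either the results and the new offsets both commit, or neither does.
      val conn = DriverManager.getConnection(jdbcUrl)
      try {
        conn.setAutoCommit(false)
        val insert = conn.prepareStatement("INSERT INTO results (word, cnt) VALUES (?, ?)")
        counts.foreach { case (w, c) =>
          insert.setString(1, w); insert.setLong(2, c); insert.addBatch()
        }
        insert.executeBatch()

        val update = conn.prepareStatement(
          "UPDATE kafka_offsets SET until_offset = ? WHERE topic = ? AND part = ?")
        offsetRanges.foreach { o =>
          update.setLong(1, o.untilOffset); update.setString(2, o.topic); update.setInt(3, o.partition)
          update.addBatch()
        }
        update.executeBatch()
        conn.commit()
      } catch {
        case e: Exception => conn.rollback(); throw e
      } finally {
        conn.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The offset update sits inside the same transaction as the result insert, so a batch whose results fail to write never advances the stored offsets.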
See https://github.com/koeninger/kafka-exactly-once and the video / blog posts linked from it.

The dibbhatt consumer that Akhil linked uses ZooKeeper to store offsets, so to the best of my knowledge it cannot provide exactly-once without idempotent output operations.

Regarding the issues around code changes and checkpointing, the most straightforward way to deal with this is to start a new version of your job before stopping the old one. If you care about delivery semantics and are using checkpointing, your output operation must be idempotent anyway, so having two versions of the code running at the same time for a brief period should not be a problem.
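On a restart, including one after a code upgrade, offsets stored that way can be read back and handed to the direct stream, so no checkpoint is needed to resume. A minimal sketch, assuming the same hypothetical `kafka_offsets` table as above:

```scala
// Sketch only: resume the direct stream from offsets persisted by the previous run,
// instead of recovering them from a checkpoint. Table and column names are placeholders.
import java.sql.DriverManager

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

object ResumeFromStoredOffsetsSketch {
  // Load the (topic, partition) -> offset map written by the previous run.
  def loadOffsets(jdbcUrl: String): Map[TopicAndPartition, Long] = {
    val conn = DriverManager.getConnection(jdbcUrl)
    try {
      val rs = conn.createStatement()
        .executeQuery("SELECT topic, part, until_offset FROM kafka_offsets")
      val offsets = scala.collection.mutable.Map[TopicAndPartition, Long]()
      while (rs.next()) {
        offsets += TopicAndPartition(rs.getString("topic"), rs.getInt("part")) ->
          rs.getLong("until_offset")
      }
      offsets.toMap
    } finally {
      conn.close()
    }
  }

  def createStream(ssc: StreamingContext, kafkaParams: Map[String, String], jdbcUrl: String) = {
    val fromOffsets = loadOffsets(jdbcUrl)
    // Keep the same (key, value) element type the plain createDirectStream would produce.
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, messageHandler)
  }
}
```

Because the offsets live in your own store rather than in a checkpoint, the upgraded job can pick up exactly where the old one left off, which is what makes the overlapping-deployment approach above workable.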
On Thu, Sep 10, 2015 at 8:02 AM, Dmitry Goldenberg <dgoldenberg...@gmail.com> wrote:

> >> checkpoints can't be used between controlled restarts
>
> Is that true? If so, why? From my testing, checkpoints appear to be working fine; we get the data we missed between the time the consumer went down and the time we brought it back up.
>
> >> If I cannot keep checkpoints between code upgrades, does it mean that Spark does not help me at all with keeping my Kafka offsets? Does it mean that I have to implement my own storing to / initialization of offsets from ZooKeeper?
>
> By code upgrades, are code changes to the consumer program meant?
>
> If that is the case, one idea we've been entertaining is that if the consumer changes, especially if its configuration parameters change, some older configuration may still be stuck in the checkpoint. What we'd do in this case is, prior to starting the consumer, blow away the checkpointing directory and re-consume from Kafka from the smallest offsets. In our case it's OK to re-process; I realize that in many cases that may not be an option. If that's the case, then it would seem to follow that you have to manage offsets in ZK...
>
> Another thing to consider would be to treat upgrades operationally. That is, if an upgrade is to happen, consume the data up to a certain point, then bring the system down for the upgrade. Remove the checkpointing. Restart everything; the system would then rebuild the checkpointing and use your upgraded consumers. (Again, this may not be possible in systems where the data influx is constant and/or the data is mission critical)...
>
> Perhaps this discussion implies that there may be room for a new feature in Spark that intelligently drops the checkpointing, or lets you selectively pluck out and drop some items prior to restarting...
>
> On Thu, Sep 10, 2015 at 6:22 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>
>> This consumer pretty much covers all those scenarios you listed: github.com/dibbhatt/kafka-spark-consumer. Give it a try.
>>
>> Thanks
>> Best Regards
>>
>> On Thu, Sep 10, 2015 at 3:32 PM, Krzysztof Zarzycki <k.zarzy...@gmail.com> wrote:
>>
>>> Hi there,
>>> I have a problem fulfilling all my needs when using Spark Streaming on Kafka. Let me enumerate my requirements:
>>> 1. I want at-least-once/exactly-once processing.
>>> 2. I want my application to tolerate failures and controlled stops. The Kafka offsets need to be tracked between restarts.
>>> 3. I want to be able to upgrade the code of my application without losing Kafka offsets.
>>>
>>> Now, what my requirements imply, to my knowledge:
>>> 1. implies using the new Kafka DirectStream.
>>> 2. implies using checkpointing; the Kafka DirectStream will write offsets to the checkpoint as well.
>>> 3. implies that checkpoints can't be used between controlled restarts, so I need to install a shutdown hook with ssc.stop(stopGracefully = true) (here is a description of how: https://metabroadcast.com/blog/stop-your-spark-streaming-application-gracefully).
>>>
>>> Now my problems are:
>>> 1. If I cannot keep checkpoints between code upgrades, does it mean that Spark does not help me at all with keeping my Kafka offsets? Does it mean that I have to implement my own storing to / initialization of offsets from ZooKeeper?
>>> 2. When I set up the shutdown hook and any of my executors throws an exception, it seems that the application does not fail but gets stuck in the running state. Is that because stopGracefully deadlocks on exceptions? How do I overcome this problem? Maybe I can avoid setting the shutdown hook and there is another way to stop the app gracefully?
>>> 3. If I somehow overcome 2., is it enough to just stop my app gracefully to be able to upgrade the code and not lose Kafka offsets?
>>>
>>> Thank you a lot for your answers,
>>> Krzysztof Zarzycki
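For reference, the shutdown-hook approach mentioned in requirement 3 (the metabroadcast post) boils down to something like the following minimal sketch; the broker and topic names are placeholders, and the sketch does not by itself address the deadlock-on-exception problem raised in question 2:

```scala
// Sketch only: ask Spark Streaming to finish in-flight batches before the JVM exits.
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object GracefulStopSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("graceful-stop"), Seconds(5))

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder broker
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("mytopic"))
    stream.map(_._2).print() // stand-in for the real output operation

    ssc.start()

    // On SIGTERM / JVM shutdown, also stop the SparkContext (first flag) and let
    // already-received batches finish processing (stopGracefully = true).
    sys.addShutdownHook {
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }

    ssc.awaitTermination()
  }
}
```

Later Spark releases also have a spark.streaming.stopGracefullyOnShutdown setting that achieves much the same thing without an explicit hook.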