The short answer is that if you actually care about data integrity, you
need to store your offsets transactionally alongside your results in
the same data store.
If you're OK with double-counting in the event of a failure, saving
offsets _after_ saving your results, using foreachRDD, will work.
Have you read the material / examples linked from
https://github.com/koeninger/kafka-exactly-once ?
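To see why that ordering is safe, the pattern can be modeled without Spark at all. This is only a sketch; the class and field names below are made up for illustration and are not Spark or Kafka API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Spark-free model of "save results first, offsets second" (at-least-once).
// If the process dies between step 1 and step 2, the batch is re-processed
// on restart (possible duplicates) but never skipped (no data loss).
class AtLeastOnceSink {
    final List<String> results = new ArrayList<>();        // stand-in for the result store
    final AtomicLong committedOffset = new AtomicLong(0L); // stand-in for ZK

    void processBatch(long upToOffset, List<String> records) {
        results.addAll(records);         // 1) persist the batch's results
        committedOffset.set(upToOffset); // 2) only then record the offset
    }
}
```

Reversing the two steps inside processBatch is what turns a crash into silent data loss instead of harmless duplicates.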
On Mon, Jul 11, 2016 at 9:58 PM, BradleyUM <brad...@unionmetrics.com> wrote:
> I'm working on a Spark Streaming (1.6.0) project and one of our requirements
> is to persist Kafka offsets to Zookeeper after a batch has completed so that
> we can restart work from the correct position if we have to restart the
> process for any reason. Many links,
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
> included, seem to suggest that calling transform() on the stream is a
> perfectly acceptable way to stash the offsets for use when the batch
> completes. Since that method seems to offer more intuitive ordering
> guarantees than foreachRDD() we have, up until now, preferred it. So our
> code looks something like the following:
>
> AtomicReference<OffsetRange[]> savedOffsets = new AtomicReference<>();
>
> messages = messages.transformToPair((rdd) -> {
>     // Save the offsets so that we can update ZK with them later
>     HasOffsetRanges hasOffsetRanges = (HasOffsetRanges) rdd.rdd();
>     savedOffsets.set(hasOffsetRanges.offsetRanges());
>     return rdd;
> });
>
> Unfortunately we've discovered that this doesn't work, as contrary to
> expectations the logic inside of transformToPair() seems to run whenever a
> new batch gets added, even if we're not prepared to process it yet. So
> savedOffsets will store the offsets of the most recently enqueued batch, not
> necessarily the one being processed. When a batch completes, then, the
> offset we save to ZK may reflect enqueued data that we haven't actually
> processed yet. This can create (and has created) conditions where a crash
> causes us to restart from the wrong position and drop data.
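[The overwrite described above is easy to reproduce without Spark. A minimal model, with plain longs standing in for OffsetRange[] and all names illustrative:

```java
import java.util.concurrent.atomic.AtomicReference;

// Models the failure mode: transform() runs when a batch is *generated*, so
// a newer batch can replace savedOffsets before the older batch has finished
// processing.
class OffsetRace {
    static final AtomicReference<Long> savedOffsets = new AtomicReference<>();

    static long reproduce() {
        savedOffsets.set(100L); // batch 1 generated; transform() fires
        savedOffsets.set(200L); // batch 2 generated while batch 1 still runs
        // batch 1 completes and commits savedOffsets: 200, not 100
        return savedOffsets.get();
    }
}
```

A crash after that commit restarts consumption at 200, silently skipping the records between 100 and 200.]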
>
> There seem to be two solutions to this, from what I can tell:
>
> 1.) A brief test using foreachRDD() instead of transform() seems to behave
> more in line with expectations, with the function only being invoked when a
> batch actually begins processing. I have yet to find an explanation as to
> why the two methods differ in this way.
> 2.) Instead of using an AtomicReference we tried a queue of offsets. Our
> logic pushes a set of offsets when a batch is generated and pulls off the
> oldest when a batch completes - the idea is that the entry being pulled
> always corresponds to the batch that just finished processing, not a more
> recently enqueued one. Since we're not 100% sure whether Spark guarantees
> this ordering, we also have logic to assert that the batch that was
> completed has the same RDD ID as the one we're pulling from the queue.
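[The queue-plus-assertion idea in isolation might look like the following. This is only a sketch of the bookkeeping; the names are illustrative and not Spark API, and plain longs stand in for OffsetRange[]:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Model of approach 2: enqueue (rddId, offsets) when a batch is generated,
// dequeue when a batch completes, and assert the IDs line up so any ordering
// surprise fails loudly instead of committing the wrong offsets.
class OffsetQueue {
    private static class Entry {
        final int rddId;
        final long[] offsets;
        Entry(int rddId, long[] offsets) { this.rddId = rddId; this.offsets = offsets; }
    }

    private final Deque<Entry> queue = new ArrayDeque<>();

    // Called when a batch is generated (e.g. inside transform()).
    synchronized void batchEnqueued(int rddId, long[] offsets) {
        queue.addLast(new Entry(rddId, offsets));
    }

    // Called when a batch finishes; returns the offsets to commit, or
    // throws if the completed batch is not the oldest queued one.
    synchronized long[] batchCompleted(int completedRddId) {
        Entry oldest = queue.pollFirst();
        if (oldest == null || oldest.rddId != completedRddId) {
            throw new IllegalStateException(
                "Completed RDD " + completedRddId + " does not match queue head");
        }
        return oldest.offsets;
    }
}
```
]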
>
> However, I have yet to find anything, on this list or elsewhere, that
> suggests that either of these two approaches is necessary. Does what I've
> described match anyone else's experience? Is the behavior I'm seeing from
> the transform() method expected? Do both of the solutions I've proposed seem
> legitimate, or is there some complication that I've failed to account for?
>
> Any help is appreciated.
>
> - Bradley
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Complications-with-saving-Kafka-offsets-tp27324.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>