Re: Complications with saving Kafka offsets?

2016-07-15 Thread Cody Koeninger
The bottom line short answer for this is that if you actually care
about data integrity, you need to store your offsets transactionally
alongside your results in the same data store.
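
As a rough sketch of that pattern with plain JDBC (jdbcUrl, the kafka_offsets
table, and writeResults() below are made-up placeholders, not anything the
direct stream API provides):

messages.foreachRDD(rdd -> {
  // offsetRanges() here describes exactly the batch being processed
  OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  List<Tuple2<String, String>> records = rdd.collect();  // sketch only; fine for small batches

  try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
    conn.setAutoCommit(false);
    writeResults(conn, records);  // placeholder: INSERT your results
    try (PreparedStatement ps = conn.prepareStatement(
        "UPDATE kafka_offsets SET until_offset = ? WHERE topic = ? AND kafka_partition = ?")) {
      for (OffsetRange o : offsets) {
        ps.setLong(1, o.untilOffset());
        ps.setString(2, o.topic());
        ps.setInt(3, o.partition());
        ps.executeUpdate();
      }
    }
    conn.commit();  // results and offsets become visible together, or not at all
  }
});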

If you're ok with double-counting in the event of failures, saving
offsets _after_ saving your results, using foreachRDD, will work.
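
In code, that ordering looks roughly like this (saveResults() and
saveOffsetsToZk() are hypothetical helpers standing in for your own output
and Zookeeper logic):

messages.foreachRDD(rdd -> {
  // These offsets describe the batch currently being processed
  OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  saveResults(rdd);          // write your output first
  saveOffsetsToZk(offsets);  // then record progress; a crash in between means
                             // reprocessing on restart, not data loss
});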

Have you read the material / examples linked from
https://github.com/koeninger/kafka-exactly-once ?


On Mon, Jul 11, 2016 at 9:58 PM, BradleyUM <brad...@unionmetrics.com> wrote:
> I'm working on a Spark Streaming (1.6.0) project and one of our requirements
> is to persist Kafka offsets to Zookeeper after a batch has completed so that
> we can restart work from the correct position if we have to restart the
> process for any reason. Many links,
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
> included, seem to suggest that calling transform() on the stream is a
> perfectly acceptable way to store the offsets off for processing when the
> batch completes. Since that method seems to offer more intuitive ordering
> guarantees than foreachRDD() we have, up until now, preferred it. So our
> code looks something like the following:
>
> AtomicReference<OffsetRange[]> savedOffsets = new AtomicReference<>();
>
> messages = messages.transformToPair((rdd) -> {
>   // Save the offsets so that we can update ZK with them later
>   HasOffsetRanges hasOffsetRanges = (HasOffsetRanges) rdd.rdd();
>   savedOffsets.set(hasOffsetRanges.offsetRanges());
>   return rdd;  // pass the batch through unchanged
> });
>
> Unfortunately we've discovered that this doesn't work, as contrary to
> expectations the logic inside of transformToPair() seems to run whenever a
> new batch gets added, even if we're not prepared to process it yet. So
> savedOffsets will store the offsets of the most recently enqueued batch, not
> necessarily the one being processed. When a batch completes, then, the
> offset we save to ZK may reflect enqueued data that we haven't actually
> processed yet. This can create (and has created) conditions where a crash
> causes us to restart from the wrong position and drop data.
>
> There seem to be two solutions to this, from what I can tell:
>
> 1.) A brief test using foreachRDD() instead of transform() seems to behave
> more in line with expectations, with the call only being made when a batch
> actually begins to process. I have yet to find an explanation as to why the
> two methods differ in this way.
> 2.) Instead of using an AtomicReference, we tried a queue of offsets. Our
> logic pushes a set of offsets at the start of a batch and pulls the oldest
> set off at the end - the idea being that the set pulled will always
> correspond to the batch that just finished processing, not to one that has
> merely been enqueued. Since we're not 100% sure whether Spark guarantees
> this, we also assert that the completed batch has the same RDD ID as the
> entry we're pulling from the queue.
>
> However, I have yet to find anything, on this list or elsewhere, that
> suggests that either of these two approaches is necessary. Does what I've
> described match anyone else's experience? Is the behavior I'm seeing from
> the transform() method expected? Do both of the solutions I've proposed seem
> legitimate, or is there some complication that I've failed to account for?
>
> Any help is appreciated.
>
> - Bradley
>
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Complications with saving Kafka offsets?

2016-07-11 Thread BradleyUM
I'm working on a Spark Streaming (1.6.0) project and one of our requirements
is to persist Kafka offsets to Zookeeper after a batch has completed so that
we can restart work from the correct position if we have to restart the
process for any reason. Many links,
http://spark.apache.org/docs/latest/streaming-kafka-integration.html
included, seem to suggest that calling transform() on the stream is a
perfectly acceptable way to store the offsets off for processing when the
batch completes. Since that method seems to offer more intuitive ordering
guarantees than foreachRDD() we have, up until now, preferred it. So our
code looks something like the following:

AtomicReference<OffsetRange[]> savedOffsets = new AtomicReference<>();

messages = messages.transformToPair((rdd) -> {
  // Save the offsets so that we can update ZK with them later
  HasOffsetRanges hasOffsetRanges = (HasOffsetRanges) rdd.rdd();
  savedOffsets.set(hasOffsetRanges.offsetRanges());
  return rdd;  // pass the batch through unchanged
});

Unfortunately we've discovered that this doesn't work, as contrary to
expectations the logic inside of transformToPair() seems to run whenever a
new batch gets added, even if we're not prepared to process it yet. So
savedOffsets will store the offsets of the most recently enqueued batch, not
necessarily the one being processed. When a batch completes, then, the
offset we save to ZK may reflect enqueued data that we haven't actually
processed yet. This can create (and has created) conditions where a crash
causes us to restart from the wrong position and drop data.

There seem to be two solutions to this, from what I can tell:

1.) A brief test using foreachRDD() instead of transform() seems to behave
more in line with expectations, with the call only being made when a batch
actually begins to process. I have yet to find an explanation as to why the
two methods differ in this way.
2.) Instead of using an AtomicReference, we tried a queue of offsets
(sketched roughly below). Our logic pushes a set of offsets at the start of
a batch and pulls the oldest set off at the end - the idea being that the
set pulled will always correspond to the batch that just finished
processing, not to one that has merely been enqueued. Since we're not 100%
sure whether Spark guarantees this, we also assert that the completed batch
has the same RDD ID as the entry we're pulling from the queue.
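
For concreteness, option 2 looks roughly like the following, simplified so
that the foreachRDD() is attached directly to the transformed stream (and
with commitOffsetsToZk() standing in for our own Zookeeper code):

Queue<Tuple2<Integer, OffsetRange[]>> pendingOffsets = new ConcurrentLinkedQueue<>();

messages = messages.transformToPair((rdd) -> {
  // Runs when a batch is generated, which may be before it is processed
  pendingOffsets.add(new Tuple2<>(rdd.id(), ((HasOffsetRanges) rdd.rdd()).offsetRanges()));
  return rdd;
});

messages.foreachRDD((rdd) -> {
  // Runs when the batch is actually processed; pull the oldest queued entry
  Tuple2<Integer, OffsetRange[]> oldest = pendingOffsets.poll();
  if (oldest == null || oldest._1() != rdd.id()) {
    throw new IllegalStateException("completed batch does not match queued offsets");
  }
  commitOffsetsToZk(oldest._2());  // placeholder for our Zookeeper update
});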

However, I have yet to find anything, on this list or elsewhere, that
suggests that either of these two approaches is necessary. Does what I've
described match anyone else's experience? Is the behavior I'm seeing from
the transform() method expected? Do both of the solutions I've proposed seem
legitimate, or is there some complication that I've failed to account for?

Any help is appreciated.

- Bradley



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Complications-with-saving-Kafka-offsets-tp27324.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org