I'm working on a Spark Streaming (1.6.0) project and one of our requirements
is to persist Kafka offsets to Zookeeper after a batch has completed so that
we can restart work from the correct position if we have to restart the
process for any reason. Many links, including the official Kafka integration
guide (http://spark.apache.org/docs/latest/streaming-kafka-integration.html),
seem to suggest that calling transform() on the stream is a perfectly
acceptable way to capture the offsets for use when the batch completes. Since
that method seems to offer more intuitive ordering guarantees than
foreachRDD(), we have preferred it up until now. So our code looks something
like the following:

import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

final AtomicReference<OffsetRange[]> savedOffsets = new AtomicReference<>();

messages = messages.transformToPair(rdd -> {
  // Save the offsets so that we can update ZK with them later
  savedOffsets.set(((HasOffsetRanges) rdd.rdd()).offsetRanges());
  return rdd;
});

Unfortunately we've discovered that this doesn't work: contrary to
expectations, the logic inside transformToPair() seems to run whenever a new
batch gets enqueued, even if we're not prepared to process it yet. So
savedOffsets ends up holding the offsets of the most recently enqueued batch,
not necessarily the one being processed. When a batch completes, then, the
offsets we save to ZK may reflect enqueued data that we haven't actually
processed yet. This can create (and has created) conditions where a crash
causes us to restart from the wrong position and drop data.

There seem to be two solutions to this, from what I can tell:

1.) A brief test using foreachRDD() instead of transform() seems to behave
more in line with expectations, with the call only being made when a batch
actually begins to process. I have yet to find an explanation as to why the
two methods differ in this way. (See the first sketch after this list.)
2.) Instead of using an AtomicReference we tried a queue of offsets. Our
logic pushes a batch's offsets onto the queue at the start of that batch and
pulls the oldest entry off at the end - the idea is that the entry being
pulled will always reflect the batch just processed, not a newer one still
sitting in the queue. Since we're not 100% sure whether Spark guarantees this
ordering, we also have logic to assert that the batch that just completed has
the same RDD ID as the entry we're pulling from the queue. (See the second
sketch after this list.)
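
For concreteness, here is roughly what the foreachRDD() variant from option
1 looks like. This is a sketch rather than tested code; commitOffsetsToZk()
is a placeholder for our own ZK-writing helper, not a Spark API:

messages.foreachRDD(rdd -> {
  // Grab the offsets first; the cast only works on the RDD that came
  // straight from the direct stream, before any repartitioning.
  OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // ... process the batch here ...

  // Commit to ZK only after processing has succeeded.
  commitOffsetsToZk(offsets); // hypothetical helper
});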
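
And a sketch of the queue approach from option 2, with the RDD ID assertion.
This assumes the transform returns its input RDD unchanged, so the ID seen in
foreachRDD() matches the one we enqueued; commitOffsetsToZk() is again a
placeholder for our own code:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import scala.Tuple2;

final Queue<Tuple2<Integer, OffsetRange[]>> offsetQueue =
    new ConcurrentLinkedQueue<>();

messages = messages.transformToPair(rdd -> {
  // Push this batch's offsets along with its RDD ID so we can
  // sanity-check ordering when we pull at batch completion.
  OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  offsetQueue.add(new Tuple2<>(rdd.id(), offsets));
  return rdd;
});

messages.foreachRDD(rdd -> {
  // ... process the batch here ...

  // Pull the oldest entry; it should be the batch that just finished.
  Tuple2<Integer, OffsetRange[]> head = offsetQueue.poll();
  if (head == null || head._1() != rdd.id()) {
    throw new IllegalStateException("Offset queue out of sync with batches");
  }
  commitOffsetsToZk(head._2()); // hypothetical helper
});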

However, I have yet to find anything, on this list or elsewhere, that
suggests that either of these two approaches is necessary. Does what I've
described match anyone else's experience? Is the behavior I'm seeing from
the transform() method expected? Do both of the solutions I've proposed seem
legitimate, or is there some complication that I've failed to account for?

Any help is appreciated.

- Bradley


