I'm working on a Spark Streaming (1.6.0) project, and one of our requirements is to persist Kafka offsets to Zookeeper after a batch has completed, so that we can resume from the correct position if we have to restart the process for any reason. Many links, http://spark.apache.org/docs/latest/streaming-kafka-integration.html included, seem to suggest that calling transform() on the stream is a perfectly acceptable way to capture the offsets for use when the batch completes. Since that method seems to offer more intuitive ordering guarantees than foreachRDD(), we have, up until now, preferred it. So our code looks something like the following:
    AtomicReference<OffsetRange[]> savedOffsets = new AtomicReference<>();

    messages = messages.transformToPair((rdd) -> {
        // Save the offsets so that we can update ZK with them later
        HasOffsetRanges hasOffsetRanges = (HasOffsetRanges) rdd.rdd();
        savedOffsets.set(hasOffsetRanges.offsetRanges());
        return rdd;
    });

Unfortunately we've discovered that this doesn't work: contrary to expectations, the logic inside transformToPair() seems to run whenever a new batch is enqueued, even if we're not ready to process it yet. So savedOffsets holds the offsets of the most recently enqueued batch, not necessarily the one currently being processed. When a batch completes, then, the offsets we save to ZK may reflect enqueued data that we haven't actually processed yet. This can (and has) created conditions where a crash causes us to restart from the wrong position and drop data.

There seem to be two solutions to this, from what I can tell:

1.) A brief test using foreachRDD() instead of transform() behaves more in line with expectations, with the call only being made when a batch actually begins to process. I have yet to find an explanation for why the two methods differ in this way.

2.) Instead of an AtomicReference, we tried a queue of offsets. Our logic pushes a set of offsets at the start of a batch and pulls the oldest off at the end - the idea is that the entry being pulled will always reflect the batch just processed, not one still sitting in the queue. Since we're not 100% sure that Spark guarantees this ordering, we also have logic to assert that the batch that completed has the same RDD ID as the entry we're pulling from the queue.

However, I have yet to find anything, on this list or elsewhere, suggesting that either of these two approaches is necessary. Does what I've described match anyone else's experience? Is the behavior I'm seeing from transform() expected? Do both of the solutions I've proposed seem legitimate, or is there some complication that I've failed to account for?
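For what it's worth, the bookkeeping in option 2 can be sketched without any Spark dependency. This is only an illustration, not our actual code: OffsetTracker, batchQueued, and batchCompleted are hypothetical names, long[] stands in for Spark's OffsetRange[], and the FIFO assumption is exactly the guarantee we're unsure Spark provides (hence the RDD ID check):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the queue-based bookkeeping (Spark types elided;
// real code would store OffsetRange[] rather than long[]).
class OffsetTracker {

    // One entry per enqueued batch: the RDD id plus that batch's Kafka offsets.
    private static final class Entry {
        final int rddId;
        final long[] offsets;
        Entry(int rddId, long[] offsets) { this.rddId = rddId; this.offsets = offsets; }
    }

    private final ConcurrentLinkedQueue<Entry> pending = new ConcurrentLinkedQueue<>();

    // Called when a batch is enqueued (e.g. from transformToPair()).
    void batchQueued(int rddId, long[] offsets) {
        pending.add(new Entry(rddId, offsets));
    }

    // Called when a batch completes: pull the oldest entry, assert that it
    // matches the completed batch's RDD id, and return the offsets that are
    // now safe to write to ZooKeeper.
    long[] batchCompleted(int rddId) {
        Entry oldest = pending.poll();
        if (oldest == null || oldest.rddId != rddId) {
            throw new IllegalStateException(
                "completed batch " + rddId + " does not match oldest queued batch");
        }
        return oldest.offsets;
    }
}
```

The IllegalStateException is the assertion mentioned above: if batches ever complete out of queue order, we'd rather fail loudly than write the wrong offsets to ZK.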
Any help is appreciated.

- Bradley

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Complications-with-saving-Kafka-offsets-tp27324.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.