Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/20572#discussion_r169489088
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
@@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private(
s"Failed to get records for $groupId $topic $partition $offset
after polling for $timeout")
record = buffer.next()
assert(record.offset == offset,
- s"Got wrong record for $groupId $topic $partition even after
seeking to offset $offset")
+ s"Got wrong record for $groupId $topic $partition even after
seeking to offset $offset " +
+ s"got offset ${record.offset} instead. If this is a compacted
topic, consider enabling " +
+ "spark.streaming.kafka.allowNonConsecutiveOffsets"
+ )
}
nextOffset = offset + 1
record
}
+ /**
+ * Start a batch on a compacted topic
+ */
+ def compactedStart(offset: Long, timeout: Long): Unit = {
+ logDebug(s"compacted start $groupId $topic $partition starting
$offset")
+ // This seek may not be necessary, but it's hard to tell due to gaps
in compacted topics
+ if (offset != nextOffset) {
+ logInfo(s"Initial fetch for compacted $groupId $topic $partition
$offset")
+ seek(offset)
+ poll(timeout)
+ }
+ }
+
+ /**
+ * Get the next record in the batch from a compacted topic.
+ * Assumes compactedStart has been called first, and ignores gaps.
+ */
+ def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
+ if (!buffer.hasNext()) { poll(timeout) }
+ assert(buffer.hasNext(),
+ s"Failed to get records for compacted $groupId $topic $partition
after polling for $timeout")
+ val record = buffer.next()
+ nextOffset = record.offset + 1
+ record
+ }
+
+ /**
+ * Rewind to previous record in the batch from a compacted topic.
+ * Will throw NoSuchElementException if no previous element
--- End diff ---
Could be a `@throws` tag but no big deal.
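
For reference, a minimal sketch of what that could look like (the method name `compactedPrevious` and its body are assumptions here, since the diff is truncated above):

```scala
  /**
   * Rewind to previous record in the batch from a compacted topic.
   *
   * @throws NoSuchElementException if there is no previous element
   */
  def compactedPrevious(): ConsumerRecord[K, V] = {
    // Assumes buffer is a java.util.ListIterator[ConsumerRecord[K, V]] here,
    // whose previous() throws NoSuchElementException at the start of the batch.
    buffer.previous()
  }
```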
---
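
As a usage sketch, here is one way a caller might drive the new compacted API, following the call order described in the doc comments above. The helper name `drainCompactedBatch`, its parameters, and the end-of-batch guard are assumptions for illustration, not part of the PR:

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord

// Hypothetical helper: start the batch once with compactedStart, then pull
// records with compactedNext until the batch's untilOffset is reached.
def drainCompactedBatch[K, V](
    consumer: CachedKafkaConsumer[K, V],
    fromOffset: Long,
    untilOffset: Long,
    pollTimeoutMs: Long)(process: ConsumerRecord[K, V] => Unit): Unit = {
  // compactedNext is called eagerly below, so a nonempty range is assumed.
  require(fromOffset < untilOffset, "fromOffset must be less than untilOffset")
  consumer.compactedStart(fromOffset, pollTimeoutMs)
  var record = consumer.compactedNext(pollTimeoutMs)
  var inRange = record.offset < untilOffset
  while (inRange) {
    process(record)
    if (record.offset + 1 >= untilOffset) {
      // Consumed the last possible offset in the range; don't fetch past the batch.
      inRange = false
    } else {
      record = consumer.compactedNext(pollTimeoutMs)
      inRange = record.offset < untilOffset
    }
  }
}
```

Since offsets can have gaps on a compacted topic, the loop checks each fetched record's offset against `untilOffset` instead of counting consecutive offsets, and it avoids calling `compactedNext` again once the last possible offset in the range has been consumed.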