GitHub user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/20572#discussion_r170278931
--- Diff:
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
---
@@ -71,25 +69,62 @@ class CachedKafkaConsumer[K, V] private(
}
if (!buffer.hasNext()) { poll(timeout) }
- assert(buffer.hasNext(),
+ require(buffer.hasNext(),
s"Failed to get records for $groupId $topic $partition $offset after
polling for $timeout")
var record = buffer.next()
if (record.offset != offset) {
logInfo(s"Buffer miss for $groupId $topic $partition $offset")
seek(offset)
poll(timeout)
- assert(buffer.hasNext(),
+ require(buffer.hasNext(),
s"Failed to get records for $groupId $topic $partition $offset
after polling for $timeout")
record = buffer.next()
- assert(record.offset == offset,
- s"Got wrong record for $groupId $topic $partition even after
seeking to offset $offset")
+ require(record.offset == offset,
+ s"Got wrong record for $groupId $topic $partition even after
seeking to offset $offset " +
+ s"got offset ${record.offset} instead. If this is a compacted
topic, consider enabling " +
+ "spark.streaming.kafka.allowNonConsecutiveOffsets"
+ )
}
nextOffset = offset + 1
record
}
+ /**
+ * Start a batch on a compacted topic
+ */
+ def compactedStart(offset: Long, timeout: Long): Unit = {
+ logDebug(s"compacted start $groupId $topic $partition starting
$offset")
+ // This seek may not be necessary, but it's hard to tell due to gaps
in compacted topics
+ if (offset != nextOffset) {
+ logInfo(s"Initial fetch for compacted $groupId $topic $partition
$offset")
+ seek(offset)
+ poll(timeout)
+ }
+ }
+
+ /**
+ * Get the next record in the batch from a compacted topic.
+ * Assumes compactedStart has been called first, and ignores gaps.
+ */
+ def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
+ if (!buffer.hasNext()) { poll(timeout) }
--- End diff --
Nit: I'd expand this onto two lines.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]