Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/20572#discussion_r169490949
--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala ---
@@ -178,51 +196,128 @@ private[spark] class KafkaRDD[K, V](
         s"skipping ${part.topic} ${part.partition}")
       Iterator.empty
     } else {
-      new KafkaRDDIterator(part, context)
+      logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
+        s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+      if (compacted) {
+        new CompactedKafkaRDDIterator[K, V](
+          part,
+          context,
+          kafkaParams,
+          useConsumerCache,
+          pollTimeout,
+          cacheInitialCapacity,
+          cacheMaxCapacity,
+          cacheLoadFactor
+        )
+      } else {
+        new KafkaRDDIterator[K, V](
+          part,
+          context,
+          kafkaParams,
+          useConsumerCache,
+          pollTimeout,
+          cacheInitialCapacity,
+          cacheMaxCapacity,
+          cacheLoadFactor
+        )
+      }
     }
   }
+}
 
-  /**
-   * An iterator that fetches messages directly from Kafka for the offsets in partition.
-   * Uses a cached consumer where possible to take advantage of prefetching
-   */
-  private class KafkaRDDIterator(
-      part: KafkaRDDPartition,
-      context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
-
-    logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
-      s"offsets ${part.fromOffset} -> ${part.untilOffset}")
-
-    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-
-    context.addTaskCompletionListener{ context => closeIfNeeded() }
-
-    val consumer = if (useConsumerCache) {
-      CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
-      if (context.attemptNumber >= 1) {
-        // just in case the prior attempt failures were cache related
-        CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
-      }
-      CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
-    } else {
-      CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
+/**
+ * An iterator that fetches messages directly from Kafka for the offsets in partition.
+ * Uses a cached consumer where possible to take advantage of prefetching
+ */
+private class KafkaRDDIterator[K, V](
+  part: KafkaRDDPartition,
+  context: TaskContext,
+  kafkaParams: ju.Map[String, Object],
+  useConsumerCache: Boolean,
+  pollTimeout: Long,
+  cacheInitialCapacity: Int,
+  cacheMaxCapacity: Int,
+  cacheLoadFactor: Float
+) extends Iterator[ConsumerRecord[K, V]] {
+
+  val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  context.addTaskCompletionListener{ context => closeIfNeeded() }
+
+  val consumer = if (useConsumerCache) {
+    CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
+    if (context.attemptNumber >= 1) {
+      // just in case the prior attempt failures were cache related
+      CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
     }
+    CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
+  } else {
+    CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
+  }
 
-    var requestOffset = part.fromOffset
+  var requestOffset = part.fromOffset
 
-    def closeIfNeeded(): Unit = {
-      if (!useConsumerCache && consumer != null) {
-        consumer.close
-      }
+  def closeIfNeeded(): Unit = {
+    if (!useConsumerCache && consumer != null) {
+      consumer.close
     }
+  }
+
+  override def hasNext(): Boolean = requestOffset < part.untilOffset
 
-    override def hasNext(): Boolean = requestOffset < part.untilOffset
+  override def next(): ConsumerRecord[K, V] = {
+    assert(hasNext(), "Can't call getNext() once untilOffset has been reached")
+    val r = consumer.get(requestOffset, pollTimeout)
+    requestOffset += 1
+    r
+  }
+}
 
-    override def next(): ConsumerRecord[K, V] = {
-      assert(hasNext(), "Can't call getNext() once untilOffset has been reached")
-      val r = consumer.get(requestOffset, pollTimeout)
-      requestOffset += 1
-      r
+/**
+ * An iterator that fetches messages directly from Kafka for the offsets in partition.
+ * Uses a cached consumer where possible to take advantage of prefetching.
+ * Intended for compacted topics, or other cases when non-consecutive offsets are ok.
+ */
+private class CompactedKafkaRDDIterator[K, V](
+    part: KafkaRDDPartition,
+    context: TaskContext,
+    kafkaParams: ju.Map[String, Object],
+    useConsumerCache: Boolean,
+    pollTimeout: Long,
+    cacheInitialCapacity: Int,
+    cacheMaxCapacity: Int,
+    cacheLoadFactor: Float
+  ) extends KafkaRDDIterator[K, V](
+    part,
+    context,
+    kafkaParams,
+    useConsumerCache,
+    pollTimeout,
+    cacheInitialCapacity,
+    cacheMaxCapacity,
+    cacheLoadFactor
+  ) {
+
+  consumer.compactedStart(part.fromOffset, pollTimeout)
+
+  var nextRecord = consumer.compactedNext(pollTimeout)
--- End diff ---
`private`?
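
For reference, the suggestion would presumably amount to narrowing the field's visibility (a minimal sketch; the rest of the class body is unchanged and elided here):

```scala
  // Lookahead buffer holding the record to hand back from next().
  // Assuming nothing outside this iterator needs to read it, the field
  // can be narrowed from the default (public) visibility:
  private var nextRecord = consumer.compactedNext(pollTimeout)
```

Since `nextRecord` appears to be internal lookahead state for the compacted iterator, marking it `private` would avoid exposing mutable iterator state to callers.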