Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/20572#discussion_r170277915
--- Diff:
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
---
@@ -172,57 +187,138 @@ private[spark] class KafkaRDD[K, V](
override def compute(thePart: Partition, context: TaskContext):
Iterator[ConsumerRecord[K, V]] = {
val part = thePart.asInstanceOf[KafkaRDDPartition]
- assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
+ require(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
if (part.fromOffset == part.untilOffset) {
logInfo(s"Beginning offset ${part.fromOffset} is the same as ending
offset " +
s"skipping ${part.topic} ${part.partition}")
Iterator.empty
} else {
- new KafkaRDDIterator(part, context)
+ logInfo(s"Computing topic ${part.topic}, partition ${part.partition}
" +
+ s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+ if (compacted) {
+ new CompactedKafkaRDDIterator[K, V](
+ part,
+ context,
+ kafkaParams,
+ useConsumerCache,
+ pollTimeout,
+ cacheInitialCapacity,
+ cacheMaxCapacity,
+ cacheLoadFactor
+ )
+ } else {
+ new KafkaRDDIterator[K, V](
+ part,
+ context,
+ kafkaParams,
+ useConsumerCache,
+ pollTimeout,
+ cacheInitialCapacity,
+ cacheMaxCapacity,
+ cacheLoadFactor
+ )
+ }
}
}
+}
- /**
- * An iterator that fetches messages directly from Kafka for the offsets
in partition.
- * Uses a cached consumer where possible to take advantage of prefetching
- */
- private class KafkaRDDIterator(
- part: KafkaRDDPartition,
- context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
-
- logInfo(s"Computing topic ${part.topic}, partition ${part.partition} "
+
- s"offsets ${part.fromOffset} -> ${part.untilOffset}")
-
- val groupId =
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+/**
+ * An iterator that fetches messages directly from Kafka for the offsets
in partition.
+ * Uses a cached consumer where possible to take advantage of prefetching
+ */
+private class KafkaRDDIterator[K, V](
+ part: KafkaRDDPartition,
+ context: TaskContext,
+ kafkaParams: ju.Map[String, Object],
+ useConsumerCache: Boolean,
+ pollTimeout: Long,
+ cacheInitialCapacity: Int,
+ cacheMaxCapacity: Int,
+ cacheLoadFactor: Float
+) extends Iterator[ConsumerRecord[K, V]] {
+
+ val groupId =
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+ context.addTaskCompletionListener{ context => closeIfNeeded() }
--- End diff --
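This could be written as `...(_ => closeIfNeeded())`: the lambda's `context` parameter is unused and shadows the constructor parameter of the same name, so a `_` placeholder is clearer.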
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org