Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/20767#discussion_r173599723
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -27,30 +27,73 @@ import org.apache.kafka.common.TopicPartition
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.util.UninterruptibleThread
+private[kafka010] sealed trait KafkaDataConsumer {
+  /**
+   * Get the record for the given offset if available. Otherwise it will either throw error
+   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset),
+   * or null.
+   *
+   * @param offset         the offset to fetch.
+   * @param untilOffset    the max offset to fetch. Exclusive.
+   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
+   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
+   *                       offset if available, or throw exception. When `failOnDataLoss` is `false`,
+   *                       this method will either return record at offset if available, or return
+   *                       the next earliest available record less than untilOffset, or null. It
+   *                       will not throw any exception.
+   */
+  def get(
+      offset: Long,
+      untilOffset: Long,
+      pollTimeoutMs: Long,
+      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
+    internalConsumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss)
+  }
+
+  /**
+   * Return the available offset range of the current partition. It's a pair of the earliest offset
+   * and the latest offset.
+   */
+  def getAvailableOffsetRange(): AvailableOffsetRange = internalConsumer.getAvailableOffsetRange()
+
+  /**
+   * Release this consumer from being further used. Depending on its implementation,
+   * this consumer will be either finalized, or reset for reuse later.
+   */
+  def release(): Unit
+
+  /** Reference to the internal implementation that this wrapper delegates to */
+  protected def internalConsumer: InternalKafkaConsumer
+}
+
/**
- * Consumer of single topicpartition, intended for cached reuse.
- * Underlying consumer is not threadsafe, so neither is this,
- * but processing the same topicpartition and group id in multiple threads is usually bad anyway.
+ * A wrapper around Kafka's KafkaConsumer that throws error when data loss is detected.
+ * This is not for direct use outside this file.
*/
-private[kafka010] case class CachedKafkaConsumer private(
+private[kafka010] case class InternalKafkaConsumer(
    topicPartition: TopicPartition,
    kafkaParams: ju.Map[String, Object]) extends Logging {
-  import CachedKafkaConsumer._
+  import InternalKafkaConsumer._
  private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-  private var consumer = createConsumer
+  @volatile private var consumer = createConsumer
--- End diff ---
I think these `@volatile`s are not necessary. I'm okay with them, though.
---
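For context on what the annotation buys: `@volatile` in Scala compiles to a JVM volatile field, so a write made by one thread is guaranteed to be visible to subsequent reads from other threads without additional locking. Whether that guarantee matters here depends on whether a cached InternalKafkaConsumer can be handed from one task thread to another without some other synchronization in between; if every access already goes through a lock or is confined to a single thread at a time, the annotation is redundant, which may be what the comment above is getting at. Below is a minimal, self-contained sketch of the visibility guarantee only; it is illustrative, not code from the PR, and the Holder/closed names are made up.

    // Illustrative sketch: visibility of a mutable field shared across threads.
    object VolatileVisibilitySketch {

      final class Holder {
        // Without @volatile, the reader thread below is permitted to keep seeing
        // the stale `false` value indefinitely once the JIT optimizes the loop.
        @volatile var closed: Boolean = false
      }

      def main(args: Array[String]): Unit = {
        val holder = new Holder

        val reader = new Thread(new Runnable {
          override def run(): Unit = {
            // Spin until the write from the main thread becomes visible.
            while (!holder.closed) {}
            println("reader observed closed = true")
          }
        })

        reader.start()
        Thread.sleep(100)     // give the reader a chance to start spinning
        holder.closed = true  // volatile write: happens-before the reader's loop exit
        reader.join()
      }
    }

Dropping the annotation is safe only when every cross-thread hand-off of the object already establishes a happens-before edge some other way, for example by checking the consumer in and out of a synchronized pool.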
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]