gaborgsomogyi commented on a change in pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaDataConsumer
URL: https://github.com/apache/spark/pull/22138#discussion_r317508580
 
 

 ##########
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ##########
 @@ -18,228 +18,253 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.io.Closeable
 import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.kafka010.KafkaConfigUpdater
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, UNKNOWN_OFFSET}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
+
+/**
+ * This class simplifies the usages of Kafka consumer in Spark SQL Kafka connector.
+ *
+ * NOTE: Like KafkaConsumer, this class is not thread-safe.
+ * NOTE for contributors: It is possible for the instance to be used from multiple callers,
+ * so all the methods should not rely on current cursor and use seek manually.
+ */
+private[kafka010] class InternalKafkaConsumer(
+    val topicPartition: TopicPartition,
+    val kafkaParams: ju.Map[String, Object]) extends Closeable with Logging {
+
+  val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  private val consumer = createConsumer
 
-private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available.
-   *
-   * If the record is invisible (either a
-   * transaction message, or an aborted message when the consumer's `isolation.level` is
-   * `read_committed`), it will be skipped and this method will try to fetch next available record
-   * within [offset, untilOffset).
+   * Poll messages from Kafka starting from `offset` and returns a pair of "list of consumer record"
+   * and "offset after poll". The list of consumer record may be empty if the Kafka consumer fetches
+   * some messages but all of them are not visible messages (either transaction messages,
+   * or aborted messages when `isolation.level` is `read_committed`).
    *
-   * This method also will try its best to detect data loss. If `failOnDataLoss` is `true`, it will
-   * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this
-   * method will try to fetch next available record within [offset, untilOffset).
-   *
-   * When this method tries to skip offsets due to either invisible messages or data loss and
-   * reaches `untilOffset`, it will return `null`.
-   *
-   * @param offset         the offset to fetch.
-   * @param untilOffset    the max offset to fetch. Exclusive.
-   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at
-   *                       offset if available, or throw an exception. When `failOnDataLoss` is `false`,
-   *                       this method will either return record at offset if available, or return
-   *                       the next earliest available record less than untilOffset, or null. It
-   *                       will not throw any exception.
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed after polling. It means the
+   *                          consumer polls nothing before timeout.
    */
-  def get(
-      offset: Long,
-      untilOffset: Long,
-      pollTimeoutMs: Long,
-      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
-    internalConsumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss)
+  def fetch(offset: Long, pollTimeoutMs: Long)
+  : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+    // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
+    seek(offset)
+    val p = consumer.poll(pollTimeoutMs)
+    val r = p.records(topicPartition)
+    logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
+    val offsetAfterPoll = consumer.position(topicPartition)
+    logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling")
+    val fetchedData = (r, offsetAfterPoll)
+    if (r.isEmpty) {
+      // We cannot fetch anything after `poll`. Three possible cases:
+      // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
+      //   be thrown.
+      // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
+      // - Fetched something but all of them are not visible. This is a valid case and we let the
+      //   caller handle this.
+      val range = getAvailableOffsetRange()
+      if (offset < range.earliest || offset >= range.latest) {
+        throw new OffsetOutOfRangeException(
+          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
+      } else if (offset == offsetAfterPoll) {
+        throw new TimeoutException(
+          s"Cannot fetch record for offset $offset in $pollTimeoutMs 
milliseconds")
+      }
+    }
+    fetchedData
   }
 
   /**
   * Return the available offset range of the current partition. It's a pair of the earliest offset
    * and the latest offset.
    */
-  def getAvailableOffsetRange(): AvailableOffsetRange = internalConsumer.getAvailableOffsetRange()
+  def getAvailableOffsetRange(): AvailableOffsetRange = {
+    consumer.seekToBeginning(Set(topicPartition).asJava)
+    val earliestOffset = consumer.position(topicPartition)
+    consumer.seekToEnd(Set(topicPartition).asJava)
+    val latestOffset = consumer.position(topicPartition)
+    AvailableOffsetRange(earliestOffset, latestOffset)
+  }
 
-  /**
-   * Release this consumer from being further used. Depending on its implementation,
-   * this consumer will be either finalized, or reset for reuse later.
-   */
-  def release(): Unit
+  override def close(): Unit = {
+    consumer.close()
+  }
 
-  /** Reference to the internal implementation that this wrapper delegates to */
-  protected def internalConsumer: InternalKafkaConsumer
-}
+  /** Create a KafkaConsumer to fetch records for `topicPartition` */
+  private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
+    val updatedKafkaParams = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap)
+      .setAuthenticationConfigIfNeeded()
 
 Review comment:
   I've made a deep dive, thought the situation through, and here are my findings.
   * The original behavior is not changed, which I think is fine here. Please see my points below.
   * Putting the token into the cache key is not a good idea: if the eviction is not time based, the cache will constantly grow until it reaches its max size and then stay there. This would introduce quite a constraint, and in the case of a large cluster it could be quite a waste of resources. => As a conclusion, I'm fine with the current approach; this can be enhanced later in a different jira.
   * I've had a look at the "ping-pong" proposal and I think it could generate quite a lot of traffic. I think this can be solved in a cheaper way.
   * What I think would be a better solution is to invalidate (but not close while in use) consumers when the delegation token changes. This way tasks can pick up the latest and greatest token, and no additional administration is needed. A rough sketch of this idea is shown below.
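   
   A minimal sketch of that invalidation idea (not the PR's actual implementation): it assumes a hypothetical `currentTokenEpoch` counter that some token-refresh hook bumps whenever a new delegation token is obtained, a stand-in `PooledConsumer` class in place of the real consumer wrapper, and commons-pool2 2.5+ (generic config). With `testOnBorrow` enabled, Commons Pool calls `validateObject` on every borrow, so a consumer created under an older token is destroyed and re-created, while consumers currently checked out keep running until they are returned. The config also shows what time-based eviction of idle consumers would look like, per the cache-growth point above.
   
   ```scala
   import java.util.concurrent.atomic.AtomicLong
   
   import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject}
   import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
   
   // Stand-in for the pooled Kafka consumer; it only remembers which token
   // "epoch" was current when it was created (both names are made up here).
   class PooledConsumer(val key: String, val tokenEpoch: Long) {
     def close(): Unit = ()
   }
   
   object TokenAwarePoolSketch {
     // Bumped by whatever component refreshes the delegation token (hypothetical hook).
     val currentTokenEpoch = new AtomicLong(0L)
   
     class ConsumerFactory extends BaseKeyedPooledObjectFactory[String, PooledConsumer] {
       override def create(key: String): PooledConsumer =
         new PooledConsumer(key, currentTokenEpoch.get())
       override def wrap(value: PooledConsumer): PooledObject[PooledConsumer] =
         new DefaultPooledObject(value)
       // Called on borrow because testOnBorrow is enabled below: a consumer created
       // under an older token is reported invalid, destroyed and re-created.
       override def validateObject(key: String, p: PooledObject[PooledConsumer]): Boolean =
         p.getObject.tokenEpoch == currentTokenEpoch.get()
       override def destroyObject(key: String, p: PooledObject[PooledConsumer]): Unit =
         p.getObject.close()
     }
   
     def main(args: Array[String]): Unit = {
       val conf = new GenericKeyedObjectPoolConfig[PooledConsumer]()
       conf.setTestOnBorrow(true)
       // Time-based eviction of idle consumers, so the pool does not stay at max size forever.
       conf.setTimeBetweenEvictionRunsMillis(60 * 1000L)
       conf.setMinEvictableIdleTimeMillis(5 * 60 * 1000L)
   
       val pool = new GenericKeyedObjectPool[String, PooledConsumer](new ConsumerFactory, conf)
   
       val c1 = pool.borrowObject("topic-0")
       pool.returnObject("topic-0", c1)
   
       currentTokenEpoch.incrementAndGet()      // a new delegation token was obtained
       val c2 = pool.borrowObject("topic-0")    // c1 fails validation and is replaced
       assert(c2.tokenEpoch == currentTokenEpoch.get())
       pool.returnObject("topic-0", c2)
       pool.close()
     }
   }
   ```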
   
   All in all, this discussion can be resolved from my perspective.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
