HeartSaVioR commented on a change in pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaDataConsumer
URL: https://github.com/apache/spark/pull/22138#discussion_r303183848
 
 

 ##########
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ##########
 @@ -445,197 +529,68 @@ private[kafka010] case class InternalKafkaConsumer(
    * Throw an exception or log a warning as per `failOnDataLoss`.
    */
   private def reportDataLoss(
+      topicPartition: TopicPartition,
+      groupId: String,
       failOnDataLoss: Boolean,
       message: String,
       cause: Throwable = null): Unit = {
-    val finalMessage = s"$message ${additionalMessage(failOnDataLoss)}"
+    val finalMessage = s"$message ${additionalMessage(topicPartition, groupId, failOnDataLoss)}"
     reportDataLoss0(failOnDataLoss, finalMessage, cause)
   }
 
-  def close(): Unit = consumer.close()
-
-  private def seek(offset: Long): Unit = {
-    logDebug(s"Seeking to $groupId $topicPartition $offset")
-    consumer.seek(topicPartition, offset)
-  }
-
-  /**
-   * Poll messages from Kafka starting from `offset` and update `fetchedData`. `fetchedData` may be
-   * empty if the Kafka consumer fetches some messages but all of them are not visible messages
-   * (either transaction messages, or aborted messages when `isolation.level` is `read_committed`).
-   *
-   * @throws OffsetOutOfRangeException if `offset` is out of range.
-   * @throws TimeoutException if the consumer position is not changed after polling. It means the
-   *                          consumer polls nothing before timeout.
-   */
-  private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = {
-    // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
-    seek(offset)
-    val p = consumer.poll(pollTimeoutMs)
-    val r = p.records(topicPartition)
-    logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
-    val offsetAfterPoll = consumer.position(topicPartition)
-    logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling")
-    fetchedData.withNewPoll(r.listIterator, offsetAfterPoll)
-    if (!fetchedData.hasNext) {
-      // We cannot fetch anything after `poll`. Two possible cases:
-      // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
-      //   be thrown.
-      // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
-      // - Fetched something but all of them are not invisible. This is a valid case and let the
-      //   caller handles this.
-      val range = getAvailableOffsetRange()
-      if (offset < range.earliest || offset >= range.latest) {
-        throw new OffsetOutOfRangeException(
-          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
-      } else if (offset == offsetAfterPoll) {
-        throw new TimeoutException(
-          s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
-      }
-    }
+  private def runUninterruptiblyIfPossible[T](body: => T): T = Thread.currentThread match {
+    case ut: UninterruptibleThread =>
+      ut.runUninterruptibly(body)
+    case _ =>
+      logWarning("KafkaDataConsumer is not running in UninterruptibleThread. " +
+        "It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894")
+      body
   }
 }
 
-
 private[kafka010] object KafkaDataConsumer extends Logging {
+  val UNKNOWN_OFFSET = -2L
 
   case class AvailableOffsetRange(earliest: Long, latest: Long)
 
-  private case class CachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer)
-    extends KafkaDataConsumer {
-    assert(internalConsumer.inUse) // make sure this has been set to true
-    override def release(): Unit = { KafkaDataConsumer.release(internalConsumer) }
-  }
-
-  private case class NonCachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer)
-    extends KafkaDataConsumer {
-    override def release(): Unit = { internalConsumer.close() }
-  }
-
-  private case class CacheKey(groupId: String, topicPartition: TopicPartition) {
+  case class CacheKey(groupId: String, topicPartition: TopicPartition) {
     def this(topicPartition: TopicPartition, kafkaParams: ju.Map[String, Object]) =
       this(kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String], topicPartition)
   }
 
-  // This cache has the following important properties.
-  // - We make a best-effort attempt to maintain the max size of the cache as configured capacity.
-  //   The capacity is not guaranteed to be maintained, especially when there are more active
-  //   tasks simultaneously using consumers than the capacity.
-  private lazy val cache = {
-    val conf = SparkEnv.get.conf
-    val capacity = conf.get(CONSUMER_CACHE_CAPACITY)
-    new ju.LinkedHashMap[CacheKey, InternalKafkaConsumer](capacity, 0.75f, true) {
-      override def removeEldestEntry(
-        entry: ju.Map.Entry[CacheKey, InternalKafkaConsumer]): Boolean = {
-
-        // Try to remove the least-used entry if its currently not in use.
-        //
-        // If you cannot remove it, then the cache will keep growing. In the worst case,
-        // the cache will grow to the max number of concurrent tasks that can run in the executor,
-        // (that is, number of tasks slots) after which it will never reduce. This is unlikely to
-        // be a serious problem because an executor with more than 64 (default) tasks slots is
-        // likely running on a beefy machine that can handle a large number of simultaneously
-        // active consumers.
-
-        if (!entry.getValue.inUse && this.size > capacity) {
-          logWarning(
-            s"KafkaConsumer cache hitting max capacity of $capacity, " +
-              s"removing consumer for ${entry.getKey}")
-          try {
-            entry.getValue.close()
-          } catch {
-            case e: SparkException =>
-              logError(s"Error closing earliest Kafka consumer for ${entry.getKey}", e)
-          }
-          true
-        } else {
-          false
-        }
-      }
+  private val consumerPool = InternalKafkaConsumerPool.build
+  private val fetchedDataPool = FetchedDataPool.build
+
+  ShutdownHookManager.addShutdownHook { () =>
 
 Review comment:
   I think the real issue is using an `object` to hold state that is bound to a specific task or query (and, if we consider multiple applications running in the same JVM, to a specific application as well). I have seen other cases of this, and without extreme care it causes problems, e.g. #24946.
   
   I'm also not sure whether Spark provides any lifecycle management hook to handle such a case, but I'll look into it. A shutdown hook may still be needed if Spark can't guarantee to notify a listener about its lifecycle (the same reason Java itself needs shutdown hooks).
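   
   To make the pooling and shutdown-hook discussion concrete, here is a minimal sketch of the pattern under discussion: a keyed pool from Apache Commons Pool holding one Kafka consumer per (group id, topic partition), with cleanup registered as a JVM shutdown hook. The names `PooledKafkaConsumers`, `PoolKey`, `ConsumerFactory` and `buildPool` are hypothetical stand-ins for illustration only; the PR's actual `InternalKafkaConsumerPool` and `FetchedDataPool` are not shown in this diff, and it registers cleanup through Spark's `ShutdownHookManager` rather than `sys.addShutdownHook`.
   
   ```scala
   import java.{util => ju}
   
   import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject}
   import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericKeyedObjectPool}
   import org.apache.kafka.clients.consumer.KafkaConsumer
   import org.apache.kafka.common.TopicPartition
   
   object PooledKafkaConsumers {
     type ByteConsumer = KafkaConsumer[Array[Byte], Array[Byte]]
   
     // Hypothetical stand-in for the CacheKey in the diff: one pool entry per (groupId, partition).
     case class PoolKey(groupId: String, topicPartition: TopicPartition)
   
     // Tells the pool how to create, wrap and destroy consumers. `kafkaParams` must contain
     // bootstrap servers, group id and byte-array deserializers.
     class ConsumerFactory(kafkaParams: ju.Map[String, Object])
       extends BaseKeyedPooledObjectFactory[PoolKey, ByteConsumer] {
   
       override def create(key: PoolKey): ByteConsumer =
         new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
   
       override def wrap(consumer: ByteConsumer): PooledObject[ByteConsumer] =
         new DefaultPooledObject(consumer)
   
       override def destroyObject(key: PoolKey, p: PooledObject[ByteConsumer]): Unit =
         p.getObject.close()
     }
   
     def buildPool(kafkaParams: ju.Map[String, Object]): GenericKeyedObjectPool[PoolKey, ByteConsumer] = {
       val pool = new GenericKeyedObjectPool(new ConsumerFactory(kafkaParams))
       // Roughly mirrors the old cache's intent: keep one reusable consumer per key.
       pool.setMaxTotalPerKey(1)
   
       // JVM-exit cleanup. The PR registers the equivalent via Spark's (package-private)
       // ShutdownHookManager, as the quoted diff shows; a plain shutdown hook is used here instead.
       sys.addShutdownHook { pool.close() }
       pool
     }
   }
   ```
   
   A task would then call `pool.borrowObject(key)` before reading and `pool.returnObject(key, consumer)` when it finishes, which replaces the removed `inUse` flag and `removeEldestEntry` eviction logic with the pool's own idle-object management.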
