HeartSaVioR commented on a change in pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaDataConsumer URL: https://github.com/apache/spark/pull/22138#discussion_r303183848
########## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ########## @@ -445,197 +529,68 @@ private[kafka010] case class InternalKafkaConsumer( * Throw an exception or log a warning as per `failOnDataLoss`. */ private def reportDataLoss( + topicPartition: TopicPartition, + groupId: String, failOnDataLoss: Boolean, message: String, cause: Throwable = null): Unit = { - val finalMessage = s"$message ${additionalMessage(failOnDataLoss)}" + val finalMessage = s"$message ${additionalMessage(topicPartition, groupId, failOnDataLoss)}" reportDataLoss0(failOnDataLoss, finalMessage, cause) } - def close(): Unit = consumer.close() - - private def seek(offset: Long): Unit = { - logDebug(s"Seeking to $groupId $topicPartition $offset") - consumer.seek(topicPartition, offset) - } - - /** - * Poll messages from Kafka starting from `offset` and update `fetchedData`. `fetchedData` may be - * empty if the Kafka consumer fetches some messages but all of them are not visible messages - * (either transaction messages, or aborted messages when `isolation.level` is `read_committed`). - * - * @throws OffsetOutOfRangeException if `offset` is out of range. - * @throws TimeoutException if the consumer position is not changed after polling. It means the - * consumer polls nothing before timeout. - */ - private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = { - // Seek to the offset because we may call seekToBeginning or seekToEnd before this. - seek(offset) - val p = consumer.poll(pollTimeoutMs) - val r = p.records(topicPartition) - logDebug(s"Polled $groupId ${p.partitions()} ${r.size}") - val offsetAfterPoll = consumer.position(topicPartition) - logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling") - fetchedData.withNewPoll(r.listIterator, offsetAfterPoll) - if (!fetchedData.hasNext) { - // We cannot fetch anything after `poll`. 
Two possible cases: - // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will - // be thrown. - // - Cannot fetch any data before timeout. `TimeoutException` will be thrown. - // - Fetched something but all of them are not invisible. This is a valid case and let the - // caller handles this. - val range = getAvailableOffsetRange() - if (offset < range.earliest || offset >= range.latest) { - throw new OffsetOutOfRangeException( - Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava) - } else if (offset == offsetAfterPoll) { - throw new TimeoutException( - s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds") - } - } + private def runUninterruptiblyIfPossible[T](body: => T): T = Thread.currentThread match { + case ut: UninterruptibleThread => + ut.runUninterruptibly(body) + case _ => + logWarning("KafkaDataConsumer is not running in UninterruptibleThread. " + + "It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894") + body } } - private[kafka010] object KafkaDataConsumer extends Logging { + val UNKNOWN_OFFSET = -2L case class AvailableOffsetRange(earliest: Long, latest: Long) - private case class CachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer) - extends KafkaDataConsumer { - assert(internalConsumer.inUse) // make sure this has been set to true - override def release(): Unit = { KafkaDataConsumer.release(internalConsumer) } - } - - private case class NonCachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer) - extends KafkaDataConsumer { - override def release(): Unit = { internalConsumer.close() } - } - - private case class CacheKey(groupId: String, topicPartition: TopicPartition) { + case class CacheKey(groupId: String, topicPartition: TopicPartition) { def this(topicPartition: TopicPartition, kafkaParams: ju.Map[String, Object]) = this(kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String], topicPartition) } - // This cache has the 
following important properties. - // - We make a best-effort attempt to maintain the max size of the cache as configured capacity. - // The capacity is not guaranteed to be maintained, especially when there are more active - // tasks simultaneously using consumers than the capacity. - private lazy val cache = { - val conf = SparkEnv.get.conf - val capacity = conf.get(CONSUMER_CACHE_CAPACITY) - new ju.LinkedHashMap[CacheKey, InternalKafkaConsumer](capacity, 0.75f, true) { - override def removeEldestEntry( - entry: ju.Map.Entry[CacheKey, InternalKafkaConsumer]): Boolean = { - - // Try to remove the least-used entry if its currently not in use. - // - // If you cannot remove it, then the cache will keep growing. In the worst case, - // the cache will grow to the max number of concurrent tasks that can run in the executor, - // (that is, number of tasks slots) after which it will never reduce. This is unlikely to - // be a serious problem because an executor with more than 64 (default) tasks slots is - // likely running on a beefy machine that can handle a large number of simultaneously - // active consumers. - - if (!entry.getValue.inUse && this.size > capacity) { - logWarning( - s"KafkaConsumer cache hitting max capacity of $capacity, " + - s"removing consumer for ${entry.getKey}") - try { - entry.getValue.close() - } catch { - case e: SparkException => - logError(s"Error closing earliest Kafka consumer for ${entry.getKey}", e) - } - true - } else { - false - } - } + private val consumerPool = InternalKafkaConsumerPool.build + private val fetchedDataPool = FetchedDataPool.build + + ShutdownHookManager.addShutdownHook { () => Review comment: I think the real issue is using `object` in some instance which is bound to the specific task, specific query, etc (if we consider multiple applications running in same JVM, then specific application too). I have seen other cases for this as well, and without extreme care, it causes an issue. e.g. 
#24946. I'm also not sure whether Spark provides lifecycle management and hooks to handle such a case, but I'll look into it. A shutdown hook may still be needed if Spark can't guarantee that it notifies listeners about its lifecycle events (for the same reason Java itself needs shutdown hooks). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org