Github user tedyu commented on a diff in the pull request:
https://github.com/apache/spark/pull/20767#discussion_r173636002
--- Diff:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
---
@@ -342,80 +415,103 @@ private[kafka010] object CachedKafkaConsumer extends
Logging {
}
}
- def releaseKafkaConsumer(
- topic: String,
- partition: Int,
- kafkaParams: ju.Map[String, Object]): Unit = {
- val groupId =
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
- val topicPartition = new TopicPartition(topic, partition)
- val key = CacheKey(groupId, topicPartition)
-
+ private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = {
synchronized {
- val consumer = cache.get(key)
- if (consumer != null) {
- consumer.inuse = false
- } else {
- logWarning(s"Attempting to release consumer that does not exist")
- }
- }
- }
- /**
- * Removes (and closes) the Kafka Consumer for the given topic,
partition and group id.
- */
- def removeKafkaConsumer(
- topic: String,
- partition: Int,
- kafkaParams: ju.Map[String, Object]): Unit = {
- val groupId =
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
- val topicPartition = new TopicPartition(topic, partition)
- val key = CacheKey(groupId, topicPartition)
-
- synchronized {
- val removedConsumer = cache.remove(key)
- if (removedConsumer != null) {
- removedConsumer.close()
+ // If it has been marked for close, then do it any way
+ if (intConsumer.inuse && intConsumer.markedForClose)
intConsumer.close()
--- End diff --
Is it possible to reach the following condition? If so, should intConsumer.close()
be called in that case as well?
!intConsumer.inuse && intConsumer.markedForClose
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]