Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/20767#discussion_r173331304
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -342,80 +401,65 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
}
}
- def releaseKafkaConsumer(
- topic: String,
- partition: Int,
- kafkaParams: ju.Map[String, Object]): Unit = {
- val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
- val topicPartition = new TopicPartition(topic, partition)
- val key = CacheKey(groupId, topicPartition)
+ private def releaseKafkaConsumer(
+ topicPartition: TopicPartition,
+ kafkaParams: ju.Map[String, Object]): Unit = {
+ val key = new CacheKey(topicPartition, kafkaParams)
synchronized {
val consumer = cache.get(key)
if (consumer != null) {
- consumer.inuse = false
+ if (consumer.markedForClose) {
+ consumer.close()
+ cache.remove(key)
+ } else {
+ consumer.inuse = false
+ }
} else {
logWarning(s"Attempting to release consumer that does not exist")
--- End diff --
This is the case where a consumer may have been evicted from the cache because it hit its maximum capacity. In that case, we should close the internal consumer.
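
For context, here is a minimal sketch of the eviction scenario described above. The names (`CachedConsumer`, `ConsumerCache`, `acquire`, `release`) are hypothetical and not the PR's actual classes; it only illustrates the pattern: a bounded cache built on `java.util.LinkedHashMap` evicts the eldest entry when it exceeds capacity, and the evicted entry's internal consumer must either be closed right away if idle, or marked for close so it is closed once the task releases it.

```scala
import java.{util => ju}

// Hypothetical stand-in for the cached consumer wrapper; the real class in
// this PR wraps an org.apache.kafka.clients.consumer.KafkaConsumer.
class CachedConsumer(val key: String) {
  @volatile var inUse = false
  @volatile var markedForClose = false
  def close(): Unit = println(s"closing internal consumer for $key")
}

object ConsumerCache {
  private val capacity = 2

  // Bounded cache: when a put pushes the size over capacity, the eldest
  // entry is evicted. The evicted consumer is closed here if idle, or
  // marked for close so release() closes it once the task is done.
  private val cache =
    new ju.LinkedHashMap[String, CachedConsumer](capacity, 0.75f, true) {
      override def removeEldestEntry(
          eldest: ju.Map.Entry[String, CachedConsumer]): Boolean = {
        if (size > capacity) {
          val evicted = eldest.getValue
          if (evicted.inUse) {
            evicted.markedForClose = true // a task still holds it; defer the close
          } else {
            evicted.close()               // idle: close the internal consumer now
          }
          true
        } else {
          false
        }
      }
    }

  def acquire(key: String): CachedConsumer = synchronized {
    val consumer = Option(cache.get(key)).getOrElse {
      val c = new CachedConsumer(key)
      cache.put(key, c)
      c
    }
    consumer.inUse = true
    consumer
  }

  def release(consumer: CachedConsumer): Unit = synchronized {
    if (consumer.markedForClose) {
      // The cache already evicted this entry while a task held it; close now.
      consumer.close()
    } else {
      consumer.inUse = false
    }
  }
}
```

Without the close in the eviction path (or the deferred close at release time), an evicted entry's underlying Kafka consumer would simply be dropped from the map and leak its socket and fetch buffers.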