Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/20767#discussion_r173543942
--- Diff:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
---
@@ -342,80 +415,99 @@ private[kafka010] object CachedKafkaConsumer extends
Logging {
}
}
- def releaseKafkaConsumer(
- topic: String,
- partition: Int,
- kafkaParams: ju.Map[String, Object]): Unit = {
- val groupId =
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
- val topicPartition = new TopicPartition(topic, partition)
- val key = CacheKey(groupId, topicPartition)
-
+ private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = {
synchronized {
- val consumer = cache.get(key)
- if (consumer != null) {
- consumer.inuse = false
- } else {
- logWarning(s"Attempting to release consumer that does not exist")
- }
- }
- }
- /**
- * Removes (and closes) the Kafka Consumer for the given topic,
partition and group id.
- */
- def removeKafkaConsumer(
- topic: String,
- partition: Int,
- kafkaParams: ju.Map[String, Object]): Unit = {
- val groupId =
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
- val topicPartition = new TopicPartition(topic, partition)
- val key = CacheKey(groupId, topicPartition)
-
- synchronized {
- val removedConsumer = cache.remove(key)
- if (removedConsumer != null) {
- removedConsumer.close()
+ // If it has been marked for close, then do it any way
+ if (intConsumer.inuse && intConsumer.markedForClose)
intConsumer.close()
+ intConsumer.inuse = false
+
+ // Clear the consumer from the cache if this is indeed the consumer
present in the cache
+ val key = new CacheKey(intConsumer.topicPartition,
intConsumer.kafkaParams)
+ val cachedIntConsumer = cache.get(key)
+ if (cachedIntConsumer != null) {
+ if (cachedIntConsumer.eq(intConsumer)) {
+ // The released consumer is indeed the cached one.
+ cache.remove(key)
+ } else {
+ // The released consumer is not the cached one. Don't do
anything.
+ // This should not happen as long as we maintain the invariant
mentioned above.
+ logWarning(
+ s"Cached consumer not the same one as the one being release" +
+ s"\ncached = $cachedIntConsumer
[${System.identityHashCode(cachedIntConsumer)}]" +
+ s"\nreleased = $intConsumer
[${System.identityHashCode(intConsumer)}]")
+ }
+ } else {
+ // The released consumer is not in the cache. Don't do anything.
+ // This should not happen as long as we maintain the invariant
mentioned above.
+ logWarning(s"Attempting to release consumer that is not in the
cache")
}
}
}
/**
* Get a cached consumer for groupId, assigned to topic and partition.
* If matching consumer doesn't already exist, will be created using
kafkaParams.
+ * This will make a best effort attempt to
--- End diff --
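I would love to see the rest of this sentence. Such a cliffhanger! The new Scaladoc line "This will make a best effort attempt to" stops mid-sentence; please complete it so it says what the best-effort attempt actually does.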
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]