GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20767#discussion_r173602790
--- Diff:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
---
@@ -342,80 +415,103 @@ private[kafka010] object CachedKafkaConsumer extends
Logging {
}
}
- def releaseKafkaConsumer(
- topic: String,
- partition: Int,
- kafkaParams: ju.Map[String, Object]): Unit = {
- val groupId =
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
- val topicPartition = new TopicPartition(topic, partition)
- val key = CacheKey(groupId, topicPartition)
-
+ private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = {
synchronized {
- val consumer = cache.get(key)
- if (consumer != null) {
- consumer.inuse = false
- } else {
- logWarning(s"Attempting to release consumer that does not exist")
- }
- }
- }
- /**
- * Removes (and closes) the Kafka Consumer for the given topic,
partition and group id.
- */
- def removeKafkaConsumer(
- topic: String,
- partition: Int,
- kafkaParams: ju.Map[String, Object]): Unit = {
- val groupId =
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
- val topicPartition = new TopicPartition(topic, partition)
- val key = CacheKey(groupId, topicPartition)
-
- synchronized {
- val removedConsumer = cache.remove(key)
- if (removedConsumer != null) {
- removedConsumer.close()
+ // If it has been marked for close, then do it any way
+ if (intConsumer.inuse && intConsumer.markedForClose)
intConsumer.close()
+ intConsumer.inuse = false
+
+ // Clear the consumer from the cache if this is indeed the consumer
present in the cache
+ val key = new CacheKey(intConsumer.topicPartition,
intConsumer.kafkaParams)
+ val cachedIntConsumer = cache.get(key)
+ if (cachedIntConsumer != null) {
+ if (cachedIntConsumer.eq(intConsumer)) {
+ // The released consumer is indeed the cached one.
+ cache.remove(key)
+ } else {
+ // The released consumer is not the cached one. Don't do
anything.
+ // This should not happen as long as we maintain the invariant
mentioned above.
+ logWarning(
+ s"Cached consumer not the same one as the one being release" +
+ s"\ncached = $cachedIntConsumer
[${System.identityHashCode(cachedIntConsumer)}]" +
+ s"\nreleased = $intConsumer
[${System.identityHashCode(intConsumer)}]")
+ }
+ } else {
+ // The released consumer is not in the cache. Don't do anything.
+ // This should not happen as long as we maintain the invariant
mentioned above.
+ logWarning(s"Attempting to release consumer that is not in the
cache")
}
}
}
/**
* Get a cached consumer for groupId, assigned to topic and partition.
* If matching consumer doesn't already exist, will be created using
kafkaParams.
+ * The returned consumer must be released explicitly using
[[KafkaDataConsumer.release()]].
+ *
+ * Note: This method guarantees that the consumer returned is not
currently in use by any one
+ * else. Within this guarantee, this will make a best effort attempt to
re-use consumers by
+ * caching them and tracking when they are in use.
*/
- def getOrCreate(
- topic: String,
- partition: Int,
- kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer =
synchronized {
- val groupId =
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
- val topicPartition = new TopicPartition(topic, partition)
- val key = CacheKey(groupId, topicPartition)
-
- // If this is reattempt at running the task, then invalidate cache and
start with
- // a new consumer
+ def acquire(
+ topicPartition: TopicPartition,
+ kafkaParams: ju.Map[String, Object],
+ useCache: Boolean): KafkaDataConsumer = synchronized {
+ val key = new CacheKey(topicPartition, kafkaParams)
+ val existingInternalConsumer = cache.get(key)
+
+ lazy val newInternalConsumer = new
InternalKafkaConsumer(topicPartition, kafkaParams)
+
if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
- removeKafkaConsumer(topic, partition, kafkaParams)
- val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
- consumer.inuse = true
- cache.put(key, consumer)
- consumer
- } else {
- if (!cache.containsKey(key)) {
- cache.put(key, new CachedKafkaConsumer(topicPartition,
kafkaParams))
+ // If this is reattempt at running the task, then invalidate cached
consumer if any and
+ // start with a new one.
+ if (existingInternalConsumer != null) {
+ if (existingInternalConsumer.inuse) {
+ // Consumer exists in cache and is somehow in use. Don't close
it immediately, but
+ // mark it for being closed when it is released.
+ existingInternalConsumer.markedForClose = true
+ NonCachedKafkaDataConsumer(newInternalConsumer)
+
+ } else {
+ // Consumer exists in cache and is not in use, so close it
immediately and replace
+ // it with a new one.
+ existingInternalConsumer.close()
+ cache.put(key, newInternalConsumer)
+ CachedKafkaDataConsumer(newInternalConsumer)
+
+ }
+ } else {
+ // Consumer is not cached, put the new one in the cache
+ cache.put(key, newInternalConsumer)
+ CachedKafkaDataConsumer(newInternalConsumer)
+
}
- val consumer = cache.get(key)
- consumer.inuse = true
- consumer
+ } else if (!useCache) {
+ // If planner asks to not reuse consumers, then do not use it,
return a new consumer
+ NonCachedKafkaDataConsumer(newInternalConsumer)
+
+ } else if (existingInternalConsumer == null) {
+ // If consumer is not already cached, then put a new in the cache
and return it
+ newInternalConsumer.inuse = true
+ cache.put(key, newInternalConsumer)
+ CachedKafkaDataConsumer(newInternalConsumer)
+
+ } else if (existingInternalConsumer.inuse) {
+ // If consumer is already cached but is currently in use, then
return a new consumer
+ NonCachedKafkaDataConsumer(newInternalConsumer)
+
+ } else {
+ // If consumer is already cached and is currently not in use, then
return that consumer
+ CachedKafkaDataConsumer(existingInternalConsumer)
--- End diff --
I will run a longer stress test locally just to be sure.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]