Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/20767#discussion_r173330725
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -342,80 +401,65 @@ private[kafka010] object CachedKafkaConsumer extends Logging {
     }
   }
 
-  def releaseKafkaConsumer(
-      topic: String,
-      partition: Int,
-      kafkaParams: ju.Map[String, Object]): Unit = {
-    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-    val topicPartition = new TopicPartition(topic, partition)
-    val key = CacheKey(groupId, topicPartition)
+  private def releaseKafkaConsumer(
+      topicPartition: TopicPartition,
+      kafkaParams: ju.Map[String, Object]): Unit = {
+    val key = new CacheKey(topicPartition, kafkaParams)
 
     synchronized {
       val consumer = cache.get(key)
       if (consumer != null) {
-        consumer.inuse = false
+        if (consumer.markedForClose) {
+          consumer.close()
+          cache.remove(key)
+        } else {
+          consumer.inuse = false
+        }
       } else {
         logWarning(s"Attempting to release consumer that does not exist")
       }
     }
   }
 
-  /**
-   * Removes (and closes) the Kafka Consumer for the given topic, partition and group id.
-   */
-  def removeKafkaConsumer(
-      topic: String,
-      partition: Int,
-      kafkaParams: ju.Map[String, Object]): Unit = {
-    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-    val topicPartition = new TopicPartition(topic, partition)
-    val key = CacheKey(groupId, topicPartition)
-
-    synchronized {
-      val removedConsumer = cache.remove(key)
-      if (removedConsumer != null) {
-        removedConsumer.close()
-      }
-    }
-  }
-
   /**
    * Get a cached consumer for groupId, assigned to topic and partition.
    * If matching consumer doesn't already exist, will be created using kafkaParams.
    */
-  def getOrCreate(
-      topic: String,
-      partition: Int,
-      kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = synchronized {
-    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
-    val topicPartition = new TopicPartition(topic, partition)
-    val key = CacheKey(groupId, topicPartition)
-
-    // If this is reattempt at running the task, then invalidate cache and start with
-    // a new consumer
+  def acquire(
+      topicPartition: TopicPartition,
+      kafkaParams: ju.Map[String, Object],
+      useCache: Boolean): KafkaDataConsumer = synchronized {
+    val key = new CacheKey(topicPartition, kafkaParams)
+    val existingInternalConsumer = cache.get(key)
+
+    lazy val newNonCachedConsumer =
+      new NonCachedKafkaDataConsumer(new InternalKafkaConsumer(topicPartition, kafkaParams))
+
     if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
-      removeKafkaConsumer(topic, partition, kafkaParams)
-      val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
-      consumer.inuse = true
-      cache.put(key, consumer)
-      consumer
-    } else {
-      if (!cache.containsKey(key)) {
-        cache.put(key, new CachedKafkaConsumer(topicPartition, kafkaParams))
+      // If this is reattempt at running the task, then invalidate cache and start with
+      // a new consumer
+      if (existingInternalConsumer != null) {
+        existingInternalConsumer.markedForClose = true
      }
-      val consumer = cache.get(key)
-      consumer.inuse = true
-      consumer
+      newNonCachedConsumer
+    } else if (!useCache) {
+      newNonCachedConsumer
+    } else if (existingInternalConsumer == null) {
+      newNonCachedConsumer.internalConsumer.inuse = true
+      cache.put(key, newNonCachedConsumer.internalConsumer)
+      newNonCachedConsumer
--- End diff --
We should return a CachedKafkaDataConsumer in this case. Right?
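For reference, a minimal sketch (untested, reusing this PR's names, and assuming CachedKafkaDataConsumer wraps an InternalKafkaConsumer the same way NonCachedKafkaDataConsumer does) of what that branch could return instead:

    } else if (existingInternalConsumer == null) {
      // Cache a fresh internal consumer, but hand back the *cached* wrapper, so
      // that release() later routes through releaseKafkaConsumer() and keeps the
      // consumer pooled instead of closing it outright.
      val newInternalConsumer = new InternalKafkaConsumer(topicPartition, kafkaParams)
      newInternalConsumer.inuse = true
      cache.put(key, newInternalConsumer)
      new CachedKafkaDataConsumer(newInternalConsumer)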