Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20767#discussion_r174968594
  
    --- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
    @@ -342,80 +415,103 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
         }
       }
     
    -  def releaseKafkaConsumer(
    -      topic: String,
    -      partition: Int,
    -      kafkaParams: ju.Map[String, Object]): Unit = {
    -    val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    -    val topicPartition = new TopicPartition(topic, partition)
    -    val key = CacheKey(groupId, topicPartition)
    -
    +  private def releaseConsumer(intConsumer: InternalKafkaConsumer): Unit = {
         synchronized {
    -      val consumer = cache.get(key)
    -      if (consumer != null) {
    -        consumer.inuse = false
    -      } else {
    -        logWarning(s"Attempting to release consumer that does not exist")
    -      }
    -    }
    -  }
     
    -  /**
    -   * Removes (and closes) the Kafka Consumer for the given topic, 
partition and group id.
    -   */
    -  def removeKafkaConsumer(
    -      topic: String,
    -      partition: Int,
    -      kafkaParams: ju.Map[String, Object]): Unit = {
    -    val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    -    val topicPartition = new TopicPartition(topic, partition)
    -    val key = CacheKey(groupId, topicPartition)
    -
    -    synchronized {
    -      val removedConsumer = cache.remove(key)
    -      if (removedConsumer != null) {
    -        removedConsumer.close()
    +      // If it has been marked for close, then do it any way
    +      if (intConsumer.inuse && intConsumer.markedForClose) 
intConsumer.close()
    +      intConsumer.inuse = false
    +
    +      // Clear the consumer from the cache if this is indeed the consumer 
present in the cache
    +      val key = new CacheKey(intConsumer.topicPartition, 
intConsumer.kafkaParams)
    +      val cachedIntConsumer = cache.get(key)
    +      if (cachedIntConsumer != null) {
    +        if (cachedIntConsumer.eq(intConsumer)) {
    +          // The released consumer is indeed the cached one.
    +          cache.remove(key)
    +        } else {
    +          // The released consumer is not the cached one. Don't do 
anything.
    +          // This should not happen as long as we maintain the invariant 
mentioned above.
    +          logWarning(
    +            s"Cached consumer not the same one as the one being release" +
    +              s"\ncached = $cachedIntConsumer 
[${System.identityHashCode(cachedIntConsumer)}]" +
    +              s"\nreleased = $intConsumer 
[${System.identityHashCode(intConsumer)}]")
    +        }
    +      } else {
    +        // The released consumer is not in the cache. Don't do anything.
    +        // This should not happen as long as we maintain the invariant 
mentioned above.
    +        logWarning(s"Attempting to release consumer that is not in the 
cache")
           }
         }
       }
     
       /**
        * Get a cached consumer for groupId, assigned to topic and partition.
        * If matching consumer doesn't already exist, will be created using 
kafkaParams.
    +   * The returned consumer must be released explicitly using 
[[KafkaDataConsumer.release()]].
    +   *
    +   * Note: This method guarantees that the consumer returned is not 
currently in use by any one
    +   * else. Within this guarantee, this will make a best effort attempt to 
re-use consumers by
    +   * caching them and tracking when they are in use.
        */
    -  def getOrCreate(
    -      topic: String,
    -      partition: Int,
    -      kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = 
synchronized {
    -    val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    -    val topicPartition = new TopicPartition(topic, partition)
    -    val key = CacheKey(groupId, topicPartition)
    -
    -    // If this is reattempt at running the task, then invalidate cache and 
start with
    -    // a new consumer
    +  def acquire(
    +      topicPartition: TopicPartition,
    +      kafkaParams: ju.Map[String, Object],
    +      useCache: Boolean): KafkaDataConsumer = synchronized {
    +    val key = new CacheKey(topicPartition, kafkaParams)
    +    val existingInternalConsumer = cache.get(key)
    +
    +    lazy val newInternalConsumer = new 
InternalKafkaConsumer(topicPartition, kafkaParams)
    +
         if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
    -      removeKafkaConsumer(topic, partition, kafkaParams)
    -      val consumer = new CachedKafkaConsumer(topicPartition, kafkaParams)
    -      consumer.inuse = true
    -      cache.put(key, consumer)
    -      consumer
    -    } else {
    -      if (!cache.containsKey(key)) {
    -        cache.put(key, new CachedKafkaConsumer(topicPartition, 
kafkaParams))
    +      // If this is reattempt at running the task, then invalidate cached 
consumer if any and
    +      // start with a new one.
    +      if (existingInternalConsumer != null) {
    +        if (existingInternalConsumer.inuse) {
    +          // Consumer exists in cache and is somehow in use. Don't close 
it immediately, but
    +          // mark it for being closed when it is released.
    +          existingInternalConsumer.markedForClose = true
    +          NonCachedKafkaDataConsumer(newInternalConsumer)
    +
    +        } else {
    +          // Consumer exists in cache and is not in use, so close it 
immediately and replace
    +          // it with a new one.
    +          existingInternalConsumer.close()
    +          cache.put(key, newInternalConsumer)
    +          CachedKafkaDataConsumer(newInternalConsumer)
    +
    +        }
    +      } else {
    +        // Consumer is not cached, put the new one in the cache
    +        cache.put(key, newInternalConsumer)
    +        CachedKafkaDataConsumer(newInternalConsumer)
    +
           }
    -      val consumer = cache.get(key)
    -      consumer.inuse = true
    -      consumer
    +    } else if (!useCache) {
    --- End diff --
    
    Technically that wont happens because the continuous query and the batch 
query will have different groupids. But I agree that if `useCache` is `false`, 
then we should not put it in the cache in any way. In fact, we can simplify the 
task retry case further by never putting the new one in the cache, only 
invalidate the existing cached one. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to