Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22138#discussion_r211053868
  
    --- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
    @@ -425,70 +381,36 @@ private[kafka010] object KafkaDataConsumer extends 
Logging {
       def acquire(
           topicPartition: TopicPartition,
           kafkaParams: ju.Map[String, Object],
    -      useCache: Boolean): KafkaDataConsumer = synchronized {
    -    val key = new CacheKey(topicPartition, kafkaParams)
    -    val existingInternalConsumer = cache.get(key)
    +      useCache: Boolean): KafkaDataConsumer = {
     
    -    lazy val newInternalConsumer = new 
InternalKafkaConsumer(topicPartition, kafkaParams)
    +    if (!useCache) {
    +      return NonCachedKafkaDataConsumer(new 
InternalKafkaConsumer(topicPartition, kafkaParams))
    +    }
     
    -    if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
    -      // If this is reattempt at running the task, then invalidate cached 
consumer if any and
    -      // start with a new one.
    -      if (existingInternalConsumer != null) {
    -        // Consumer exists in cache. If its in use, mark it for closing 
later, or close it now.
    -        if (existingInternalConsumer.inUse) {
    -          existingInternalConsumer.markedForClose = true
    -        } else {
    -          existingInternalConsumer.close()
    -        }
    -      }
    -      cache.remove(key)  // Invalidate the cache in any case
    -      NonCachedKafkaDataConsumer(newInternalConsumer)
    +    val key = new CacheKey(topicPartition, kafkaParams)
     
    -    } else if (!useCache) {
    -      // If planner asks to not reuse consumers, then do not use it, 
return a new consumer
    -      NonCachedKafkaDataConsumer(newInternalConsumer)
    +    if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
    +      // If this is reattempt at running the task, then invalidate cached 
consumer if any.
     
    -    } else if (existingInternalConsumer == null) {
    -      // If consumer is not already cached, then put a new in the cache 
and return it
    -      cache.put(key, newInternalConsumer)
    -      newInternalConsumer.inUse = true
    -      CachedKafkaDataConsumer(newInternalConsumer)
    +      // invalidate all idle consumers for the key
    +      pool.invalidateKey(key)
     
    -    } else if (existingInternalConsumer.inUse) {
    -      // If consumer is already cached but is currently in use, then 
return a new consumer
    -      NonCachedKafkaDataConsumer(newInternalConsumer)
    +      // borrow a consumer from pool even in this case
    +    }
     
    -    } else {
    -      // If consumer is already cached and is currently not in use, then 
return that consumer
    -      existingInternalConsumer.inUse = true
    -      CachedKafkaDataConsumer(existingInternalConsumer)
    +    try {
    +      CachedKafkaDataConsumer(pool.borrowObject(key, kafkaParams))
    +    } catch { case _: NoSuchElementException =>
    +      // There's neither idle object to clean up nor available space in 
pool:
    +      // fail back to create non-cached consumer
    --- End diff --
    
    This approach introduces behavior change: even though `cache` had capacity, 
the `cache` worked like soft capacity and allowed adding item to the cache when 
there's neither idle object nor free space. 
    
    New behavior of the KafkaDataConsumer is creating all the objects to 
non-cached whenever pool is exhausted and there's no idle object to free up.
    
    I think it is not a big deal when we configure 
"spark.sql.kafkaConsumerCache.capacity" properly, and having hard capacity 
feels more convenient to determine what's going on.
    
    However we can still mimic the current behavior with having infinite 
capacity, so we can be back to current behavior if we feel it makes more sense.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to