Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20767#discussion_r174984237
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -467,44 +435,58 @@ private[kafka010] object KafkaDataConsumer extends Logging {
          // If this is reattempt at running the task, then invalidate cached consumer if any and
          // start with a new one.
           if (existingInternalConsumer != null) {
    -        if (existingInternalConsumer.inuse) {
    -          // Consumer exists in cache and is somehow in use. Don't close it immediately, but
    -          // mark it for being closed when it is released.
    +        // Consumer exists in cache. If it's in use, mark it for closing later; otherwise close it now.
    +        if (existingInternalConsumer.inUse) {
               existingInternalConsumer.markedForClose = true
    -          NonCachedKafkaDataConsumer(newInternalConsumer)
    -
             } else {
    -          // Consumer exists in cache and is not in use, so close it immediately and replace
    -          // it with a new one.
               existingInternalConsumer.close()
    -          cache.put(key, newInternalConsumer)
    -          CachedKafkaDataConsumer(newInternalConsumer)
    -
             }
    -      } else {
    -        // Consumer is not cached, put the new one in the cache
    -        cache.put(key, newInternalConsumer)
    -        CachedKafkaDataConsumer(newInternalConsumer)
    -
           }
    +      cache.remove(key)  // Invalidate the cache in any case
    +      NonCachedKafkaDataConsumer(newInternalConsumer)
    +
         } else if (!useCache) {
          // If planner asks to not reuse consumers, then do not use it, return a new consumer
           NonCachedKafkaDataConsumer(newInternalConsumer)
     
         } else if (existingInternalConsumer == null) {
          // If consumer is not already cached, then put a new in the cache and return it
    -      newInternalConsumer.inuse = true
           cache.put(key, newInternalConsumer)
    +      newInternalConsumer.inUse = true
           CachedKafkaDataConsumer(newInternalConsumer)
     
    -    } else if (existingInternalConsumer.inuse) {
    +    } else if (existingInternalConsumer.inUse) {
          // If consumer is already cached but is currently in use, then return a new consumer
           NonCachedKafkaDataConsumer(newInternalConsumer)
    --- End diff --
    
    Maybe keep an internal counter for how many times a non-cached consumer is created.
    That would give us information on how effective the cache is.
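    The counter could be sketched as a small, thread-safe metrics holder. Note this is only an illustration of the suggestion, not code from the PR: the object name `ConsumerCacheMetrics` and its methods are hypothetical.
    
    ```scala
    import java.util.concurrent.atomic.AtomicLong
    
    // Hypothetical metrics holder (not part of the PR): counts how often a
    // non-cached consumer had to be created versus how often the cache was hit,
    // giving a rough measure of cache effectiveness.
    object ConsumerCacheMetrics {
      private val nonCachedCreations = new AtomicLong(0)
      private val cachedHits = new AtomicLong(0)
    
      // Call whenever a NonCachedKafkaDataConsumer is handed out.
      def recordNonCached(): Long = nonCachedCreations.incrementAndGet()
    
      // Call whenever an existing cached consumer is reused.
      def recordCachedHit(): Long = cachedHits.incrementAndGet()
    
      // Fraction of acquisitions served from the cache; None before any acquisition.
      def hitRate: Option[Double] = {
        val misses = nonCachedCreations.get
        val hits = cachedHits.get
        val total = hits + misses
        if (total == 0) None else Some(hits.toDouble / total)
      }
    }
    ```
    
    `AtomicLong` keeps the counters safe under the concurrent task access the surrounding cache already has to handle.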


---
