Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22138#discussion_r211119914
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -425,70 +381,36 @@ private[kafka010] object KafkaDataConsumer extends Logging {
       def acquire(
           topicPartition: TopicPartition,
           kafkaParams: ju.Map[String, Object],
    -      useCache: Boolean): KafkaDataConsumer = synchronized {
    -    val key = new CacheKey(topicPartition, kafkaParams)
    -    val existingInternalConsumer = cache.get(key)
    +      useCache: Boolean): KafkaDataConsumer = {
     
    -    lazy val newInternalConsumer = new InternalKafkaConsumer(topicPartition, kafkaParams)
    +    if (!useCache) {
    +      return NonCachedKafkaDataConsumer(new InternalKafkaConsumer(topicPartition, kafkaParams))
    +    }
     
    -    if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
    -      // If this is reattempt at running the task, then invalidate cached consumer if any and
    -      // start with a new one.
    -      if (existingInternalConsumer != null) {
    -        // Consumer exists in cache. If its in use, mark it for closing later, or close it now.
    -        if (existingInternalConsumer.inUse) {
    -          existingInternalConsumer.markedForClose = true
    -        } else {
    -          existingInternalConsumer.close()
    -        }
    -      }
    -      cache.remove(key)  // Invalidate the cache in any case
    -      NonCachedKafkaDataConsumer(newInternalConsumer)
    +    val key = new CacheKey(topicPartition, kafkaParams)
     
    -    } else if (!useCache) {
    -      // If planner asks to not reuse consumers, then do not use it, return a new consumer
    -      NonCachedKafkaDataConsumer(newInternalConsumer)
    +    if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
    +      // If this is reattempt at running the task, then invalidate cached consumer if any.
     
    -    } else if (existingInternalConsumer == null) {
    -      // If consumer is not already cached, then put a new in the cache and return it
    -      cache.put(key, newInternalConsumer)
    -      newInternalConsumer.inUse = true
    -      CachedKafkaDataConsumer(newInternalConsumer)
    +      // invalidate all idle consumers for the key
    +      pool.invalidateKey(key)
     
    -    } else if (existingInternalConsumer.inUse) {
    -      // If consumer is already cached but is currently in use, then return a new consumer
    -      NonCachedKafkaDataConsumer(newInternalConsumer)
    +      // borrow a consumer from pool even in this case
    --- End diff --
    
    This is another behavior change: if this attempt succeeds, the next batch can reuse the pooled consumer, so there is no reason to discard it.
    
    However, I also see the cost of pooling unnecessarily if the task keeps failing.
    
    So this is a trade-off between the chance that the reattempt succeeds and the chance that it fails again. I chose to pool the consumer, but it is easy to go back to the current behavior, so please let me know if we think the current behavior makes more sense.
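
To make the trade-off concrete, here is a minimal, self-contained sketch of the two policies. It is not the code in this PR: `FakeConsumer`, `SimplePool`, `ReattemptPolicies`, and every method on them are hypothetical stand-ins for illustration, and only the name `invalidateKey` is borrowed from the diff. It assumes a single-threaded caller and a task that always returns its consumer when it finishes.

```scala
import scala.collection.mutable

// Hypothetical stand-in for InternalKafkaConsumer; it only records whether it was closed.
final class FakeConsumer(val id: Int) {
  var closed: Boolean = false
  def close(): Unit = { closed = true }
}

// Hypothetical minimal keyed pool (not thread-safe, not the pool used in the PR).
final class SimplePool {
  private val idle = mutable.Map.empty[String, mutable.Queue[FakeConsumer]]
  private var nextId = 0

  private def create(): FakeConsumer = { nextId += 1; new FakeConsumer(nextId) }

  // Close and drop every idle consumer registered under the key.
  def invalidateKey(key: String): Unit =
    idle.remove(key).foreach(queue => queue.foreach(_.close()))

  // Reuse an idle consumer if one exists, otherwise create a new one.
  def borrow(key: String): FakeConsumer =
    idle.get(key).filter(_.nonEmpty).map(_.dequeue()).getOrElse(create())

  // Make the consumer available to later borrowers of the same key.
  def giveBack(key: String, consumer: FakeConsumer): Unit =
    idle.getOrElseUpdate(key, mutable.Queue.empty[FakeConsumer]).enqueue(consumer)

  // Create a consumer that is never handed back to the pool.
  def createUnpooled(): FakeConsumer = create()

  def idleCount(key: String): Int = idle.get(key).map(_.size).getOrElse(0)
}

object ReattemptPolicies {
  // Proposed policy: invalidate idle consumers on a reattempt, then still borrow from the pool.
  def runPooled(pool: SimplePool, key: String, attemptNumber: Int): FakeConsumer = {
    if (attemptNumber >= 1) pool.invalidateKey(key)
    val consumer = pool.borrow(key)
    pool.giveBack(key, consumer) // task finished; the consumer stays available
    consumer
  }

  // Current policy: on a reattempt, use a throwaway consumer and close it afterwards.
  def runUnpooledOnRetry(pool: SimplePool, key: String, attemptNumber: Int): FakeConsumer = {
    if (attemptNumber >= 1) {
      pool.invalidateKey(key)
      val consumer = pool.createUnpooled()
      consumer.close() // discarded as soon as the task finishes
      consumer
    } else {
      val consumer = pool.borrow(key)
      pool.giveBack(key, consumer)
      consumer
    }
  }
}
```

Under the pooled policy, a successful reattempt leaves one idle consumer for the next batch to reuse. Under the unpooled policy, the next batch has to create a new consumer, but repeated failures never leave a consumer sitting in the pool.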

