Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19789#discussion_r152056775
  
    --- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
    @@ -155,11 +178,11 @@ object CachedKafkaConsumer extends Logging {
             logInfo(s"Cache miss for $k")
             logDebug(cache.keySet.toString)
             val c = new CachedKafkaConsumer[K, V](groupId, topic, partition, 
kafkaParams)
    -        cache.put(k, c)
    +        cache.put(k, c.acquireAndGet())
             c
           } else {
             // any given topicpartition should have a consistent key and value 
type
    -        v.asInstanceOf[CachedKafkaConsumer[K, V]]
    +        v.acquireAndGet().asInstanceOf[CachedKafkaConsumer[K, V]]
    --- End diff --
    
    Shouldn't this method call be after the cast?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to