GitHub user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19789#discussion_r152056775

    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
    @@ -155,11 +178,11 @@ object CachedKafkaConsumer extends Logging {
               logInfo(s"Cache miss for $k")
               logDebug(cache.keySet.toString)
               val c = new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams)
    -          cache.put(k, c)
    +          cache.put(k, c.acquireAndGet())
               c
             } else {
               // any given topicpartition should have a consistent key and value type
    -          v.asInstanceOf[CachedKafkaConsumer[K, V]]
    +          v.acquireAndGet().asInstanceOf[CachedKafkaConsumer[K, V]]
    --- End diff --

    Shouldn't this method call be after the cast?
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org