Github user daroo commented on a diff in the pull request:
https://github.com/apache/spark/pull/19789#discussion_r152060617
--- Diff:
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
---
@@ -155,11 +178,11 @@ object CachedKafkaConsumer extends Logging {
logInfo(s"Cache miss for $k")
logDebug(cache.keySet.toString)
val c = new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams)
- cache.put(k, c)
+ cache.put(k, c.acquireAndGet())
c
} else {
// any given topicpartition should have a consistent key and value type
- v.asInstanceOf[CachedKafkaConsumer[K, V]]
+ v.acquireAndGet().asInstanceOf[CachedKafkaConsumer[K, V]]
--- End diff --
It can be, but it doesn't actually matter, because the cache value has
type CachedKafkaConsumer[_, _]
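
To illustrate the point, here is a minimal sketch, assuming the question is whether the cast should come before or after the acquireAndGet() call. Consumer, CastOrderSketch, castLast and castFirst are hypothetical stand-ins and not code from this PR. The sketch also assumes that acquireAndGet() bumps a usage counter and returns the same instance. Because the cache stores values with wildcard type parameters, a cast is needed in either order and both orders yield the same object.

```scala
import scala.collection.mutable

// Hypothetical stand-in for CachedKafkaConsumer; only the shape matters here.
final class Consumer[K, V](val name: String) {
  private var refCount = 0

  // Assumed behaviour: count the acquisition and return this same instance.
  def acquireAndGet(): Consumer[K, V] = { refCount += 1; this }

  def acquired: Int = refCount
}

object CastOrderSketch {
  // The cache value type has wildcard parameters, as in the comment above.
  val cache = mutable.HashMap.empty[String, Consumer[_, _]]

  // Cast after acquiring (the order used in the diff).
  def castLast[K, V](key: String): Consumer[K, V] =
    cache(key).acquireAndGet().asInstanceOf[Consumer[K, V]]

  // Cast before acquiring (the alternative order).
  def castFirst[K, V](key: String): Consumer[K, V] =
    cache(key).asInstanceOf[Consumer[K, V]].acquireAndGet()
}
```

Either way the type parameters are erased at runtime, so the cast is unchecked and only tells the compiler which K and V to assume.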
---