Github user tedyu commented on a diff in the pull request:
https://github.com/apache/spark/pull/20767#discussion_r174984237
--- Diff:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
---
@@ -467,44 +435,58 @@ private[kafka010] object KafkaDataConsumer extends Logging {
// If this is reattempt at running the task, then invalidate cached consumer if any and
// start with a new one.
if (existingInternalConsumer != null) {
- if (existingInternalConsumer.inuse) {
- // Consumer exists in cache and is somehow in use. Don't close it immediately, but
- // mark it for being closed when it is released.
+ // Consumer exists in cache. If its in use, mark it for closing later, or close it now.
+ if (existingInternalConsumer.inUse) {
existingInternalConsumer.markedForClose = true
- NonCachedKafkaDataConsumer(newInternalConsumer)
-
} else {
- // Consumer exists in cache and is not in use, so close it immediately and replace
- // it with a new one.
existingInternalConsumer.close()
- cache.put(key, newInternalConsumer)
- CachedKafkaDataConsumer(newInternalConsumer)
-
}
- } else {
- // Consumer is not cached, put the new one in the cache
- cache.put(key, newInternalConsumer)
- CachedKafkaDataConsumer(newInternalConsumer)
-
}
+ cache.remove(key) // Invalidate the cache in any case
+ NonCachedKafkaDataConsumer(newInternalConsumer)
+
} else if (!useCache) {
// If planner asks to not reuse consumers, then do not use it, return a new consumer
NonCachedKafkaDataConsumer(newInternalConsumer)
} else if (existingInternalConsumer == null) {
// If consumer is not already cached, then put a new in the cache and return it
- newInternalConsumer.inuse = true
cache.put(key, newInternalConsumer)
+ newInternalConsumer.inUse = true
CachedKafkaDataConsumer(newInternalConsumer)
- } else if (existingInternalConsumer.inuse) {
+ } else if (existingInternalConsumer.inUse) {
// If consumer is already cached but is currently in use, then return a new consumer
NonCachedKafkaDataConsumer(newInternalConsumer)
--- End diff --
Maybe keep an internal counter of how many times a non-cached consumer is
created.
This would tell us how effective the cache is.
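
One way such a counter could look, as a rough sketch only: the object `ConsumerAcquisitionStats` and its methods are hypothetical names made up for illustration and are not part of Spark or of this PR. It assumes two thread-safe counters are enough, one for consumers served through the cache and one for non-cached consumers.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper (not part of Spark): counts how consumers are handed out.
object ConsumerAcquisitionStats {
  private val cachedCount = new AtomicLong(0)
  private val nonCachedCount = new AtomicLong(0)

  // Record that a cached consumer was returned to the caller.
  def recordCached(): Unit = { cachedCount.incrementAndGet(); () }

  // Record that a non-cached consumer was created and returned.
  def recordNonCached(): Unit = { nonCachedCount.incrementAndGet(); () }

  def cached: Long = cachedCount.get()
  def nonCached: Long = nonCachedCount.get()

  // Fraction of acquisitions served through the cache,
  // or None if nothing has been recorded yet.
  def cacheHitRatio: Option[Double] = {
    val c = cachedCount.get()
    val n = nonCachedCount.get()
    val total = c + n
    if (total == 0) None else Some(c.toDouble / total)
  }
}
```

Under this sketch, each branch that returns `NonCachedKafkaDataConsumer(newInternalConsumer)` would call `recordNonCached()`, and each branch that returns a `CachedKafkaDataConsumer` would call `recordCached()`; the ratio could then be logged periodically or exposed as a metric.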