Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/22138#discussion_r211119914
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -425,70 +381,36 @@ private[kafka010] object KafkaDataConsumer extends Logging {
   def acquire(
       topicPartition: TopicPartition,
       kafkaParams: ju.Map[String, Object],
-      useCache: Boolean): KafkaDataConsumer = synchronized {
-    val key = new CacheKey(topicPartition, kafkaParams)
-    val existingInternalConsumer = cache.get(key)
+      useCache: Boolean): KafkaDataConsumer = {
-    lazy val newInternalConsumer = new InternalKafkaConsumer(topicPartition, kafkaParams)
+    if (!useCache) {
+      return NonCachedKafkaDataConsumer(new InternalKafkaConsumer(topicPartition, kafkaParams))
+    }
-    if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
-      // If this is reattempt at running the task, then invalidate cached consumer if any and
-      // start with a new one.
-      if (existingInternalConsumer != null) {
-        // Consumer exists in cache. If its in use, mark it for closing later, or close it now.
-        if (existingInternalConsumer.inUse) {
-          existingInternalConsumer.markedForClose = true
-        } else {
-          existingInternalConsumer.close()
-        }
-      }
-      cache.remove(key)  // Invalidate the cache in any case
-      NonCachedKafkaDataConsumer(newInternalConsumer)
+    val key = new CacheKey(topicPartition, kafkaParams)
-    } else if (!useCache) {
-      // If planner asks to not reuse consumers, then do not use it, return a new consumer
-      NonCachedKafkaDataConsumer(newInternalConsumer)
+    if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
+      // If this is reattempt at running the task, then invalidate cached consumer if any.
-    } else if (existingInternalConsumer == null) {
-      // If consumer is not already cached, then put a new in the cache and return it
-      cache.put(key, newInternalConsumer)
-      newInternalConsumer.inUse = true
-      CachedKafkaDataConsumer(newInternalConsumer)
+      // invalidate all idle consumers for the key
+      pool.invalidateKey(key)
-    } else if (existingInternalConsumer.inUse) {
-      // If consumer is already cached but is currently in use, then return a new consumer
-      NonCachedKafkaDataConsumer(newInternalConsumer)
+      // borrow a consumer from pool even in this case
--- End diff ---
This is another behavior change: if this attempt succeeds, the next batch can reuse the pooled consumer, so there is no reason to discard it. On the other hand, pooling becomes unnecessary overhead if failures keep recurring.
So it comes down to weighing the likelihood of success against the likelihood of repeated failure. I decided to keep the consumer pooled, but it is easy to revert to the current behavior, so please let me know if we think the current behavior makes more sense.
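
For illustration only, here is a minimal, self-contained sketch of the decision being discussed — it is not the Spark code; `FakePool`, `FakeConsumer`, and the `isReattempt` flag are simplified stand-ins for the real pool, `InternalKafkaConsumer`, and the `TaskContext` attempt check in KafkaDataConsumer.scala:

```scala
import scala.collection.mutable

final case class CacheKey(topic: String, partition: Int)

// Stand-in for InternalKafkaConsumer: just tracks whether it was closed.
final class FakeConsumer(val key: CacheKey) {
  var closed = false
  def close(): Unit = closed = true
}

// Minimal keyed pool: holds idle consumers per key; invalidation closes them.
final class FakePool {
  private val idle = mutable.Map.empty[CacheKey, mutable.Queue[FakeConsumer]]

  // Return a consumer to the idle set for its key.
  def returnObject(c: FakeConsumer): Unit =
    idle.getOrElseUpdate(c.key, mutable.Queue.empty).enqueue(c)

  // Close and drop all idle consumers for this key.
  def invalidateKey(key: CacheKey): Unit =
    idle.remove(key).foreach(_.foreach(_.close()))

  // Reuse an idle consumer if one exists, otherwise create a fresh one.
  def borrowObject(key: CacheKey): FakeConsumer =
    idle.get(key).filter(_.nonEmpty).map(_.dequeue())
      .getOrElse(new FakeConsumer(key))
}

// The behavior under discussion: on a task reattempt, invalidate the idle
// consumers for the key, but still borrow from the pool, so a successful
// reattempt repopulates the pool for the next batch.
def acquire(pool: FakePool, key: CacheKey, isReattempt: Boolean): FakeConsumer = {
  if (isReattempt) pool.invalidateKey(key)
  pool.borrowObject(key)
}
```

A quick walk-through of the trade-off: the reattempt invalidates stale idle consumers, yet the consumer borrowed during the reattempt goes back into the pool on return, so the next batch reuses it if the reattempt succeeded.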
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]