HeartSaVioR commented on a change in pull request #25582: [SPARK-28875][DSTREAMS][SS][TESTS] Add Task retry tests to make sure new consumer used
URL: https://github.com/apache/spark/pull/25582#discussion_r317642200
##########
File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala
##########
@@ -60,23 +70,40 @@ class KafkaDataConsumerSuite extends SharedSparkSession with PrivateMethodTester
assert(e.getCause === cause)
}
+  test("new KafkaDataConsumer instance in case of Task retry") {
+    try {
+      KafkaDataConsumer.cache.clear()
+
+      val kafkaParams = getKafkaParams()
+      val key = new CacheKey(groupId, topicPartition)
+
+      val context1 = new TaskContextImpl(0, 0, 0, 0, 0, null, null, null)
+      TaskContext.setTaskContext(context1)
+      val consumer1 = KafkaDataConsumer.acquire(topicPartition, kafkaParams, true)
+      consumer1.release()
+
+      assert(KafkaDataConsumer.cache.size() == 1)
+      assert(KafkaDataConsumer.cache.get(key).eq(consumer1.internalConsumer))
+
+      val context2 = new TaskContextImpl(0, 0, 0, 0, 1, null, null, null)
+      TaskContext.setTaskContext(context2)
+      val consumer2 = KafkaDataConsumer.acquire(topicPartition, kafkaParams, true)
+      consumer2.release()
+
+      // The first consumer should be removed from the cache and a new
+      // non-cached consumer should be returned
Review comment:
I'd say consumer2 should be cached, since it's created after the invalidation; but this PR only addresses the test, so that's OK.
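To make the semantics under discussion concrete, here is a minimal sketch (not the actual KafkaDataConsumer implementation) of a retry-aware consumer cache: an acquire under a higher task attempt number evicts the entry left behind by the failed attempt and hands back a fresh, non-cached consumer. All names here (SimpleCache, FakeConsumer, the attempt-number check) are hypothetical simplifications for illustration only.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an internal Kafka consumer; tracks which
// task attempt created it so retries can be detected.
case class FakeConsumer(id: Int, attempt: Int)

object SimpleCache {
  private val cache = mutable.Map[String, FakeConsumer]()
  private var nextId = 0

  def acquire(key: String, taskAttempt: Int): FakeConsumer = synchronized {
    cache.get(key) match {
      case Some(c) if c.attempt < taskAttempt =>
        // Task retry detected: the cached consumer may be in a bad state
        // after the previous attempt failed, so invalidate it and return
        // a fresh consumer that is NOT placed in the cache.
        cache.remove(key)
        nextId += 1
        FakeConsumer(nextId, taskAttempt)
      case Some(c) =>
        // Same (or older) attempt: reuse the cached consumer.
        c
      case None =>
        // Nothing cached yet: create and cache a new consumer.
        nextId += 1
        val c = FakeConsumer(nextId, taskAttempt)
        cache(key) = c
        c
    }
  }

  def size: Int = synchronized(cache.size)
}
```

Under this sketch, an acquire at attempt 1 after a cached attempt-0 consumer would evict the stale entry and leave the cache without an entry for that key, which mirrors the behavior the new test asserts; caching consumer2 (as suggested above) would be a change to the `case Some(c) if c.attempt < taskAttempt` branch.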
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]