zsxwing commented on a change in pull request #26470: [SPARK-27042][SS] Invalidate cached Kafka producer in case of task retry
URL: https://github.com/apache/spark/pull/26470#discussion_r346566250
##########
File path:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
##########
@@ -93,6 +93,10 @@ private[kafka010] object CachedKafkaProducer extends Logging {
.setAuthenticationConfigIfNeeded()
.build()
val key = toCacheKey(updatedKafkaParams)
+ if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
Review comment:
IIUC, this is trying to close a shared producer because of an arbitrary task failure (including errors in user code)? IMO, we should not do this for the producer. It can be shared across multiple Spark jobs as long as they use the same Kafka parameters, and the Kafka documentation actually recommends sharing it:

> The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.

Hence I would assume it can self-heal without being closed. The Kafka consumer is a different story: it is not thread-safe and cannot be shared by multiple tasks at the same time. That's why we can close the old consumer on retry there, since we are pretty sure it's not being used by another task in the same JVM.
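
To make the distinction concrete, here is a minimal Scala sketch, not the actual `CachedKafkaProducer` code path; `ProducerSharingSketch`, `getOrCreate`, and `isTaskRetry` are hypothetical names used only for illustration. It shows a producer cache keyed solely by the Kafka parameters, with no eviction on task retry, next to the attempt-number check that is reasonable on the consumer side.

```scala
import java.{util => ju}

import scala.collection.mutable

import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.spark.TaskContext

// Hypothetical sketch of the caching policy described above.
object ProducerSharingSketch {
  // One producer per distinct set of Kafka parameters, shared by every task in
  // this JVM: KafkaProducer is documented as thread safe.
  private val producers =
    mutable.HashMap.empty[ju.Map[String, Object], KafkaProducer[Array[Byte], Array[Byte]]]

  // kafkaParams is assumed to contain bootstrap.servers and byte-array serializers.
  def getOrCreate(
      kafkaParams: ju.Map[String, Object]): KafkaProducer[Array[Byte], Array[Byte]] =
    synchronized {
      // No attemptNumber check here: a task retry (possibly caused by user code,
      // not by Kafka) does not mean the shared producer is broken, so it stays cached.
      producers.getOrElseUpdate(
        kafkaParams, new KafkaProducer[Array[Byte], Array[Byte]](kafkaParams))
    }

  // On the consumer side an aggressive policy is safe: a consumer is not thread
  // safe and is bound to a single task, so when that task is retried the previous
  // attempt's consumer cannot be in use by any other task in this JVM.
  def isTaskRetry: Boolean = {
    val ctx = TaskContext.get()
    ctx != null && ctx.attemptNumber() >= 1
  }
}
```

The point of the sketch is that the retry check only buys anything when the cached object is task-private; for a JVM-wide, thread-safe producer it just throws away a healthy instance.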