HeartSaVioR commented on a change in pull request #26470: [SPARK-27042][SS] 
Invalidate cached Kafka producer in case of task retry
URL: https://github.com/apache/spark/pull/26470#discussion_r346630445
 
 

 ##########
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
 ##########
 @@ -93,6 +93,10 @@ private[kafka010] object CachedKafkaProducer extends Logging {
         .setAuthenticationConfigIfNeeded()
         .build()
     val key = toCacheKey(updatedKafkaParams)
+    if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
 
 Review comment:
   > IIUC, this is trying to close a shared producer because of a random failure (including user code errors)? IMO, we should not do this for the producer. It can be shared with multiple Spark jobs as long as they are using the same Kafka parameters.
   
   We changed the producer pool to follow the same approach as the Kafka consumer pool (#25853) to resolve the long-standing "producer close on idle" issue. #19096 shows that closing an idle producer while it can still be shared by multiple tasks is very complicated - we would have to track an "in-use" state and then deal with the resulting thread-safety issues. One producer per task is much simpler, and we didn't observe any performance issue with that approach.
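   
   For illustration only, here is a minimal sketch of the "one task, one producer" idea with retry-based eviction being discussed (this is not the actual CachedKafkaProducer code; `RetryAwareProducerCache`, `CachedProducer`, and `getOrCreate` are hypothetical names):
   
   ```scala
   import java.util.concurrent.ConcurrentHashMap
   
   import org.apache.spark.TaskContext
   
   // Hypothetical sketch, not Spark's actual implementation.
   // CachedProducer stands in for org.apache.kafka.clients.producer.KafkaProducer.
   object RetryAwareProducerCache {
   
     final case class CachedProducer(kafkaParams: Map[String, Object]) {
       def close(): Unit = ()  // placeholder for the real producer close()
     }
   
     private val cache = new ConcurrentHashMap[String, CachedProducer]()
   
     def getOrCreate(key: String, kafkaParams: Map[String, Object]): CachedProducer = {
       val ctx = TaskContext.get()
       if (ctx != null && ctx.attemptNumber() >= 1) {
         // This task is a retry: the previously cached producer may be in a bad state,
         // so drop it and let the retried attempt start from a fresh producer.
         Option(cache.remove(key)).foreach(_.close())
       }
       cache.computeIfAbsent(key, _ => CachedProducer(kafkaParams))
     }
   }
   ```
   
   Note that `TaskContext.get()` returns null outside a task (e.g. on the driver), so the eviction above only applies on executors, where the attempt number of the running task is available.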

