Github user ScrapCodes commented on a diff in the pull request:
https://github.com/apache/spark/pull/19096#discussion_r162022825
--- Diff:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala
---
@@ -112,8 +112,8 @@ class KafkaContinuousDataWriter(
checkForErrors()
if (producer != null) {
producer.flush()
+ producer.inUseCount.decrementAndGet()
checkForErrors()
- CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava))
--- End diff --
Since a producer is shared across threads, we maintain an in-use count for
each producer and close producers separately.
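
To illustrate the idea, here is a minimal sketch of in-use counting for a shared resource. Only the `inUseCount` field name comes from the diff above; `SharedProducer`, `SharedProducerDemo`, `acquire`, `release`, and `closeIfIdle` are hypothetical names, and this is not the actual `CachedKafkaProducer` code. It also assumes the caller coordinates acquisition and closing, so it does not guard against a task acquiring the producer between the idle check and the close.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical stand-in for a cached, shared Kafka producer (not Spark's class).
final class SharedProducer(val name: String) {
  // Number of writer tasks currently using this producer.
  val inUseCount = new AtomicInteger(0)
  private val closed = new AtomicBoolean(false)

  def isClosed: Boolean = closed.get()

  // Close only when no task holds the producer.
  // Returns true only for the call that actually performs the close.
  def closeIfIdle(): Boolean =
    inUseCount.get() == 0 && closed.compareAndSet(false, true)
}

object SharedProducerDemo {
  // A task marks the producer as in use before writing.
  def acquire(p: SharedProducer): SharedProducer = {
    p.inUseCount.incrementAndGet()
    p
  }

  // A task releases the producer when done, as the writer does in the diff
  // with producer.inUseCount.decrementAndGet(); it does not close it.
  def release(p: SharedProducer): Unit = {
    p.inUseCount.decrementAndGet()
    ()
  }
}
```

With two tasks holding the same producer, `closeIfIdle()` returns `false` until both have called `release`; closing is then left to whichever component owns the cache rather than to an individual writer.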
---