GitHub user ScrapCodes commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19096#discussion_r162022825
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala ---
    @@ -112,8 +112,8 @@ class KafkaContinuousDataWriter(
         checkForErrors()
         if (producer != null) {
           producer.flush()
    +      producer.inUseCount.decrementAndGet()
           checkForErrors()
    -      CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava))
    --- End diff --
    
    Since a producer is shared across threads, we maintain in-use counts and close producers separately, so the writer only decrements the count here instead of closing the producer.
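
A minimal sketch of the in-use counting idea described in the comment, written as a standalone Scala example. Only the `inUseCount` field with `decrementAndGet()` and the `flush()` call appear in the diff; `SharedProducer`, `SharedProducerCache`, `acquire`, `release` and `evict` are hypothetical names chosen for illustration and are not the API of Spark's `CachedKafkaProducer`.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical stand-in for a cached producer wrapper (not Spark's class).
final class SharedProducer(val key: String) {
  // Number of writer tasks currently using this producer.
  val inUseCount = new AtomicInteger(0)
  @volatile private var closed = false

  def isClosed: Boolean = closed

  // Stand-in for KafkaProducer.flush(); a real producer blocks until sends complete.
  def flush(): Unit = ()

  // Releases the underlying resources.
  def close(): Unit = { closed = true }
}

// Hypothetical cache that shares one producer per key across threads.
object SharedProducerCache {
  private val cache = mutable.Map.empty[String, SharedProducer]

  // Hand out the shared instance and record one more user.
  def acquire(key: String): SharedProducer = synchronized {
    val p = cache.getOrElseUpdate(key, new SharedProducer(key))
    p.inUseCount.incrementAndGet()
    p
  }

  // Called by a writer when it finishes: flush and drop its claim,
  // but do not close, because other threads may still be using the producer.
  def release(p: SharedProducer): Unit = {
    p.flush()
    p.inUseCount.decrementAndGet()
  }

  // Called separately (for example on cache eviction): close only when idle.
  // Returns true if the producer was closed and removed.
  def evict(key: String): Boolean = synchronized {
    cache.get(key) match {
      case Some(p) if p.inUseCount.get() == 0 =>
        cache.remove(key)
        p.close()
        true
      case _ => false
    }
  }
}
```

In this sketch two writers that acquire the same key receive the same instance, and `evict` refuses to close it until both have called `release`. Taking the lock in `acquire` and `evict` is one simple way to keep a new user from appearing between the idle check and the close; the actual pull request may handle this differently.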

