gaborgsomogyi commented on a change in pull request #19096: [SPARK-21869][SS] A cached Kafka producer should not be closed if any task is using it - adds inuse tracking.
URL: https://github.com/apache/spark/pull/19096#discussion_r267323800
 
 

 ##########
 File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala
 ##########
 @@ -18,60 +18,143 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
-import java.util.concurrent.ConcurrentMap
 
-import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.scalatest.PrivateMethodTester
+import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.util.Utils
 
-class CachedKafkaProducerSuite extends SharedSQLContext with PrivateMethodTester with KafkaTest {
-
-  type KP = KafkaProducer[Array[Byte], Array[Byte]]
+class CachedKafkaProducerSuite extends SharedSQLContext with KafkaTest {
 
   protected override def beforeEach(): Unit = {
     super.beforeEach()
     CachedKafkaProducer.clear()
   }
 
-  test("Should return the cached instance on calling getOrCreate with same 
params.") {
-    val kafkaParams = new ju.HashMap[String, Object]()
-    kafkaParams.put("acks", "0")
-    // Here only host should be resolvable, it does not need a running instance of kafka server.
-    kafkaParams.put("bootstrap.servers", "127.0.0.1:9022")
-    kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName)
-    kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName)
-    val producer = CachedKafkaProducer.getOrCreate(kafkaParams)
-    val producer2 = CachedKafkaProducer.getOrCreate(kafkaParams)
-    assert(producer == producer2)
-
-    val cacheMap = PrivateMethod[ConcurrentMap[Seq[(String, Object)], KP]]('getAsMap)
-    val map = CachedKafkaProducer.invokePrivate(cacheMap())
+  test("Should return the cached instance on calling acquire with same 
params.") {
+    val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams
+    val producer = CachedKafkaProducer.acquire(kafkaParams)
+    val producer2 = CachedKafkaProducer.acquire(kafkaParams)
+    assert(producer.kafkaProducer == producer2.kafkaProducer)
+    assert(producer.getInUseCount == 2)
+    val map = CachedKafkaProducer.getAsMap
     assert(map.size == 1)
   }
 
-  test("Should close the correct kafka producer for the given kafkaPrams.") {
+  test("Should return the new instance on calling acquire with different 
params.") {
+    val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams
+    val producer = CachedKafkaProducer.acquire(kafkaParams)
+    kafkaParams.remove("ack") // mutate the kafka params.
+    val producer2 = CachedKafkaProducer.acquire(kafkaParams)
+    assert(producer.kafkaProducer != producer2.kafkaProducer)
+    assert(producer.getInUseCount == 1)
+    assert(producer2.getInUseCount == 1)
+    val map = CachedKafkaProducer.getAsMap
+    assert(map.size == 2)
+  }
+
+  test("Automatically remove a failing kafka producer from cache.") {
+    import testImplicits._
+    val df = Seq[(String, String)](null.asInstanceOf[String] -> "1").toDF("topic", "value")
+    val ex = intercept[SparkException] {
+      // This will fail because the service is not reachable.
+      df.write
+        .format("kafka")
+        .option("topic", "topic")
+        .option("kafka.retries", "1")
+        .option("kafka.max.block.ms", "2")
+        .option("kafka.bootstrap.servers", "12.0.0.1:39022")
+        .save()
+    }
+    assert(ex.getMessage.contains("org.apache.kafka.common.errors.TimeoutException"),
+      "Spark command should fail due to service not reachable.")
+    // Since a failing kafka producer is released on error and also invalidated, it should
+    // not be in the cache.
+    val map = CachedKafkaProducer.getAsMap
+    assert(map.size == 0)
+  }
+
+  test("Should not close a producer in-use.") {
+    val kafkaParams: ju.HashMap[String, Object] = generateKafkaParams
+    val producer: CachedKafkaProducer = CachedKafkaProducer.acquire(kafkaParams)
+    producer.kafkaProducer // Access it to initialize the underlying producer.
+    assert(producer.getInUseCount == 1)
+    // Explicitly evict the producer from the guava cache.
+    CachedKafkaProducer.evict(producer.getKafkaParams)
+    assert(producer.getInUseCount == 1)
+    assert(!producer.isClosed, "An in-use producer should not be closed.")
+  }
+
+  private def generateKafkaParams: ju.HashMap[String, Object] = {
     val kafkaParams = new ju.HashMap[String, Object]()
-    kafkaParams.put("acks", "0")
+    kafkaParams.put("ack", "0")
     kafkaParams.put("bootstrap.servers", "127.0.0.1:9022")
     kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName)
     kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName)
-    val producer: KP = CachedKafkaProducer.getOrCreate(kafkaParams)
-    kafkaParams.put("acks", "1")
-    val producer2: KP = CachedKafkaProducer.getOrCreate(kafkaParams)
-    // With updated conf, a new producer instance should be created.
-    assert(producer != producer2)
-
-    val cacheMap = PrivateMethod[ConcurrentMap[Seq[(String, Object)], KP]]('getAsMap)
-    val map = CachedKafkaProducer.invokePrivate(cacheMap())
-    assert(map.size == 2)
+    kafkaParams
+  }
+}
+
+class CachedKafkaProducerStressSuite extends KafkaContinuousTest with KafkaTest {
+
+  override val streamingTimeout = 30.seconds
 
 Review comment:
   * Seems like this timeout has no effect; at least when I caused a deadlock artificially, the test never stopped.
   * My other concern is the time consumed: spending 30 seconds on this is huge. What TD has written for the consumer part [here](https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDataConsumerSuite.scala#L63) takes about 7 seconds and provides more or less the same coverage. Maybe a similar approach could be used.
   * It would be good to test failures as well, for example `CachedKafkaProducer.release(producer, true)`. This would be easy with the referenced test approach (though it is also solvable with this one); see the sketch below.
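   To make the last two points concrete, here is a rough sketch of the kind of stress test I mean, modeled on the referenced `KafkaDataConsumerSuite` approach. It assumes the `acquire`/`release(producer, failing)` API and the `generateKafkaParams` helper from this PR; the pool size, iteration count, and timeout are illustrative values, not anything from the PR:

```scala
// Rough sketch only, not this PR's code: exercise acquire/release from a small
// thread pool, in the spirit of KafkaDataConsumerSuite. Pool size, iteration
// count, and timeout below are assumed values for illustration.
import java.util.concurrent.{Executors, TimeUnit}

import scala.collection.JavaConverters._
import scala.util.Random

test("stress: concurrent acquire and release, including failing releases") {
  val kafkaParams = generateKafkaParams
  val threadpool = Executors.newFixedThreadPool(8)
  try {
    val futures = (1 to 500).map { i =>
      threadpool.submit(new Runnable {
        override def run(): Unit = {
          val producer = CachedKafkaProducer.acquire(kafkaParams)
          Thread.sleep(Random.nextInt(10))
          // Every tenth release reports a failure, so the invalidation path
          // (release(producer, true)) is exercised in the same run.
          CachedKafkaProducer.release(producer, i % 10 == 0)
        }
      })
    }
    futures.foreach(_.get(1, TimeUnit.MINUTES))
    // Every acquire above is paired with a release, so no cached producer
    // may still be marked in-use.
    assert(CachedKafkaProducer.getAsMap.values.asScala.forall(_.getInUseCount == 0))
  } finally {
    threadpool.shutdown()
  }
}
```

   Something along these lines covers the failure handling as well and should finish in seconds rather than 30.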
   
