gaborgsomogyi commented on a change in pull request #25853: [SPARK-21869][SS]
Apply Apache Commons Pool to Kafka producer
URL: https://github.com/apache/spark/pull/25853#discussion_r343014522
##########
File path:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/CachedKafkaProducerSuite.scala
##########
@@ -17,61 +17,139 @@
package org.apache.spark.sql.kafka010
-import java.{util => ju}
-import java.util.concurrent.ConcurrentMap
+import java.util.concurrent.{Executors, TimeUnit}
-import org.apache.kafka.clients.producer.KafkaProducer
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
+import org.apache.kafka.clients.producer.{Callback, ProducerRecord,
RecordMetadata}
+import
org.apache.kafka.clients.producer.ProducerConfig.{KEY_SERIALIZER_CLASS_CONFIG,
VALUE_SERIALIZER_CLASS_CONFIG}
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.scalatest.PrivateMethodTester
+import org.apache.spark.{TaskContext, TaskContextImpl}
+import org.apache.spark.sql.kafka010.InternalKafkaProducerPool._
import org.apache.spark.sql.test.SharedSparkSession
class CachedKafkaProducerSuite extends SharedSparkSession with
PrivateMethodTester with KafkaTest {
- type KP = KafkaProducer[Array[Byte], Array[Byte]]
+ private var testUtils: KafkaTestUtils = _
+ private val topic = "topic" + Random.nextInt()
+ private var producerPool: InternalKafkaProducerPool = _
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ testUtils = new KafkaTestUtils(Map[String, Object]())
+ testUtils.setup()
+ }
+
+ override def afterAll(): Unit = {
+ if (testUtils != null) {
+ testUtils.teardown()
+ testUtils = null
+ }
+ super.afterAll()
+ }
- protected override def beforeEach(): Unit = {
+ override def beforeEach(): Unit = {
super.beforeEach()
- CachedKafkaProducer.clear()
+
+ producerPool = {
+ val internalKafkaConsumerPoolMethod =
PrivateMethod[InternalKafkaProducerPool]('producerPool)
+ CachedKafkaProducer.invokePrivate(internalKafkaConsumerPoolMethod())
+ }
+
+ producerPool.reset()
}
- test("Should return the cached instance on calling getOrCreate with same
params.") {
- val kafkaParams = new ju.HashMap[String, Object]()
- kafkaParams.put("acks", "0")
+ private def getKafkaParams(acks: Int = 0) = Map[String, Object](
+ "acks" -> acks.toString,
// Here only host should be resolvable, it does not need a running
instance of kafka server.
- kafkaParams.put("bootstrap.servers", "127.0.0.1:9022")
- kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName)
- kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName)
- val producer = CachedKafkaProducer.getOrCreate(kafkaParams)
- val producer2 = CachedKafkaProducer.getOrCreate(kafkaParams)
- assert(producer == producer2)
-
- val cacheMap = PrivateMethod[ConcurrentMap[Seq[(String, Object)],
KP]](Symbol("getAsMap"))
- val map = CachedKafkaProducer.invokePrivate(cacheMap())
- assert(map.size == 1)
+ BOOTSTRAP_SERVERS_CONFIG -> testUtils.brokerAddress,
+ KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName,
+ VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName
+ ).asJava
+
+ test("acquire should return the cached instance with same params") {
+ val kafkaParams = getKafkaParams()
+
+ val producer1 = CachedKafkaProducer.acquire(kafkaParams)
+ CachedKafkaProducer.release(producer1)
+ val producer2 = CachedKafkaProducer.acquire(kafkaParams)
+ CachedKafkaProducer.release(producer2)
+
+ assert(producer1 === producer2)
+ assert(producerPool.size(toCacheKey(kafkaParams)) === 1)
}
- test("Should close the correct kafka producer for the given kafkaPrams.") {
- val kafkaParams = new ju.HashMap[String, Object]()
- kafkaParams.put("acks", "0")
- kafkaParams.put("bootstrap.servers", "127.0.0.1:9022")
- kafkaParams.put("key.serializer", classOf[ByteArraySerializer].getName)
- kafkaParams.put("value.serializer", classOf[ByteArraySerializer].getName)
- val producer: KP = CachedKafkaProducer.getOrCreate(kafkaParams)
- kafkaParams.put("acks", "1")
- val producer2: KP = CachedKafkaProducer.getOrCreate(kafkaParams)
- // With updated conf, a new producer instance should be created.
- assert(producer != producer2)
-
- val cacheMap = PrivateMethod[ConcurrentMap[Seq[(String, Object)],
KP]](Symbol("getAsMap"))
- val map = CachedKafkaProducer.invokePrivate(cacheMap())
- assert(map.size == 2)
-
- CachedKafkaProducer.close(kafkaParams)
- val map2 = CachedKafkaProducer.invokePrivate(cacheMap())
- assert(map2.size == 1)
- import scala.collection.JavaConverters._
- val (seq: Seq[(String, Object)], _producer: KP) =
map2.asScala.toArray.apply(0)
- assert(_producer == producer)
+ test("acquire should return a new instance with different params") {
+ val kafkaParams1 = getKafkaParams()
+ val kafkaParams2 = getKafkaParams(1)
+
+ val producer1 = CachedKafkaProducer.acquire(kafkaParams1)
+ CachedKafkaProducer.release(producer1)
+ val producer2 = CachedKafkaProducer.acquire(kafkaParams2)
+ CachedKafkaProducer.release(producer2)
+
+ assert(producer1 !== producer2)
+ assert(producerPool.size(toCacheKey(kafkaParams1)) === 1)
+ assert(producerPool.size(toCacheKey(kafkaParams2)) === 1)
+ }
+
+ test("Concurrent use of CachedKafkaProducer") {
+ val data = (1 to 1000).map(_.toString)
+ testUtils.createTopic(topic, 1)
+
+ val kafkaParams = getKafkaParams()
+ val numThreads = 100
+ val numProducerUsages = 500
+
+ @volatile var error: Throwable = null
+
+ val callback = new Callback() {
+ override def onCompletion(recordMetadata: RecordMetadata, e: Exception):
Unit = {
+ if (error == null && e != null) {
+ error = e
+ }
+ }
+ }
+
+ def produce(): Unit = {
+ val taskContext = if (Random.nextBoolean) {
+ new TaskContextImpl(0, 0, 0, 0, attemptNumber = Random.nextInt(2),
null, null, null)
+ } else {
+ null
+ }
+ TaskContext.setTaskContext(taskContext)
+ val producer = CachedKafkaProducer.acquire(kafkaParams)
+ try {
+ data.foreach { d =>
+ val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, 0,
null, d.getBytes)
+ producer.send(record, callback)
+ }
+ } catch {
+ case e: Throwable =>
+ if (error == null) {
Review comment:
Yeah, a couple of lines can be spared. Removed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]