gaborgsomogyi commented on a change in pull request #25853: [SPARK-21869][SS] Apply Apache Commons Pool to Kafka producer
URL: https://github.com/apache/spark/pull/25853#discussion_r327472910
##########
File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
##########
@@ -18,111 +18,97 @@
package org.apache.spark.sql.kafka010
import java.{util => ju}
-import java.util.concurrent.{ConcurrentMap, ExecutionException, TimeUnit}
+import java.io.Closeable
+import java.util.concurrent.ExecutionException
-import com.google.common.cache._
-import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
-import org.apache.kafka.clients.producer.KafkaProducer
import scala.collection.JavaConverters._
import scala.util.control.NonFatal
+import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
+import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord}
+
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.{KafkaConfigUpdater, KafkaRedactionUtil}
+import org.apache.spark.sql.kafka010.InternalKafkaProducerPool._
+import org.apache.spark.util.ShutdownHookManager
-private[kafka010] object CachedKafkaProducer extends Logging {
+private[kafka010] class CachedKafkaProducer(val kafkaParams: ju.Map[String, Object])
+ extends Closeable with Logging {
private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
- private val defaultCacheExpireTimeout = TimeUnit.MINUTES.toMillis(10)
-
- private lazy val cacheExpireTimeout: Long = Option(SparkEnv.get)
- .map(_.conf.get(PRODUCER_CACHE_TIMEOUT))
- .getOrElse(defaultCacheExpireTimeout)
+ private val producer = createProducer()
- private val cacheLoader = new CacheLoader[Seq[(String, Object)], Producer] {
- override def load(config: Seq[(String, Object)]): Producer = {
- createKafkaProducer(config)
+ private def createProducer(): Producer = {
+ val producer: Producer = new Producer(kafkaParams)
+ if (log.isDebugEnabled()) {
+ val redactedParamsSeq = KafkaRedactionUtil.redactParams(toCacheKey(kafkaParams))
+ logDebug(s"Created a new instance of kafka producer for $redactedParamsSeq.")
}
+ producer
}
- private val removalListener = new RemovalListener[Seq[(String, Object)], Producer]() {
- override def onRemoval(
- notification: RemovalNotification[Seq[(String, Object)], Producer]): Unit = {
- val paramsSeq: Seq[(String, Object)] = notification.getKey
- val producer: Producer = notification.getValue
- if (log.isDebugEnabled()) {
- val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
- logDebug(s"Evicting kafka producer $producer params:
$redactedParamsSeq, " +
- s"due to ${notification.getCause}")
+ override def close(): Unit = {
+ try {
+ if (log.isInfoEnabled()) {
+ val redactedParamsSeq = KafkaRedactionUtil.redactParams(toCacheKey(kafkaParams))
+ logInfo(s"Closing the KafkaProducer with params: ${redactedParamsSeq.mkString("\n")}.")
}
- close(paramsSeq, producer)
+ producer.close()
+ } catch {
+ case NonFatal(e) => logWarning("Error while closing kafka producer.", e)
}
}
- private lazy val guavaCache: LoadingCache[Seq[(String, Object)], Producer] =
- CacheBuilder.newBuilder().expireAfterAccess(cacheExpireTimeout, TimeUnit.MILLISECONDS)
- .removalListener(removalListener)
- .build[Seq[(String, Object)], Producer](cacheLoader)
+ def send(record: ProducerRecord[Array[Byte], Array[Byte]], callback: Callback): Unit = {
+ producer.send(record, callback)
+ }
- private def createKafkaProducer(paramsSeq: Seq[(String, Object)]): Producer = {
- val kafkaProducer: Producer = new Producer(paramsSeq.toMap.asJava)
- if (log.isDebugEnabled()) {
- val redactedParamsSeq = KafkaRedactionUtil.redactParams(paramsSeq)
- logDebug(s"Created a new instance of KafkaProducer for
$redactedParamsSeq.")
+ def flush(): Unit = {
+ producer.flush()
+ }
+}
+
+private[kafka010] object CachedKafkaProducer extends Logging {
+
+ private val sparkConf = SparkEnv.get.conf
+ private val producerPool = new InternalKafkaProducerPool(sparkConf)
+
+ ShutdownHookManager.addShutdownHook { () =>
+ try {
+ producerPool.close()
+ } catch {
+ case e: Throwable =>
+ logWarning("Ignoring exception while shutting down pool from shutdown
hook", e)
}
- kafkaProducer
}
/**
* Get a cached KafkaProducer for a given configuration. If a matching KafkaProducer doesn't
* exist, a new KafkaProducer will be created. KafkaProducer is thread safe, so it is best to keep
* one instance per specified kafkaParams.
*/
- private[kafka010] def getOrCreate(kafkaParams: ju.Map[String, Object]): Producer = {
- val updatedKafkaProducerConfiguration =
+ def acquire(kafkaParams: ju.Map[String, Object]): CachedKafkaProducer = {
+ val updatedKafkaParams =
KafkaConfigUpdater("executor", kafkaParams.asScala.toMap)
.setAuthenticationConfigIfNeeded()
.build()
- val paramsSeq: Seq[(String, Object)] = paramsToSeq(updatedKafkaProducerConfiguration)
+ val key = toCacheKey(updatedKafkaParams)
try {
- guavaCache.get(paramsSeq)
+ producerPool.borrowObject(key, updatedKafkaParams)
} catch {
case e @ (_: ExecutionException | _: UncheckedExecutionException | _: ExecutionError)
Review comment:
Removed.
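
For readers following the API change in this hunk, here is a minimal, illustrative sketch of how a task-side writer might use the pooled producer. It assumes the `acquire`, `send`, and `flush` signatures shown above, plus a `CachedKafkaProducer.release(producer)` counterpart for returning the instance to the pool that is not visible in this diff; treat it as a sketch under those assumptions, not the actual Spark code.

// Sketch only: borrow a producer from the pool, write a batch, then return it.
import java.{util => ju}
import org.apache.kafka.clients.producer.ProducerRecord

def writeBatch(
    kafkaParams: ju.Map[String, Object],
    records: Iterator[ProducerRecord[Array[Byte], Array[Byte]]]): Unit = {
  // Borrows (or creates) a producer keyed by the given kafkaParams.
  val producer = CachedKafkaProducer.acquire(kafkaParams)
  try {
    // A null callback is accepted by the Kafka producer API; send failures surface on flush/close.
    records.foreach(producer.send(_, null))
    producer.flush()
  } finally {
    // Hypothetical release(): hands the instance back to the pool for reuse.
    CachedKafkaProducer.release(producer)
  }
}

The borrow/return pairing mirrors the usual Apache Commons Pool lifecycle: the underlying KafkaProducer stays open across batches and is only closed when the pool evicts it or the shutdown hook closes the pool.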
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]