Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117804270
  
    --- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
 ---
    @@ -18,34 +18,58 @@
     package org.apache.spark.sql.kafka010
     
     import java.{util => ju}
    +import java.util.concurrent.{ConcurrentMap, TimeUnit}
     import javax.annotation.concurrent.GuardedBy
     
    +import com.google.common.cache._
    +import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.collection.JavaConverters._
     import scala.collection.immutable.SortedMap
     import scala.collection.mutable
    -
    -import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.util.control.NonFatal
     
     import org.apache.spark.internal.Logging
     
     private[kafka010] object CachedKafkaProducer extends Logging {
     
       private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
     
    -  @GuardedBy("this")
    -  private val cacheMap = new mutable.HashMap[String, Producer]()
    +  private val cacheExpireTimeout: Long = 
System.getProperty("spark.kafka.guava.cache.timeout",
    +    "10").toLong
    +
    +  private val noneReturningLoader = new CacheLoader[String, 
Option[Producer]] {
    +    override def load(key: String): Option[Producer] = {
    +      None
    +    }
    +  }
    +
    +  private val removalListener = new RemovalListener[String, 
Option[Producer]]() {
    +    override def onRemoval(notification: RemovalNotification[String, 
Option[Producer]]): Unit = {
    +      val uid: String = notification.getKey
    +      val producer = notification.getValue
    +      log.debug(s"Evicting kafka producer $producer uid:$uid, due to 
${notification.getCause}")
    --- End diff --
    
    Nit: add a space after the `:` in the log message (i.e. `uid: $uid` instead of `uid:$uid`).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to