GitHub user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17308#discussion_r117803947
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala ---
    @@ -18,34 +18,58 @@
     package org.apache.spark.sql.kafka010
     
     import java.{util => ju}
    +import java.util.concurrent.{ConcurrentMap, TimeUnit}
     import javax.annotation.concurrent.GuardedBy
     
    +import com.google.common.cache._
    +import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.collection.JavaConverters._
     import scala.collection.immutable.SortedMap
     import scala.collection.mutable
    -
    -import org.apache.kafka.clients.producer.KafkaProducer
    +import scala.util.control.NonFatal
     
     import org.apache.spark.internal.Logging
     
     private[kafka010] object CachedKafkaProducer extends Logging {
     
       private type Producer = KafkaProducer[Array[Byte], Array[Byte]]
     
    -  @GuardedBy("this")
    -  private val cacheMap = new mutable.HashMap[String, Producer]()
     +  private val cacheExpireTimeout: Long = System.getProperty("spark.kafka.guava.cache.timeout",
    --- End diff --
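
    For context, here is a minimal sketch of the Guava-cache pattern the diff
    is introducing: producers expire after an idle timeout and are closed on
    eviction. The object name, the ten-minute fallback, and the exact builder
    chain are illustrative assumptions, not the PR's code.

        import java.util.concurrent.TimeUnit

        import com.google.common.cache.{CacheBuilder, RemovalListener, RemovalNotification}
        import org.apache.kafka.clients.producer.KafkaProducer

        object ProducerCacheSketch {
          type Producer = KafkaProducer[Array[Byte], Array[Byte]]

          // The diff reads this from the "spark.kafka.guava.cache.timeout"
          // system property; the ten-minute fallback is an assumed default.
          private val cacheExpireTimeoutMs: Long =
            java.lang.Long.getLong("spark.kafka.guava.cache.timeout",
              TimeUnit.MINUTES.toMillis(10))

          // Close evicted producers so idle ones do not leak connections.
          private val removalListener = new RemovalListener[String, Producer] {
            override def onRemoval(n: RemovalNotification[String, Producer]): Unit =
              n.getValue.close()
          }

          // Expire entries that have not been read for the configured timeout.
          val guavaCache = CacheBuilder.newBuilder()
            .expireAfterAccess(cacheExpireTimeoutMs, TimeUnit.MILLISECONDS)
            .removalListener(removalListener)
            .build[String, Producer]()
        }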
    
    I wonder if this should be an option of the sink? I may be running multiple streams in a single cluster with different trigger intervals, which would require different configs.
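
    A hedged sketch of the alternative raised above: read the timeout from the
    per-query options map the Kafka sink already receives through .option(...)
    on the writer, instead of a JVM-wide system property. The key
    "producer.cache.timeout" and the default below are hypothetical, not an
    actual Spark configuration.

        import java.util.concurrent.TimeUnit

        // `parameters` stands in for the options map a Kafka sink is handed
        // for each query; the key name and default are illustrative only.
        def cacheTimeoutMs(parameters: Map[String, String]): Long =
          parameters
            .get("producer.cache.timeout")
            .map(_.toLong)
            .getOrElse(TimeUnit.MINUTES.toMillis(10))

    Each stream could then pick an expiry matching its own trigger interval,
    e.g. .option("producer.cache.timeout", "60000").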

