The KafkaProducerPool instance is created in the driver. Right? What's I
was saying is when a Spark job runs, it will serialize KafkaProducerPool
and create a new instance in the executor side.

You can use the singleton pattern to make sure one JVM process has only one
KafkaProducerPool instance.

On Tue, Jan 31, 2017 at 3:32 PM, Nipun Arora <nipunarora2...@gmail.com>
wrote:

> It's a producer pool, the borrow object takes an existing kafka producer
> object if it is free, or creates one if all are being used.
> Shouldn't we re-use kafka producer objects for writing to Kafka.
>
> @ryan- can you suggest a good solution for writing a dstream to kafka
> which can be used in production?
>
> I am attaching the Kafka producer pool class, where would one issue a call
> to close():
>
> public class KafkaProducerPool implements Serializable {
>
>    private static final long serialVersionUID = -1913028296093224674L;
>
>    private transient ConcurrentLinkedQueue<KafkaProducer<String, String>> 
> pool;
>
>    private ScheduledExecutorService executorService;
>
>    private final Properties properties;
>
>    private final int minIdle;
>
>    /**
>     * Creates the pool.
>     *
>     * @param minIdle
>     *            minimum number of objects residing in the pool
>     */
>    public KafkaProducerPool(final int minIdle, final Properties properties) {
>       // initialize pool
>       this.properties = properties;
>       this.minIdle = minIdle;
>       initialize();
>
>    }
>
>    /**
>     * Creates the pool.
>     *
>     * @param minIdle
>     *            minimum number of objects residing in the pool
>     * @param maxIdle
>     *            maximum number of objects residing in the pool
>     * @param validationInterval
>     *            time in seconds for periodical checking of minIdle / maxIdle
>     *            conditions in a separate thread. When the number of objects 
> is
>     *            less than minIdle, missing instances will be created. When 
> the
>     *            number of objects is greater than maxIdle, too many instances
>     *            will be removed.
>     */
>    public KafkaProducerPool(final int minIdle, final int maxIdle,
>          final long validationInterval, final Properties properties) {
>       // initialize pool
>       this.properties = properties;
>       this.minIdle = minIdle;
>       initialize();
>
>       // check pool conditions in a separate thread
>       executorService = Executors.newSingleThreadScheduledExecutor();
>       executorService.scheduleWithFixedDelay(new Runnable() {
>          @Override
>          public void run() {
>             int size = pool.size();
>             if (size < minIdle) {
>                int sizeToBeAdded = minIdle - size;
>                for (int i = 0; i < sizeToBeAdded; i++) {
>                   pool.add(createProducer());
>                }
>             } else if (size > maxIdle) {
>                int sizeToBeRemoved = size - maxIdle;
>                for (int i = 0; i < sizeToBeRemoved; i++) {
>                   pool.poll();
>                }
>             }
>          }
>       }, validationInterval, validationInterval, TimeUnit.SECONDS);
>    }
>
>    /**
>     * Gets the next free object from the pool. If the pool doesn't contain any
>     * objects, a new object will be created and given to the caller of this
>     * method back.
>     *
>     * @return T borrowed object
>     */
>    public synchronized KafkaProducer<String, String> borrowProducer() {
>       if (pool == null)
>          initialize();
>       KafkaProducer<String, String> object;
>       if ((object = pool.poll()) == null) {
>          object = createProducer();
>       }
>
>       return object;
>    }
>
>    /**
>     * Returns object back to the pool.
>     *
>     *            object to be returned
>     */
>    public void returnProducer(KafkaProducer<String, String> producer) {
>       if (producer == null) {
>          return;
>       }
>       this.pool.offer(producer);
>    }
>
>    /**
>     * Shutdown this pool.
>     */
>    public void shutdown() {
>       if (executorService != null) {
>          KafkaProducer<String, String> producer;
>          while ((producer = pool.poll()) != null) {
>             producer.close();
>          }
>          executorService.shutdown();
>       }
>    }
>
>    /**
>     * Creates a new producer.
>     *
>     * @return T new object
>     */
>    private KafkaProducer<String, String> createProducer() {
>       KafkaProducer<String, String> producer = new 
> KafkaProducer<String,String>(properties);
>       return producer;
>    }
>
>    private void initialize() {
>       pool = new ConcurrentLinkedQueue<KafkaProducer<String, String>>();
>
>       for (int i = 0; i < minIdle; i++) {
>          pool.add(createProducer());
>       }
>    }
>
>    public void closeAll() {
>       KafkaProducer<String, String> object;
>       while ((object = pool.poll()) != null) {
>          //object.flush();
>          object.close();
>       }
>    }
> }
>
> Thanks
> Nipun
>
> On Tue, Jan 31, 2017 at 6:09 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Looks like you create KafkaProducerPool in the driver. So when the task
>> is running in the executor, it will always see an new
>> empty KafkaProducerPool and create KafkaProducers. But nobody closes these
>> KafkaProducers.
>>
>> On Tue, Jan 31, 2017 at 3:02 PM, Nipun Arora <nipunarora2...@gmail.com>
>> wrote:
>>
>>
>> Sorry for not writing the patch number, it's spark 1.6.1.
>> The relevant code is here inline.
>>
>> Please have a look and let me know if there is a resource leak.
>> Please also let me know if you need any more details.
>>
>> Thanks
>> Nipun
>>
>>
>> The JavaRDDKafkaWriter code is here inline:
>>
>> import org.apache.spark.api.java.JavaRDD;
>> import org.apache.spark.api.java.function.VoidFunction;
>> import scala.Tuple2;
>>
>> import java.io.Serializable;
>> import java.util.Iterator;
>>
>> public class JavaRDDStringKafkaWriter implements Serializable, 
>> VoidFunction<JavaRDD<String>> {
>>
>>    private static final long serialVersionUID = -865193912367180261L;
>>    private final KafkaProducerPool pool;
>>    private final String topic;
>>    private final Boolean kafkaAsync;
>>
>>    public JavaRDDStringKafkaWriter(final KafkaProducerPool pool, String 
>> topic, Boolean kafkaAsync) {
>>       this.pool = pool;
>>       this.topic = topic;
>>       this.kafkaAsync = kafkaAsync;
>>    }
>>
>>    @Override
>>    public void call(JavaRDD<String> stringJavaRDD) throws Exception {
>>       stringJavaRDD.foreachPartition(new PartitionVoidFunction(
>>             new RDDKafkaWriter(pool,kafkaAsync), topic));
>>    }
>>
>>    private class PartitionVoidFunction implements
>>          VoidFunction<Iterator<String>> {
>>
>>       private static final long serialVersionUID = 8726871215617446598L;
>>       private final RDDKafkaWriter kafkaWriter;
>>       private final String topic;
>>
>>       public PartitionVoidFunction(RDDKafkaWriter kafkaWriter, String topic) 
>> {
>>          this.kafkaWriter = kafkaWriter;
>>          this.topic = topic;
>>       }
>>
>>       @Override
>>       public void call(Iterator<String> iterator) throws Exception {
>>          while (iterator.hasNext()) {
>>             kafkaWriter.writeToKafka(topic, iterator.next());
>>          }
>>       }
>>    }
>> }
>>
>>
>> The RDDKafkaWriter is here:
>>
>>
>> import java.io.Serializable;
>> import java.util.concurrent.ExecutionException;
>>
>> import org.apache.kafka.clients.producer.KafkaProducer;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>>
>> import scala.Tuple2;
>>
>> public class RDDKafkaWriter implements Serializable {
>>
>>    private static final long serialVersionUID = 7374381310562055607L;
>>    private final KafkaProducerPool pool;
>>    private final Boolean async;
>>
>>    public RDDKafkaWriter(final KafkaProducerPool pool, Boolean async) {
>>       this.pool = pool;
>>       this.async = async;
>>
>>    }
>>
>>    public void writeToKafka(String topic, Tuple2<String, String> message) {
>>       KafkaProducer<String, String> producer = pool.borrowProducer();
>>       ProducerRecord<String, String> record = new ProducerRecord<String, 
>> String>(
>>             topic, message._1(), message._2());
>>       if (async) {
>>          producer.send(record);
>>       } else {
>>          try {
>>             producer.send(record).get();
>>          } catch (Exception e) {
>>             e.printStackTrace();
>>          }
>>       }
>>       pool.returnProducer(producer);
>>    }
>>
>>     public void writeToKafka(String topic, String message) {
>>
>>         KafkaProducer<String, String> producer = pool.borrowProducer();
>>         ProducerRecord<String, String> record = new ProducerRecord<String, 
>> String>(topic, message);
>>
>>         if (async) {
>>             producer.send(record);
>>         } else {
>>             try {
>>                 producer.send(record).get();
>>             } catch (Exception e) {
>>                 e.printStackTrace();
>>             }
>>         }
>>         pool.returnProducer(producer);
>>     }
>>
>>
>> }
>>
>>
>>
>>
>>
>> On Tue, Jan 31, 2017 at 5:20 PM Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>> Please also include the patch version, such as 1.6.0, 1.6.1. Could you
>> also post the JAVARDDKafkaWriter codes. It's also possible that it leaks
>> resources.
>>
>> On Tue, Jan 31, 2017 at 2:12 PM, Nipun Arora <nipunarora2...@gmail.com>
>> wrote:
>>
>> It is spark 1.6
>>
>> Thanks
>> Nipun
>>
>> On Tue, Jan 31, 2017 at 1:45 PM Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>> Could you provide your Spark version please?
>>
>> On Tue, Jan 31, 2017 at 10:37 AM, Nipun Arora <nipunarora2...@gmail.com>
>> wrote:
>>
>> Hi,
>>
>> I get a resource leak, where the number of file descriptors in spark
>> streaming keeps increasing. We end up with a "too many file open" error
>> eventually through an exception caused in:
>>
>> JAVARDDKafkaWriter, which is writing a spark JavaDStream<String>
>>
>> The exception is attached inline. Any help will be greatly appreciated.
>>
>> Thanks
>> Nipun
>>
>> -------------------------------------------
>> Time: 1485762530000 ms
>> -------------------------------------------
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 0 in stage 85968.0 failed 1 times, most recent
>> failure: Lost task 0.0 in stage 85968.0 (TID 29562, localhost):
>> java.io.FileNotFoundException: /tmp/blockmgr-1b3ddc44-f9a4-
>> 42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
>> (too many open files)
>> at java.io.FileOutputStream.open(Native Method)
>> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
>> at org.apache.spark.storage.DiskBlockObjectWriter.open(
>> DiskBlockObjectWriter.scala:88)
>> at org.apache.spark.storage.DiskBlockObjectWriter.write(
>> DiskBlockObjectWriter.scala:181)
>> at org.apache.spark.util.collection.WritablePartitionedPairCollect
>> ion$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
>> at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(
>> ExternalSorter.scala:659)
>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(
>> SortShuffleWriter.scala:72)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>> ShuffleMapTask.scala:73)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>> ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1145)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
>> scheduler$DAGScheduler$$failJobAndIndependentStages(
>> DAGScheduler.scala:1431)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
>> DAGScheduler.scala:1419)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
>> DAGScheduler.scala:1418)
>> at scala.collection.mutable.ResizableArray$class.foreach(
>> ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.scheduler.DAGScheduler.abortStage(
>> DAGScheduler.scala:1418)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
>> handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
>> handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>> at scala.Option.foreach(Option.scala:236)
>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
>> DAGScheduler.scala:799)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
>> doOnReceive(DAGScheduler.scala:1640)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
>> onReceive(DAGScheduler.scala:1599)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
>> onReceive(DAGScheduler.scala:1588)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1883)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.
>> apply(RDD.scala:920)
>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.
>> apply(RDD.scala:918)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(
>> RDDOperationScope.scala:150)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(
>> RDDOperationScope.scala:111)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
>> at org.apache.spark.api.java.JavaRDDLike$class.
>> foreachPartition(JavaRDDLike.scala:225)
>> at org.apache.spark.api.java.AbstractJavaRDDLike.
>> foreachPartition(JavaRDDLike.scala:46)
>> at org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(
>> JavaRDDStringKafkaWriter.java:25)
>> at org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(
>> JavaRDDStringKafkaWriter.java:10)
>> at org.apache.spark.streaming.api.java.JavaDStreamLike$$
>> anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
>> at org.apache.spark.streaming.api.java.JavaDStreamLike$$
>> anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$
>> foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$
>> foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$
>> anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$
>> anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$
>> anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>> at org.apache.spark.streaming.dstream.DStream.
>> createRDDWithLocalProperties(DStream.scala:426)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$
>> anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(
>> ForEachDStream.scala:49)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(
>> ForEachDStream.scala:49)
>> at scala.util.Try$.apply(Try.scala:161)
>> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>> at org.apache.spark.streaming.scheduler.JobScheduler$
>> JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:229)
>> at org.apache.spark.streaming.scheduler.JobScheduler$
>> JobHandler$$anonfun$run$1.apply(JobScheduler.scala:229)
>> at org.apache.spark.streaming.scheduler.JobScheduler$
>> JobHandler$$anonfun$run$1.apply(JobScheduler.scala:229)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at org.apache.spark.streaming.scheduler.JobScheduler$
>> JobHandler.run(JobScheduler.scala:228)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1145)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.FileNotFoundException: /tmp/blockmgr-1b3ddc44-f9a4-
>> 42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
>> (too many open files)
>> at java.io.FileOutputStream.open(Native Method)
>> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
>> at org.apache.spark.storage.DiskBlockObjectWriter.open(
>> DiskBlockObjectWriter.scala:88)
>> at org.apache.spark.storage.DiskBlockObjectWriter.write(
>> DiskBlockObjectWriter.scala:181)
>> at org.apache.spark.util.collection.WritablePartitionedPairCollect
>> ion$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
>> at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(
>> ExternalSorter.scala:659)
>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(
>> SortShuffleWriter.scala:72)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>> ShuffleMapTask.scala:73)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>> ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> ... 3 more
>>
>>
>>
>>
>>

Reply via email to