The KafkaProducerPool instance is created in the driver, right? What I was saying is that when a Spark job runs, it will serialize KafkaProducerPool and create a new instance on the executor side.
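For example, a minimal sketch of that pattern (the holder class name and pool size below are illustrative, not from your code):

    import java.util.Properties;

    // Sketch only: one KafkaProducerPool per executor JVM, created lazily on
    // first use instead of being serialized from the driver. The shutdown hook
    // is one answer to "where would one issue a call to close()": the pooled
    // producers are closed once, when the executor JVM exits.
    public class PoolHolder {

        private static KafkaProducerPool instance;

        public static synchronized KafkaProducerPool getInstance(final Properties props) {
            if (instance == null) {
                instance = new KafkaProducerPool(4, props); // pool size is arbitrary
                Runtime.getRuntime().addShutdownHook(new Thread() {
                    @Override
                    public void run() {
                        // Close all pooled producers when the JVM exits.
                        instance.shutdown();
                    }
                });
            }
            return instance;
        }
    }

Tasks would then call PoolHolder.getInstance(props).borrowProducer() inside foreachPartition, so no pool instance ever needs to survive serialization.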
You can use the singleton pattern to make sure one JVM process has only one KafkaProducerPool instance.

On Tue, Jan 31, 2017 at 3:32 PM, Nipun Arora <nipunarora2...@gmail.com> wrote:

> It's a producer pool: borrowProducer() hands out an existing KafkaProducer
> if one is free, or creates a new one if all are in use. Shouldn't we
> re-use KafkaProducer objects for writing to Kafka?
>
> @ryan - can you suggest a good solution for writing a DStream to Kafka
> which can be used in production?
>
> I am attaching the KafkaProducerPool class. Where would one issue a call
> to close()?
>
> import java.io.Serializable;
> import java.util.Properties;
> import java.util.concurrent.ConcurrentLinkedQueue;
> import java.util.concurrent.Executors;
> import java.util.concurrent.ScheduledExecutorService;
> import java.util.concurrent.TimeUnit;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
>
> public class KafkaProducerPool implements Serializable {
>
>     private static final long serialVersionUID = -1913028296093224674L;
>
>     // Transient: the queue is rebuilt lazily after deserialization.
>     private transient ConcurrentLinkedQueue<KafkaProducer<String, String>> pool;
>
>     // Executor services are not serializable, so this must be transient too.
>     private transient ScheduledExecutorService executorService;
>
>     private final Properties properties;
>
>     private final int minIdle;
>
>     /**
>      * Creates the pool.
>      *
>      * @param minIdle minimum number of producers residing in the pool
>      */
>     public KafkaProducerPool(final int minIdle, final Properties properties) {
>         this.properties = properties;
>         this.minIdle = minIdle;
>         initialize();
>     }
>
>     /**
>      * Creates the pool.
>      *
>      * @param minIdle            minimum number of producers residing in the pool
>      * @param maxIdle            maximum number of producers residing in the pool
>      * @param validationInterval time in seconds between periodic checks of the
>      *                           minIdle / maxIdle conditions in a separate
>      *                           thread. When the pool has fewer than minIdle
>      *                           producers, the missing ones are created; when
>      *                           it has more than maxIdle, the excess are
>      *                           removed.
>      */
>     public KafkaProducerPool(final int minIdle, final int maxIdle,
>             final long validationInterval, final Properties properties) {
>         this.properties = properties;
>         this.minIdle = minIdle;
>         initialize();
>
>         // Check pool conditions in a separate thread.
>         executorService = Executors.newSingleThreadScheduledExecutor();
>         executorService.scheduleWithFixedDelay(new Runnable() {
>             @Override
>             public void run() {
>                 int size = pool.size();
>                 if (size < minIdle) {
>                     int sizeToBeAdded = minIdle - size;
>                     for (int i = 0; i < sizeToBeAdded; i++) {
>                         pool.add(createProducer());
>                     }
>                 } else if (size > maxIdle) {
>                     int sizeToBeRemoved = size - maxIdle;
>                     for (int i = 0; i < sizeToBeRemoved; i++) {
>                         // Close the evicted producer; just dropping it would
>                         // leak its sockets and file descriptors.
>                         KafkaProducer<String, String> excess = pool.poll();
>                         if (excess != null) {
>                             excess.close();
>                         }
>                     }
>                 }
>             }
>         }, validationInterval, validationInterval, TimeUnit.SECONDS);
>     }
>
>     /**
>      * Gets the next free producer from the pool. If the pool is empty, a new
>      * producer is created and handed to the caller.
>      *
>      * @return a borrowed KafkaProducer
>      */
>     public synchronized KafkaProducer<String, String> borrowProducer() {
>         if (pool == null)
>             initialize();
>         KafkaProducer<String, String> object;
>         if ((object = pool.poll()) == null) {
>             object = createProducer();
>         }
>         return object;
>     }
>
>     /**
>      * Returns a producer back to the pool.
>      */
>     public void returnProducer(KafkaProducer<String, String> producer) {
>         if (producer == null) {
>             return;
>         }
>         this.pool.offer(producer);
>     }
>
>     /**
>      * Shuts down this pool: closes all pooled producers and stops the
>      * validation thread, if one was started.
>      */
>     public void shutdown() {
>         closeAll();
>         if (executorService != null) {
>             executorService.shutdown();
>         }
>     }
>
>     /**
>      * Creates a new producer.
>      */
>     private KafkaProducer<String, String> createProducer() {
>         return new KafkaProducer<String, String>(properties);
>     }
>
>     private void initialize() {
>         pool = new ConcurrentLinkedQueue<KafkaProducer<String, String>>();
>         for (int i = 0; i < minIdle; i++) {
>             pool.add(createProducer());
>         }
>     }
>
>     public void closeAll() {
>         KafkaProducer<String, String> object;
>         while ((object = pool.poll()) != null) {
>             // object.flush();
>             object.close();
>         }
>     }
> }
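> The intended usage is a borrow/return discipline around each send. A sketch
> (the helper method is illustrative, not part of the attached classes, and
> assumes the usual org.apache.kafka.clients.producer imports; try/finally
> makes sure a producer goes back to the pool even when send() fails):
>
> public void sendOne(KafkaProducerPool pool, String topic, String message)
>         throws Exception {
>     KafkaProducer<String, String> producer = pool.borrowProducer();
>     try {
>         // Synchronous send: get() blocks until the broker acknowledges.
>         producer.send(new ProducerRecord<String, String>(topic, message)).get();
>     } finally {
>         pool.returnProducer(producer);
>     }
> }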
>
> Thanks
> Nipun
>
> On Tue, Jan 31, 2017 at 6:09 PM Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>
>> Looks like you create KafkaProducerPool in the driver. So when the task
>> is running in the executor, it will always see a new, empty
>> KafkaProducerPool and create KafkaProducers. But nobody closes these
>> KafkaProducers.
>>
>> On Tue, Jan 31, 2017 at 3:02 PM, Nipun Arora <nipunarora2...@gmail.com> wrote:
>>
>> Sorry for not writing the patch number; it's Spark 1.6.1.
>> The relevant code is inline below.
>>
>> Please have a look and let me know if there is a resource leak.
>> Please also let me know if you need any more details.
>>
>> Thanks
>> Nipun
>>
>> The JavaRDDStringKafkaWriter code is here inline:
>>
>> import java.io.Serializable;
>> import java.util.Iterator;
>>
>> import org.apache.spark.api.java.JavaRDD;
>> import org.apache.spark.api.java.function.VoidFunction;
>>
>> public class JavaRDDStringKafkaWriter implements Serializable,
>>         VoidFunction<JavaRDD<String>> {
>>
>>     private static final long serialVersionUID = -865193912367180261L;
>>     private final KafkaProducerPool pool;
>>     private final String topic;
>>     private final Boolean kafkaAsync;
>>
>>     public JavaRDDStringKafkaWriter(final KafkaProducerPool pool,
>>             String topic, Boolean kafkaAsync) {
>>         this.pool = pool;
>>         this.topic = topic;
>>         this.kafkaAsync = kafkaAsync;
>>     }
>>
>>     @Override
>>     public void call(JavaRDD<String> stringJavaRDD) throws Exception {
>>         stringJavaRDD.foreachPartition(new PartitionVoidFunction(
>>                 new RDDKafkaWriter(pool, kafkaAsync), topic));
>>     }
>>
>>     private class PartitionVoidFunction implements
>>             VoidFunction<Iterator<String>> {
>>
>>         private static final long serialVersionUID = 8726871215617446598L;
>>         private final RDDKafkaWriter kafkaWriter;
>>         private final String topic;
>>
>>         public PartitionVoidFunction(RDDKafkaWriter kafkaWriter, String topic) {
>>             this.kafkaWriter = kafkaWriter;
>>             this.topic = topic;
>>         }
>>
>>         @Override
>>         public void call(Iterator<String> iterator) throws Exception {
>>             while (iterator.hasNext()) {
>>                 kafkaWriter.writeToKafka(topic, iterator.next());
>>             }
>>         }
>>     }
>> }
>>
>> The RDDKafkaWriter is here:
>>
>> import java.io.Serializable;
>>
>> import org.apache.kafka.clients.producer.KafkaProducer;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>>
>> import scala.Tuple2;
>>
>> public class RDDKafkaWriter implements Serializable {
>>
>>     private static final long serialVersionUID = 7374381310562055607L;
>>     private final KafkaProducerPool pool;
>>     private final Boolean async;
>>
>>     public RDDKafkaWriter(final KafkaProducerPool pool, Boolean async) {
>>         this.pool = pool;
>>         this.async = async;
>>     }
>>
>>     public void writeToKafka(String topic, Tuple2<String, String> message) {
>>         KafkaProducer<String, String> producer = pool.borrowProducer();
>>         ProducerRecord<String, String> record = new ProducerRecord<String, String>(
>>                 topic, message._1(), message._2());
>>         if (async) {
>>             producer.send(record);
>>         } else {
>>             try {
>>                 producer.send(record).get();
>>             } catch (Exception e) {
>>                 e.printStackTrace();
>>             }
>>         }
>>         pool.returnProducer(producer);
>>     }
>>
>>     public void writeToKafka(String topic, String message) {
>>         KafkaProducer<String, String> producer = pool.borrowProducer();
>>         ProducerRecord<String, String> record = new ProducerRecord<String, String>(
>>                 topic, message);
>>         if (async) {
>>             producer.send(record);
>>         } else {
>>             try {
>>                 producer.send(record).get();
>>             } catch (Exception e) {
>>                 e.printStackTrace();
>>             }
>>         }
>>         pool.returnProducer(producer);
>>     }
>> }
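>> For reference, the writer is attached to the stream via foreachRDD, as the
>> stack trace further below shows. A sketch of the wiring (the stream, pool
>> size, and topic name here are placeholders):
>>
>> import java.util.Properties;
>>
>> import org.apache.spark.streaming.api.java.JavaDStream;
>>
>> public class KafkaSinkWiring {
>>     public static void attach(JavaDStream<String> lines, Properties kafkaProps) {
>>         KafkaProducerPool pool = new KafkaProducerPool(4, kafkaProps);
>>         // call() runs on the driver once per batch; the writer then invokes
>>         // foreachPartition, which executes on the executors.
>>         lines.foreachRDD(new JavaRDDStringKafkaWriter(pool, "output-topic", false));
>>     }
>> }
>>
>> (Note that this creates the pool on the driver, which is exactly what Ryan
>> identifies above as the source of the leak.)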
>> On Tue, Jan 31, 2017 at 5:20 PM Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>>
>> Please also include the patch version, such as 1.6.0 or 1.6.1. Could you
>> also post the JavaRDDKafkaWriter code? It's also possible that it leaks
>> resources.
>>
>> On Tue, Jan 31, 2017 at 2:12 PM, Nipun Arora <nipunarora2...@gmail.com> wrote:
>>
>> It is Spark 1.6.
>>
>> Thanks
>> Nipun
>>
>> On Tue, Jan 31, 2017 at 1:45 PM Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>>
>> Could you provide your Spark version, please?
>>
>> On Tue, Jan 31, 2017 at 10:37 AM, Nipun Arora <nipunarora2...@gmail.com> wrote:
>>
>> Hi,
>>
>> I get a resource leak where the number of file descriptors in Spark
>> Streaming keeps increasing. We eventually end up with a "too many open
>> files" error, through an exception raised in JavaRDDKafkaWriter, which is
>> writing a Spark JavaDStream<String>.
>>
>> The exception is attached inline. Any help will be greatly appreciated.
>>
>> Thanks
>> Nipun
>>
>> -------------------------------------------
>> Time: 1485762530000 ms
>> -------------------------------------------
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 0 in stage 85968.0 failed 1 times, most recent
>> failure: Lost task 0.0 in stage 85968.0 (TID 29562, localhost):
>> java.io.FileNotFoundException: /tmp/blockmgr-1b3ddc44-f9a4-42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
>> (too many open files)
>> at java.io.FileOutputStream.open(Native Method)
>> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
>> at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
>> at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:181)
>> at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
>> at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>> at scala.Option.foreach(Option.scala:236)
>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1883)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
>> at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:225)
>> at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:46)
>> at org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(JavaRDDStringKafkaWriter.java:25)
>> at org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(JavaRDDStringKafkaWriter.java:10)
>> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
>> at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>> at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>> at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>> at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>> at scala.util.Try$.apply(Try.scala:161)
>> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:229)
>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:229)
>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:229)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:228)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.FileNotFoundException: /tmp/blockmgr-1b3ddc44-f9a4-42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
>> (too many open files)
>> at java.io.FileOutputStream.open(Native Method)
>> at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
>> at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
>> at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:181)
>> at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
>> at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> ... 3 more