Since the Kafka producer is thread-safe, you don't need a pool at all.
Just share a single producer per executor. Please see my blog post
for more details: http://allegro.tech/spark-kafka-integration.html
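
A minimal sketch of the idea, not the code from the blog post. It assumes
one lazily created instance per executor JVM, held in a static field so it
never gets serialized with a closure or a broadcast. StubProducer and
ProducerHolder are hypothetical names; StubProducer only stands in for
KafkaProducer<String, String> so the snippet compiles without Kafka on the
classpath.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for KafkaProducer<String, String>. */
final class StubProducer {
    // Counts constructor calls so we can see how many instances exist per JVM.
    static final AtomicInteger created = new AtomicInteger();
    // Counts records handed to this instance.
    final AtomicInteger sent = new AtomicInteger();

    StubProducer() {
        created.incrementAndGet();
    }

    // A real producer would send asynchronously; here we only count the call.
    void send(String topic, String value) {
        sent.incrementAndGet();
    }
}

/** One producer per JVM (per executor), created on first use. */
final class ProducerHolder {
    private ProducerHolder() {}

    // Initialization-on-demand holder: the JVM loads Lazy, and therefore
    // builds the producer, only when get() is first called, and class
    // initialization is thread-safe.
    private static final class Lazy {
        static final StubProducer INSTANCE = new StubProducer();
    }

    static StubProducer get() {
        return Lazy.INSTANCE;
    }
}
```

In a real job you would call ProducerHolder.get() inside the function
passed to foreachPartition, so the lookup runs on the executor and nothing
producer-related is captured on the driver.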
On 19 Aug 2015 2:00 AM, "Shenghua(Daniel) Wan" <wansheng...@gmail.com>
wrote:

> All of you are right.
>
> I was trying to create too many producers. My idea was to create a
> pool (for now the pool contains only one producer) shared by all the
> executors.
> After I realized it was related to the serialization issue (though I did
> not find clear clues in the source code indicating that the broadcast
> template type parameter must implement Serializable), I followed the Spark
> Cassandra connector design and created a singleton of Kafka producer pools.
> No exception has been noticed since.
>
> Thanks for all your comments.
>
>
> On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> Why are you even trying to broadcast a producer? A broadcast variable is
>> some immutable piece of serializable DATA that can be used for processing
>> on the executors. A Kafka producer is neither DATA nor immutable, and
>> definitely not serializable.
>> The right way to do this is to create the producer in the executors.
>> Please see the discussion in the programming guide
>>
>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>>
>> On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> I wouldn't expect a kafka producer to be serializable at all... among
>>> other things, it has a background thread
>>>
>>> On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan <
>>> wansheng...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> Did anyone see java.util.ConcurrentModificationException when using
>>>> broadcast variables?
>>>> I encountered this exception when wrapping a Kafka producer like this
>>>> in the spark streaming driver.
>>>>
>>>> Here is what I did.
>>>> KafkaProducer<String, String> producer = new KafkaProducer<String,
>>>> String>(properties);
>>>> final Broadcast<KafkaDataProducer> bCastProducer
>>>>     = streamingContext.sparkContext().broadcast(producer);
>>>>
>>>> Then, within a closure called by foreachRDD, I was trying to get the
>>>> wrapped producer, i.e.
>>>>  KafkaProducer<String, String> p = bCastProducer.value();
>>>>
>>>> after rebuilding and rerunning, I got the stack trace like this
>>>>
>>>> Exception in thread "main" com.esotericsoftware.kryo.KryoException:
>>>> java.util.ConcurrentModificationException
>>>> Serialization trace:
>>>> classes (sun.misc.Launcher$AppClassLoader)
>>>> classloader (java.security.ProtectionDomain)
>>>> context (java.security.AccessControlContext)
>>>> acc (org.apache.spark.util.MutableURLClassLoader)
>>>> contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
>>>> ioThread (org.apache.kafka.clients.producer.KafkaProducer)
>>>> producer ("my driver")
>>>> at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>>> at
>>>> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
>>>> at
>>>> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
>>>> at
>>>> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
>>>> at
>>>> org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
>>>> at
>>>> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>>> at
>>>> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>>> at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
>>>> at
>>>> org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
>>>> at "my driver"
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>> at
>>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>>>> at
>>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>> Caused by: java.util.ConcurrentModificationException
>>>> at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
>>>> at java.util.Vector$Itr.next(Vector.java:1133)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
>>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>> at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>> ... 41 more
>>>>
>>>> Thanks.
>>>>
>>>> --
>>>>
>>>> Regards,
>>>> Shenghua (Daniel) Wan
>>>>
>>>
>>>
>>
>
>
> --
>
> Regards,
> Shenghua (Daniel) Wan
>
