I'm glad that I could help :)
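
In case it helps anyone who finds this thread later, here is a minimal sketch of the "one lazily created producer per executor JVM" idea discussed below. It is a sketch under stated assumptions, not the code from the blog post: `FakeProducer` is a hypothetical stand-in for a thread-safe, non-serializable client such as `KafkaProducer`, and `ExecutorLocalProducer` and `getOrCreate` are names invented for illustration. Only the configuration (a `Properties` object, which is serializable) would need to travel to the executors; the producer itself is built on first use and kept in a static field.

```java
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Sketch of "one lazily created producer per executor JVM".
 * FakeProducer is a hypothetical stand-in for a real, non-serializable
 * client such as KafkaProducer; it is NOT a Kafka API.
 */
final class ExecutorLocalProducer {

    /** Hypothetical stand-in for a thread-safe, non-serializable client. */
    static final class FakeProducer {
        // Counts constructions so "created once per JVM" can be checked.
        static final AtomicInteger CREATED = new AtomicInteger();
        private final String servers;

        FakeProducer(Properties config) {
            this.servers = config.getProperty("bootstrap.servers");
            CREATED.incrementAndGet();
        }

        /** Pretends to send a record; returns a description of what it would send. */
        String send(String topic, String value) {
            return servers + "/" + topic + ": " + value;
        }
    }

    // Static, so it exists once per JVM and is never serialized with a closure.
    private static volatile FakeProducer instance;

    private ExecutorLocalProducer() {}

    /** Returns the JVM-wide producer, creating it on first use (double-checked locking). */
    static FakeProducer getOrCreate(Properties config) {
        FakeProducer local = instance;
        if (local == null) {
            synchronized (ExecutorLocalProducer.class) {
                local = instance;
                if (local == null) {
                    local = new FakeProducer(config);
                    instance = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) {
        // Only this serializable configuration would be shipped to executors.
        Properties config = new Properties();
        config.setProperty("bootstrap.servers", "localhost:9092");

        FakeProducer first = getOrCreate(config);
        FakeProducer second = getOrCreate(config);
        System.out.println(first == second);
        System.out.println(first.send("events", "hello"));
    }
}
```

In a real job the `getOrCreate` call would presumably sit inside the function passed to `foreachPartition`, with the configuration read from a broadcast variable, so the producer is never serialized on the driver.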
On 19 Aug 2015 8:52 AM, "Shenghua(Daniel) Wan" <wansheng...@gmail.com>
wrote:

> +1
>
> I wish I had read this blog post earlier. I am using Java and have just
> implemented a singleton producer per executor/JVM during the day.
> Yes, I did see that NotSerializableException when I was debugging the code
> ...
>
> Thanks for sharing.
>
> On Tue, Aug 18, 2015 at 10:59 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> It's a cool blog post! Tweeted it!
>> Broadcasting the configuration necessary for lazily instantiating the
>> producer is a good idea.
>>
>> Nitpick: The first code example has an extra `}` ;)
>>
>> On Tue, Aug 18, 2015 at 10:49 PM, Marcin Kuthan <marcin.kut...@gmail.com>
>> wrote:
>>
>>> As long as the Kafka producer is thread-safe, you don't need any pool at
>>> all. Just share a single producer on each executor. Please look at my blog
>>> post for more details. http://allegro.tech/spark-kafka-integration.html
>>> On 19 Aug 2015 2:00 AM, "Shenghua(Daniel) Wan" <wansheng...@gmail.com>
>>> wrote:
>>>
>>>> All of you are right.
>>>>
>>>> I was trying to create too many producers. My idea was to create a
>>>> pool (for now the pool contains only one producer) shared by all the
>>>> executors.
>>>> After I realized it was related to serialization issues (though I
>>>> did not find clear clues in the source code indicating that the broadcast
>>>> type parameter must implement Serializable), I followed the Spark
>>>> Cassandra connector design and created a singleton of Kafka producer pools.
>>>> No exceptions have been observed since.
>>>>
>>>> Thanks for all your comments.
>>>>
>>>>
>>>> On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das <t...@databricks.com>
>>>> wrote:
>>>>
>>>>> Why are you even trying to broadcast a producer? A broadcast variable
>>>>> is some immutable piece of serializable DATA that can be used for
>>>>> processing on the executors. A Kafka producer is neither DATA nor
>>>>> immutable, and definitely not serializable.
>>>>> The right way to do this is to create the producer in the executors.
>>>>> Please see the discussion in the programming guide
>>>>>
>>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>>>>>
>>>>> On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> I wouldn't expect a kafka producer to be serializable at all... among
>>>>>> other things, it has a background thread
>>>>>>
>>>>>> On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan <
>>>>>> wansheng...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> Did anyone see java.util.ConcurrentModificationException when using
>>>>>>> broadcast variables?
>>>>>>> I encountered this exception when wrapping a Kafka producer like
>>>>>>> this in the spark streaming driver.
>>>>>>>
>>>>>>> Here is what I did.
>>>>>>> KafkaProducer<String, String> producer =
>>>>>>>     new KafkaProducer<String, String>(properties);
>>>>>>> final Broadcast<KafkaProducer<String, String>> bCastProducer
>>>>>>>     = streamingContext.sparkContext().broadcast(producer);
>>>>>>>
>>>>>>> Then, within a closure called by foreachRDD, I was trying to get
>>>>>>> the wrapped producer, i.e.
>>>>>>>  KafkaProducer<String, String> p = bCastProducer.value();
>>>>>>>
>>>>>>> After rebuilding and rerunning, I got a stack trace like this:
>>>>>>>
>>>>>>> Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
>>>>>>> Serialization trace:
>>>>>>> classes (sun.misc.Launcher$AppClassLoader)
>>>>>>> classloader (java.security.ProtectionDomain)
>>>>>>> context (java.security.AccessControlContext)
>>>>>>> acc (org.apache.spark.util.MutableURLClassLoader)
>>>>>>> contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
>>>>>>> ioThread (org.apache.kafka.clients.producer.KafkaProducer)
>>>>>>> producer ("my driver")
>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>>>>>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>>>>>>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>>>>>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>>> at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>>>>>>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>>>>>> at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
>>>>>>> at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
>>>>>>> at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
>>>>>>> at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
>>>>>>> at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>>>>>> at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>>>>>> at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
>>>>>>> at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
>>>>>>> at "my driver"
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>>>>>>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>>>>>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>>>>>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>>>>>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>>>>> Caused by: java.util.ConcurrentModificationException
>>>>>>> at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
>>>>>>> at java.util.Vector$Itr.next(Vector.java:1133)
>>>>>>> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
>>>>>>> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
>>>>>>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>>>>>>> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>>>>>>> ... 41 more
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> Regards,
>>>>>>> Shenghua (Daniel) Wan
>>>>>>>
