Hi Vinay,

Like Srinath said, it's because of something you are sending in a
tuple, com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
to be exact.

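Storm uses Kryo internally for serialization, as it's faster than vanilla
Java serialization. For classes that aren't registered with Kryo it falls
back to Java serialization (the SerializableSerializer in your trace),
which fails here because the class isn't Serializable. Storm gives you an
option for registering classes with Kryo.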

Config conf = new Config();

....

conf.registerSerialization(
com.rabbitmq.client.impl.LongStringHelper.ByteArrayLongString.class);

....

submitTopology(topologyName, conf, topology.build());


In most cases this is sufficient, as Kryo will then use its field
serializer. If this doesn't work, something in the class needs a different
serializer (read: a custom serializer). For that, have a look at the Kryo
docs, which are pretty good.
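
If registering the class turns out to be awkward, another route is to keep
the RabbitMQ type out of the tuple altogether. Your trace goes through
HashMap.writeObject, so it looks like a map in the tuple is holding the
value. Below is a rough sketch using only the plain JDK, not Storm or
RabbitMQ APIs. HeaderSanitizer and OpaqueValue are made-up names
(OpaqueValue just stands in for ByteArrayLongString), and it assumes
toString() gives you a usable form of the value. It also only looks at
top-level map values, so nested collections would need the same treatment.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Storm or the RabbitMQ client.
final class HeaderSanitizer {

    // Stand-in for a value like ByteArrayLongString: it does NOT
    // implement java.io.Serializable.
    static final class OpaqueValue {
        private final String text;
        OpaqueValue(String text) { this.text = text; }
        @Override public String toString() { return text; }
    }

    // Returns a copy of the map in which every top-level value that is not
    // Serializable is replaced by its toString() form.
    static HashMap<String, Object> sanitize(Map<String, Object> headers) {
        HashMap<String, Object> copy = new HashMap<>();
        for (Map.Entry<String, Object> e : headers.entrySet()) {
            Object v = e.getValue();
            boolean keep = (v == null) || (v instanceof Serializable);
            copy.put(e.getKey(), keep ? v : v.toString());
        }
        return copy;
    }

    // True if plain Java serialization accepts the object; handy for
    // finding out which value in a tuple is the culprit.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("product", new OpaqueValue("RabbitMQ"));
        headers.put("retries", 3);
        System.out.println(isJavaSerializable(headers));           // false
        System.out.println(isJavaSerializable(sanitize(headers))); // true
    }
}
```

You would call something like sanitize(...) on the map in the spout before
emitting, so only Strings and other Serializable values cross the worker
boundary.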


-Samit


On Sat, Mar 22, 2014 at 9:42 AM, Vinay Pothnis <[email protected]> wrote:

> Thank you Srinath,
>
> Will check that out. Wondering if I need to write a custom serializer.
>
> Thanks!
> Vinay
> On Mar 21, 2014 5:34 PM, "Srinath C" <[email protected]> wrote:
>
>> Vinay,
>>     The exception is raised from KryoTupleSerializer, so one of the
>> values in your tuple directly or indirectly references an instance of
>> ByteArrayLongString. This is a class from the RabbitMQ client library.
>> One possibility is that you are adding all the client properties
>> <http://grepcode.com/file/repo1.maven.org/maven2/com.rabbitmq/amqp-client/3.2.1/com/rabbitmq/client/impl/AMQConnection.java#75>
>> into the tuple.
>>
>> Regards,
>> Srinath.
>>
>>
>>
>> On Sat, Mar 22, 2014 at 4:56 AM, Vinay Pothnis
>> <[email protected]> wrote:
>>
>>> Hello,
>>>
>>> This seems to happen when the spout and the bolt are in different JVMs
>>> (worker processes), which is understandable, as there would be some sort
>>> of serialization for the inter-JVM communication.
>>>
>>> But we are not able to find out what data this is, and the logs do not
>>> say much. We have turned on "topology.debug = true", but the logs don't
>>> contain any other useful information.
>>>
>>> Any idea how to troubleshoot this?
>>>
>>> Thanks!
>>> Vinay
>>>
>>>
>>> On Fri, Mar 21, 2014 at 3:07 PM, Vinay Pothnis
>>> <[email protected]> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have a storm cluster of 3 nodes and a topology with 3 workers. I have
>>>> rabbit-mq from which I read messages and also post back to a queue.
>>>>
>>>> I get this exception only when I am using more than 1 worker. When I
>>>> have just 1 worker, I do not get it. But when I change it to 3
>>>> workers, the exception arises.
>>>>
>>>> java.lang.RuntimeException: java.lang.RuntimeException: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
>>>>   at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90)
>>>>   at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
>>>>   at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
>>>>   at backtype.storm.disruptor$consume_loop_STAR_$fn__2975.invoke(disruptor.clj:74)
>>>>   at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403)
>>>>   at clojure.lang.AFn.run(AFn.java:24)
>>>>   at java.lang.Thread.run(Thread.java:744)
>>>> Caused by: java.lang.RuntimeException: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
>>>>   at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:24)
>>>>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:554)
>>>>   at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:77)
>>>>   at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
>>>>   at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:472)
>>>>   at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:27)
>>>>   at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:27)
>>>>   at backtype.storm.daemon.worker$mk_transfer_fn$fn__5686$fn__5690.invoke(worker.clj:108)
>>>>   at backtype.storm.util$fast_list_map.invoke(util.clj:801)
>>>>   at backtype.storm.daemon.worker$mk_transfer_fn$fn__5686.invoke(worker.clj:108)
>>>>   at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3328.invoke(executor.clj:240)
>>>>   at backtype.storm.disruptor$clojure_handler$reify__2962.onEvent(disruptor.clj:43)
>>>>   at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
>>>>   ... 6 more
>>>> Caused by: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
>>>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>>>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>>   at java.util.HashMap.writeObject(HashMap.java:1133)
>>>>   at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)
>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>>   at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>>>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>>   at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:21)
>>>>
>>>>
>>>> Any pointers on what could be the reason and how to troubleshoot this?
>>>>
>>>> Thanks!
>>>> Vinay
>>>>
>>>>
>>>>
>>>
>>
