Hello,

I tried to configure storm to indicate that it should use the kryo
FieldsSerializer with the following configuration in the "strom.yml" file:

topology.kryo.register:
  - com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString

This is as described here:
https://github.com/nathanmarz/storm/wiki/Serialization
However, this still did not solve the issue. The exception still remained.

Also, there is another issue if I want to write my own custom serializer.
The issue is that this class "
com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString" is a private
inner class. It was public before but in the latest versions of the library
this is private. So, I might not be able to write a custom serializer as
well.

Any pointers as to how to resolve this issue?

Thanks
Vinay


On Sun, Mar 23, 2014 at 8:26 AM, Vinay Pothnis <[email protected]>wrote:

> Thank you Samit,
>
> Will try out the kryo field serializer first.
>
> -Vinay
>
>
> On Sun, Mar 23, 2014 at 12:43 AM, Samit Sasan <[email protected]>wrote:
>
>> Hi Vinay,
>>
>> Like Srinnath said its because of something you are sending in a tuple, 
>> *com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
>> to be exact.*
>>
>> *Storm uses kryo internally for serialization as its faster than vanilla
>> java serialization and storm gives you an *
>> *option for registering classes to kryo.*
>>
>> Config conf = new Config();
>>
>> ....
>>
>> conf.registerSerialization(
>> *com.rabbitmq.client.impl.LongStringHelper.ByteArrayLongString*.class);
>>
>> ....
>>
>> submitTopology_(topologyName, conf, topology.build());
>>
>>
>> For most cases this is sufficient as kryo will then use field serializer.
>> If this doesn't work, that means there is something in there which would
>> need some other serializer (read custom serializer), for which have a look
>> at kryo docs which are pretty good.
>>
>>
>> -Samit
>>
>>
>> On Sat, Mar 22, 2014 at 9:42 AM, Vinay Pothnis 
>> <[email protected]>wrote:
>>
>>> Thank you Srinath,
>>>
>>> Will check that out. Wondering if I need to write a custom serializer.
>>>
>>> Thanks!
>>> Vinay
>>> On Mar 21, 2014 5:34 PM, "Srinath C" <[email protected]> wrote:
>>>
>>>> Vinay,
>>>>     The exception is raised from *KryoTupleSerializer*, so one of the
>>>> values in your tuple directly or indirectly reference instances of
>>>> *ByteArrayLongString.* This is a class from the RabbitMQ client
>>>> library. One of the possibilities could be that you are adding all client
>>>> properties<http://grepcode.com/file/repo1.maven.org/maven2/com.rabbitmq/amqp-client/3.2.1/com/rabbitmq/client/impl/AMQConnection.java#75>into
>>>>  the tuple.
>>>>
>>>> Regards,
>>>> Srinath.
>>>>
>>>>
>>>>
>>>> On Sat, Mar 22, 2014 at 4:56 AM, Vinay Pothnis <[email protected]
>>>> > wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> This seems to happen when the spout and the bolt are on different JVMs
>>>>> (worker process) - which is understandable as there would be some sort of
>>>>> serialization for the inter jvm communication.
>>>>>
>>>>> But we are not able to find out what data this is and the logs do not
>>>>> say much. Have turned on "topology.debug = true" - but the logs dont
>>>>> contain any other useful information.
>>>>>
>>>>> Any idea how to troubleshoot this?
>>>>>
>>>>> Thanks!
>>>>> Vinay
>>>>>
>>>>>
>>>>> On Fri, Mar 21, 2014 at 3:07 PM, Vinay Pothnis <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have a storm cluster of 3 nodes and a topology with 3 workers. I
>>>>>> have rabbit-mq from which I read messages and also post back to a queue.
>>>>>>
>>>>>> I seem to get this exception when I am using more than 1 worker. When
>>>>>> I have just 1 worker, I do not get this exception. But when i change it 
>>>>>> to
>>>>>> 3 workers, this exception arises.
>>>>>>
>>>>>> *java.lang.RuntimeException: java.lang.RuntimeException:
>>>>>> java.io.NotSerializableException:
>>>>>> com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString*
>>>>>> *  at
>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90)*
>>>>>> *  at
>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)*
>>>>>> *  at
>>>>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)*
>>>>>> *  at
>>>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__2975.invoke(disruptor.clj:74)*
>>>>>> *  at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403)*
>>>>>> *  at clojure.lang.AFn.run(AFn.java:24)*
>>>>>> *  at java.lang.Thread.run(Thread.java:744)*
>>>>>> *Caused by: java.lang.RuntimeException:
>>>>>> java.io.NotSerializableException:
>>>>>> com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString*
>>>>>> *  at
>>>>>> backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:24)*
>>>>>> *  at
>>>>>> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:554)*
>>>>>> *  at
>>>>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:77)*
>>>>>> *  at
>>>>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)*
>>>>>> *  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:472)*
>>>>>> *  at
>>>>>> backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:27)*
>>>>>> *  at
>>>>>> backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:27)*
>>>>>> *  at
>>>>>> backtype.storm.daemon.worker$mk_transfer_fn$fn__5686$fn__5690.invoke(worker.clj:108)*
>>>>>> *  at backtype.storm.util$fast_list_map.invoke(util.clj:801)*
>>>>>> *  at
>>>>>> backtype.storm.daemon.worker$mk_transfer_fn$fn__5686.invoke(worker.clj:108)*
>>>>>> *  at
>>>>>> backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3328.invoke(executor.clj:240)*
>>>>>> *  at
>>>>>> backtype.storm.disruptor$clojure_handler$reify__2962.onEvent(disruptor.clj:43)*
>>>>>> *  at
>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)*
>>>>>> *  ... 6 more*
>>>>>> *Caused by: java.io.NotSerializableException:
>>>>>> com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString*
>>>>>> *  at
>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)*
>>>>>> *  at
>>>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)*
>>>>>> *  at java.util.HashMap.writeObject(HashMap.java:1133)*
>>>>>> *  at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)*
>>>>>> *  at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
>>>>>> *  at java.lang.reflect.Method.invoke(Method.java:606)*
>>>>>> *  at
>>>>>> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)*
>>>>>> *  at
>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)*
>>>>>> *  at
>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)*
>>>>>> *  at
>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)*
>>>>>> *  at
>>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)*
>>>>>> *  at
>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)*
>>>>>> *  at
>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)*
>>>>>> *  at
>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)*
>>>>>> *  at
>>>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)*
>>>>>> *  at
>>>>>> backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:21)*
>>>>>>
>>>>>>
>>>>>> Any pointers on what could be the reason and how to troubleshoot
>>>>>> this?
>>>>>>
>>>>>> Thanks!
>>>>>> Vinay
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>
>

Reply via email to