Hello,

I tried to configure Storm to use the Kryo FieldSerializer for this class, with the following entry in the "storm.yaml" file:

    topology.kryo.register:
        - com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString

This is as described here: https://github.com/nathanmarz/storm/wiki/Serialization

However, this did not solve the issue. The same exception is still thrown.

There is also another problem if I want to write my own custom serializer: the class "com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString" is a private inner class. It was public before, but in the latest versions of the library it is private. So I might not be able to write a custom serializer either.

Any pointers on how to resolve this issue?

Thanks
Vinay

On Sun, Mar 23, 2014 at 8:26 AM, Vinay Pothnis <[email protected]> wrote:

> Thank you Samit,
>
> Will try out the kryo field serializer first.
>
> -Vinay
>
> On Sun, Mar 23, 2014 at 12:43 AM, Samit Sasan <[email protected]> wrote:
>
>> Hi Vinay,
>>
>> Like Srinath said, it's because of something you are sending in a tuple,
>> com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
>> to be exact.
>>
>> Storm uses kryo internally for serialization as it's faster than vanilla
>> java serialization, and storm gives you an
>> option for registering classes to kryo.
>>
>> Config conf = new Config();
>>
>> ....
>>
>> conf.registerSerialization(
>> com.rabbitmq.client.impl.LongStringHelper.ByteArrayLongString.class);
>>
>> ....
>>
>> submitTopology_(topologyName, conf, topology.build());
>>
>> For most cases this is sufficient as kryo will then use field serializer.
>> If this doesn't work, that means there is something in there which would
>> need some other serializer (read custom serializer), for which have a look
>> at kryo docs which are pretty good.
>>
>> -Samit
>>
>> On Sat, Mar 22, 2014 at 9:42 AM, Vinay Pothnis
>> <[email protected]> wrote:
>>
>>> Thank you Srinath,
>>>
>>> Will check that out. Wondering if I need to write a custom serializer.
>>>
>>> Thanks!
>>> Vinay
>>> On Mar 21, 2014 5:34 PM, "Srinath C" <[email protected]> wrote:
>>>
>>>> Vinay,
>>>> The exception is raised from KryoTupleSerializer, so one of the
>>>> values in your tuple directly or indirectly references instances of
>>>> ByteArrayLongString. This is a class from the RabbitMQ client
>>>> library. One possibility is that you are adding all client
>>>> properties <http://grepcode.com/file/repo1.maven.org/maven2/com.rabbitmq/amqp-client/3.2.1/com/rabbitmq/client/impl/AMQConnection.java#75>
>>>> into the tuple.
>>>>
>>>> Regards,
>>>> Srinath.
>>>>
>>>> On Sat, Mar 22, 2014 at 4:56 AM, Vinay Pothnis <[email protected]> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> This seems to happen when the spout and the bolt are on different JVMs
>>>>> (worker processes), which is understandable as there would be some sort
>>>>> of serialization for the inter-JVM communication.
>>>>>
>>>>> But we are not able to find out what data this is, and the logs do not
>>>>> say much. I have turned on "topology.debug = true", but the logs don't
>>>>> contain any other useful information.
>>>>>
>>>>> Any idea how to troubleshoot this?
>>>>>
>>>>> Thanks!
>>>>> Vinay
>>>>>
>>>>> On Fri, Mar 21, 2014 at 3:07 PM, Vinay Pothnis <[email protected]> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have a storm cluster of 3 nodes and a topology with 3 workers. I
>>>>>> have rabbit-mq from which I read messages and also post back to a queue.
>>>>>>
>>>>>> I seem to get this exception when I am using more than 1 worker. When
>>>>>> I have just 1 worker, I do not get this exception. But when I change it
>>>>>> to 3 workers, this exception arises.
>>>>>>
>>>>>> java.lang.RuntimeException: java.lang.RuntimeException: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
>>>>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90)
>>>>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)
>>>>>>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
>>>>>>     at backtype.storm.disruptor$consume_loop_STAR_$fn__2975.invoke(disruptor.clj:74)
>>>>>>     at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403)
>>>>>>     at clojure.lang.AFn.run(AFn.java:24)
>>>>>>     at java.lang.Thread.run(Thread.java:744)
>>>>>> Caused by: java.lang.RuntimeException: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
>>>>>>     at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:24)
>>>>>>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:554)
>>>>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:77)
>>>>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
>>>>>>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:472)
>>>>>>     at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:27)
>>>>>>     at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:27)
>>>>>>     at backtype.storm.daemon.worker$mk_transfer_fn$fn__5686$fn__5690.invoke(worker.clj:108)
>>>>>>     at backtype.storm.util$fast_list_map.invoke(util.clj:801)
>>>>>>     at backtype.storm.daemon.worker$mk_transfer_fn$fn__5686.invoke(worker.clj:108)
>>>>>>     at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3328.invoke(executor.clj:240)
>>>>>>     at backtype.storm.disruptor$clojure_handler$reify__2962.onEvent(disruptor.clj:43)
>>>>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
>>>>>>     ... 6 more
>>>>>> Caused by: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>>>>     at java.util.HashMap.writeObject(HashMap.java:1133)
>>>>>>     at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)
>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>>>>     at backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:21)
>>>>>>
>>>>>> Any pointers on what could be the reason and how to troubleshoot this?
>>>>>>
>>>>>> Thanks!
>>>>>> Vinay
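
A note on one possible workaround, since the trace gives a hint. The innermost frames go through java.util.HashMap.writeObject under SerializableSerializer, which suggests the ByteArrayLongString sits inside a map that is being written with plain Java serialization. If that is the case, a Kryo registration for the class may never be consulted for it. One option that avoids touching the private class at all is to convert such values to plain strings in the spout before emitting. Below is a minimal sketch under stated assumptions: HeaderSanitizer and its methods are hypothetical names (not part of Storm or the RabbitMQ client), it uses only the JDK, and it assumes that calling toString() on a LongString gives back its text, which should be checked against the client version in use.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Storm or the RabbitMQ client.
final class HeaderSanitizer {

    private HeaderSanitizer() {}

    // Returns a copy of the map in which every value is a plain JDK type.
    // A null input yields an empty map.
    static Map<String, Object> sanitize(Map<String, Object> headers) {
        Map<String, Object> clean = new HashMap<String, Object>();
        if (headers == null) {
            return clean;
        }
        for (Map.Entry<String, Object> e : headers.entrySet()) {
            clean.put(e.getKey(), sanitizeValue(e.getValue()));
        }
        return clean;
    }

    // Passes through null, strings, numbers and booleans; recurses into maps
    // and lists; replaces anything else with its toString() form.
    static Object sanitizeValue(Object value) {
        if (value == null || value instanceof String
                || value instanceof Number || value instanceof Boolean) {
            return value;
        }
        if (value instanceof Map) {
            Map<String, Object> nested = new HashMap<String, Object>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                nested.put(String.valueOf(e.getKey()), sanitizeValue(e.getValue()));
            }
            return nested;
        }
        if (value instanceof List) {
            List<Object> out = new ArrayList<Object>();
            for (Object item : (List<?>) value) {
                out.add(sanitizeValue(item));
            }
            return out;
        }
        // Assumption: for a LongString this is the text content.
        return value.toString();
    }
}
```

In a spout this would be applied to whatever map ends up in the tuple (for example the message headers) before calling emit. Dates and byte arrays are also turned into strings by this sketch, so the pass-through list would need extending if those have to survive unchanged.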
