Thank you Samit, Will try out the kryo field serializer first.
-Vinay On Sun, Mar 23, 2014 at 12:43 AM, Samit Sasan <[email protected]> wrote: > Hi Vinay, > > Like Srinnath said its because of something you are sending in a tuple, > *com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString > to be exact.* > > *Storm uses kryo internally for serialization as its faster than vanilla > java serialization and storm gives you an * > *option for registering classes to kryo.* > > Config conf = new Config(); > > .... > > conf.registerSerialization( > *com.rabbitmq.client.impl.LongStringHelper.ByteArrayLongString*.class); > > .... > > submitTopology_(topologyName, conf, topology.build()); > > > For most cases this is sufficient as kryo will then use field serializer. > If this doesn't work, that means there is something in there which would > need some other serializer (read custom serializer), for which have a look > at kryo docs which are pretty good. > > > -Samit > > > On Sat, Mar 22, 2014 at 9:42 AM, Vinay Pothnis <[email protected]>wrote: > >> Thank you Srinath, >> >> Will check that out. Wondering if I need to write a custom serializer. >> >> Thanks! >> Vinay >> On Mar 21, 2014 5:34 PM, "Srinath C" <[email protected]> wrote: >> >>> Vinay, >>> The exception is raised from *KryoTupleSerializer*, so one of the >>> values in your tuple directly or indirectly reference instances of >>> *ByteArrayLongString.* This is a class from the RabbitMQ client >>> library. One of the possibilities could be that you are adding all client >>> properties<http://grepcode.com/file/repo1.maven.org/maven2/com.rabbitmq/amqp-client/3.2.1/com/rabbitmq/client/impl/AMQConnection.java#75>into >>> the tuple. >>> >>> Regards, >>> Srinath. >>> >>> >>> >>> On Sat, Mar 22, 2014 at 4:56 AM, Vinay Pothnis >>> <[email protected]>wrote: >>> >>>> Hello, >>>> >>>> This seems to happen when the spout and the bolt are on different JVMs >>>> (worker process) - which is understandable as there would be some sort of >>>> serialization for the inter jvm communication. >>>> >>>> But we are not able to find out what data this is and the logs do not >>>> say much. Have turned on "topology.debug = true" - but the logs dont >>>> contain any other useful information. >>>> >>>> Any idea how to troubleshoot this? >>>> >>>> Thanks! >>>> Vinay >>>> >>>> >>>> On Fri, Mar 21, 2014 at 3:07 PM, Vinay Pothnis <[email protected] >>>> > wrote: >>>> >>>>> Hello, >>>>> >>>>> I have a storm cluster of 3 nodes and a topology with 3 workers. I >>>>> have rabbit-mq from which I read messages and also post back to a queue. >>>>> >>>>> I seem to get this exception when I am using more than 1 worker. When >>>>> I have just 1 worker, I do not get this exception. But when i change it to >>>>> 3 workers, this exception arises. >>>>> >>>>> *java.lang.RuntimeException: java.lang.RuntimeException: >>>>> java.io.NotSerializableException: >>>>> com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString* >>>>> * at >>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90)* >>>>> * at >>>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61)* >>>>> * at >>>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)* >>>>> * at >>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__2975.invoke(disruptor.clj:74)* >>>>> * at backtype.storm.util$async_loop$fn__444.invoke(util.clj:403)* >>>>> * at clojure.lang.AFn.run(AFn.java:24)* >>>>> * at java.lang.Thread.run(Thread.java:744)* >>>>> *Caused by: java.lang.RuntimeException: >>>>> java.io.NotSerializableException: >>>>> com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString* >>>>> * at >>>>> backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:24)* >>>>> * at >>>>> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:554)* >>>>> * at >>>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:77)* >>>>> * at >>>>> com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)* >>>>> * at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:472)* >>>>> * at >>>>> backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:27)* >>>>> * at >>>>> backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:27)* >>>>> * at >>>>> backtype.storm.daemon.worker$mk_transfer_fn$fn__5686$fn__5690.invoke(worker.clj:108)* >>>>> * at backtype.storm.util$fast_list_map.invoke(util.clj:801)* >>>>> * at >>>>> backtype.storm.daemon.worker$mk_transfer_fn$fn__5686.invoke(worker.clj:108)* >>>>> * at >>>>> backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3328.invoke(executor.clj:240)* >>>>> * at >>>>> backtype.storm.disruptor$clojure_handler$reify__2962.onEvent(disruptor.clj:43)* >>>>> * at >>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)* >>>>> * ... 6 more* >>>>> *Caused by: java.io.NotSerializableException: >>>>> com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString* >>>>> * at >>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)* >>>>> * at >>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)* >>>>> * at java.util.HashMap.writeObject(HashMap.java:1133)* >>>>> * at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)* >>>>> * at >>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)* >>>>> * at java.lang.reflect.Method.invoke(Method.java:606)* >>>>> * at >>>>> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)* >>>>> * at >>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)* >>>>> * at >>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)* >>>>> * at >>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)* >>>>> * at >>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)* >>>>> * at >>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)* >>>>> * at >>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)* >>>>> * at >>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)* >>>>> * at >>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)* >>>>> * at >>>>> backtype.storm.serialization.SerializableSerializer.write(SerializableSerializer.java:21)* >>>>> >>>>> >>>>> Any pointers on what could be the reason and how to troubleshoot this? >>>>> >>>>> Thanks! >>>>> Vinay >>>>> >>>>> >>>>> >>>> >>> >
