Try implementing IRichBolt so you have direct control over emits and acks, and also synchronize access to the OutputCollector so that multiple threads aren't using it simultaneously.
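
To make the synchronization part concrete, here is a minimal sketch of the idea. It is not Storm code: `Collector` is a hypothetical stand-in for the emit/ack part of OutputCollector so that the example compiles without storm-core, and the class and method names are made up for illustration. In a real IRichBolt you would presumably guard the collector handed to `prepare` with the same kind of lock, in `execute` and in any thread you start yourself.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only. Collector is a hypothetical stand-in for Storm's OutputCollector;
// none of these names come from the Storm API.
class SynchronizedCollectorSketch {

    /** Hypothetical stand-in for the emit/ack calls of an output collector. */
    interface Collector {
        void emit(String value);
        void ack(String input);
    }

    /** Deliberately not thread-safe: it records calls in plain ArrayLists. */
    static final class RecordingCollector implements Collector {
        final List<String> emitted = new ArrayList<>();
        final List<String> acked = new ArrayList<>();
        public void emit(String value) { emitted.add(value); }
        public void ack(String input) { acked.add(input); }
    }

    /** Funnels every call through one lock so only one thread is inside the delegate at a time. */
    static final class SynchronizedCollector implements Collector {
        private final Collector delegate;
        private final Object lock = new Object();

        SynchronizedCollector(Collector delegate) { this.delegate = delegate; }

        public void emit(String value) {
            synchronized (lock) { delegate.emit(value); }
        }

        public void ack(String input) {
            synchronized (lock) { delegate.ack(input); }
        }
    }

    /** Starts several threads that emit and ack through the wrapper; returns the number of recorded emits. */
    static int run(int threads, int perThread) {
        RecordingCollector raw = new RecordingCollector();
        Collector collector = new SynchronizedCollector(raw);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    String tuple = id + ":" + i;
                    collector.emit(tuple);
                    collector.ack(tuple);
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // join also makes the workers' writes visible to this thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return raw.emitted.size();
    }

    public static void main(String[] args) {
        System.out.println("recorded emits: " + run(4, 500));
    }
}
```

Without the wrapper, concurrent `add` calls on the ArrayList can lose entries or throw, which is the same class of problem as several threads calling into one collector at once.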
On Mon, Mar 10, 2014 at 3:52 AM, 鞠大升 <[email protected]> wrote:
> Hi, all
>
> *Background:*
> ----------------------------------------------------------------------
> We are using Storm 0.9.0.1. Our topology has a KafkaSpout (reads logs from
> Kafka), a ParserBolt (parses logs), and a SaverBolt (saves to Kafka again).
> KafkaSpout has 16 threads, ParserBolt has 32 threads, and SaverBolt has 16
> threads. The ParserBolt is written in Python using Multilang.
>
> *Problems:*
> ----------------------------------------------------------------------
> Sometimes KryoTupleSerializer.serialize throws a NullPointerException, which
> causes the ParserBolt to die. The supervisor then restarts the bolts, but
> the new bolt never receives any tuples, and the topology hangs until we
> restart it.
>
> *Analysis:*
> ----------------------------------------------------------------------
> We found a Troubleshooting entry
> (https://github.com/nathanmarz/storm/wiki/Troubleshooting#wiki-nullpointerexception-from-deep-inside-storm)
> that says: This is caused by having multiple threads issue methods on the
> OutputCollector. All emits, acks, and fails must happen on the same thread.
> One subtle way this can happen is if you make a IBasicBolt that emits on a
> separate thread. IBasicBolt's automatically ack after execute is called, so
> this would cause multiple threads to use the OutputCollector leading to this
> exception. When using a basic bolt, all emits must happen in the same
> thread that runs execute.
>
> We also found that in ShellBolt.java the _readerThread is a new thread, and
> its handleEmit calls emit to emit new tuples.
>
> But another wiki page
> (https://github.com/nathanmarz/storm/wiki/Concepts#wiki-bolts) says: It's
> perfectly fine to launch new threads in bolts that do processing
> asynchronously. OutputCollector
> <http://nathanmarz.github.com/storm/doc/backtype/storm/task/OutputCollector.html>
> is thread-safe and can be called at any time.
>
> *So we have questions:*
> ----------------------------------------------------------------------
> 1) Is OutputCollector thread-safe or not? If it is not thread-safe, and all
>    emits, acks, and fails must happen on the same thread, does ShellBolt
>    have a bug?
> 2) When the bolt is restarted, why does the topology hang? By the way, we
>    are using Netty.
>
> Can anyone help?
>
> The work.log:
> ----------------------------------------------------------------------
> 2014-03-10 12:48:31 b.s.util [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.NullPointerException
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:90) ~[storm-core-0.9.0.1.jar:na]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:61) ~[storm-core-0.9.0.1.jar:na]
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62) ~[storm-core-0.9.0.1.jar:na]
>     at backtype.storm.disruptor$consume_loop_STAR_$fn__849.invoke(disruptor.clj:74) ~[storm-core-0.9.0.1.jar:na]
>     at backtype.storm.util$async_loop$fn__469.invoke(util.clj:406) ~[storm-core-0.9.0.1.jar:na]
>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>     at java.lang.Thread.run(Thread.java:722) [na:1.7.0_21]
> Caused by: java.lang.NullPointerException: null
>     at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:24) ~[storm-core-0.9.0.1.jar:na]
>     at backtype.storm.daemon.worker$mk_transfer_fn$fn__4335$fn__4339.invoke(worker.clj:108) ~[storm-core-0.9.0.1.jar:na]
>     at backtype.storm.util$fast_list_map.invoke(util.clj:804) ~[storm-core-0.9.0.1.jar:na]
>     at backtype.storm.daemon.worker$mk_transfer_fn$fn__4335.invoke(worker.clj:108) ~[storm-core-0.9.0.1.jar:na]
>     at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__4060.invoke(executor.clj:240) ~[storm-core-0.9.0.1.jar:na]
>     at backtype.storm.disruptor$clojure_handler$reify__836.onEvent(disruptor.clj:43) ~[storm-core-0.9.0.1.jar:na]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) ~[storm-core-0.9.0.1.jar:na]
>     ... 6 common frames omitted
>
> --
> dashengju
> +86 13810875910
> [email protected]
