Nathan, it now runs for longer than before: about 2M tuples received and 20M tuples emitted before I saw the exception again. I will try changing the code to remove the synchronized block and see what happens.
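
For reference, here is a minimal sketch of the "only one thread uses the collector at a time" idea suggested further down the thread. It is not the actual topology code and does not use Storm's classes: `Collector`, `RecordingCollector`, `SynchronizedCollector`, and `SyncCollectorDemo` are hypothetical stand-ins so the example runs on its own. In a real bolt, every emit and ack call from every thread would have to go through the same lock.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a collector; NOT Storm's OutputCollector API.
interface Collector {
    void emit(List<Object> values);
}

// Deliberately not thread-safe: it appends to a plain ArrayList.
class RecordingCollector implements Collector {
    final List<List<Object>> emitted = new ArrayList<>();

    public void emit(List<Object> values) {
        emitted.add(values);
    }
}

// Wrapper that lets only one thread at a time reach the delegate.
class SynchronizedCollector implements Collector {
    private final Collector delegate;
    private final Object lock = new Object();

    SynchronizedCollector(Collector delegate) {
        this.delegate = delegate;
    }

    public void emit(List<Object> values) {
        synchronized (lock) {
            delegate.emit(values);
        }
    }
}

class SyncCollectorDemo {
    // Emits from several threads through the wrapper and returns how many
    // tuples the underlying collector recorded.
    static int emitFromThreads(int threads, int perThread) throws InterruptedException {
        RecordingCollector raw = new RecordingCollector();
        Collector safe = new SynchronizedCollector(raw);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int id = t;
            Thread worker = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    safe.emit(Arrays.<Object>asList(id, i));
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            worker.join(); // join makes the workers' writes visible to this thread
        }
        return raw.emitted.size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(emitFromThreads(4, 1000));
    }
}
```

The wrapper only helps if no thread keeps a reference to the unwrapped collector; a single unsynchronized call from another thread is enough to reintroduce the race.
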
Is it possible that Kafka 0.8.1 has some bugs and I should upgrade to 0.8.1.1? Or is the bug only in my code?

On 21 May 2014 20:41, Nathan Leung <[email protected]> wrote:

> You can also synchronize access to the OutputCollector so that only 1
> thread is using it at a time.
>
>
> On Wed, May 21, 2014 at 12:39 PM, Irek Khasyanov <[email protected]> wrote:
>
>> Hm, yes, the first bolt is emitting from a different thread; I didn't
>> realize that this would be a problem. Thanks! I'll try to change
>> everything and see if the errors go away.
>>
>>
>>
>> On 21 May 2014 18:01, P. Taylor Goetz <[email protected]> wrote:
>>
>>> Are you using the OutputCollector to emit in a separate thread (i.e.
>>> outside of the execute() method)?
>>>
>>> As the wiki states, this will cause the problem you are seeing.
>>>
>>> -Taylor
>>>
>>> On May 21, 2014, at 7:24 AM, Irek Khasyanov <[email protected]> wrote:
>>>
>>> Hello.
>>>
>>> I have a strange problem with my topology: sometimes everything crashes
>>> with this exception:
>>>
>>> java.lang.RuntimeException: java.lang.NullPointerException
>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107)
>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78)
>>>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77)
>>>     at backtype.storm.disruptor$consume_loop_STAR_$fn__1577.invoke(disruptor.clj:89)
>>>     at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433)
>>>     at clojure.lang.AFn.run(AFn.java:24)
>>>     at java.lang.Thread.run(Thread.java:662)
>>> Caused by: java.lang.NullPointerException
>>>     at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:41)
>>>     at backtype.storm.daemon.worker$mk_transfer_fn$fn__4217$fn__4221.invoke(worker.clj:123)
>>>     at backtype.storm.util$fast_list_map.invoke(util.clj:832)
>>>     at backtype.storm.daemon.worker$mk_transfer_fn$fn__4217.invoke(worker.clj:123)
>>>     at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3746.invoke(executor.clj:255)
>>>     at backtype.storm.disruptor$clojure_handler$reify__1560.onEvent(disruptor.clj:58)
>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:104)
>>>     ... 6 more
>>>
>>> First of all, I'm using storm 0.9.1, kafka 0.8.1, storm-kafka-0.8-plus
>>> 0.4.0, zookeeper 3.3.6, oracle java 1.6.
>>>
>>> My topology is: kafka spout -> database bolt -> database bolt. The second
>>> database bolt commits to another table, using data emitted from the
>>> first bolt.
>>>
>>> I saw the wiki page
>>> https://github.com/nathanmarz/storm/wiki/Troubleshooting#nullpointerexception-from-deep-inside-storm
>>> but I can't tell what I'm doing wrong. I'm ack'ing right at the start
>>> of the execute() call, for example:
>>>
>>>     @Override
>>>     public void execute(Tuple tuple) {
>>>         this.collector.ack(tuple);
>>>         this.queue.offer(tuple);
>>>     }
>>>
>>> The queue is then committed to the database in a different thread.
>>>
>>> Everything works well for a varying amount of time: the topology can
>>> crash after a few minutes or after 10 hours. I didn't see anything wrong
>>> in the ZK or Kafka logs.
>>>
>>> What could the problem be? Where should I look?
>>>
>>> --
>>> With best regards, Irek Khasyanov.
>>>
>>
>>
>> --
>> With best regards, Irek Khasyanov.
>>
>
>

--
With best regards, Irek Khasyanov.
