Are you using the OutputCollector to emit from a separate thread (i.e., outside
of the execute() method)?

As the wiki states, this will cause the problem you are seeing.
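
One way to avoid it is to let the background thread hand its results back
through a queue and emit them only from the thread that calls execute().
Below is a minimal sketch of that idea, not code from Storm itself: Emitter is
a hypothetical stand-in for OutputCollector so the example compiles on its
own, and SingleThreadEmit, onCommitted() and drainAndEmit() are names made up
for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

class SingleThreadEmit {
    /** Hypothetical stand-in for Storm's OutputCollector (assumption, not the real API). */
    interface Emitter {
        void emit(List<Object> values);
    }

    // Results produced by the background (database) thread, waiting to be emitted.
    private final ConcurrentLinkedQueue<List<Object>> pending =
            new ConcurrentLinkedQueue<List<Object>>();
    private final Emitter collector;

    SingleThreadEmit(Emitter collector) {
        this.collector = collector;
    }

    /** Called from the background thread: hands the result over, never touches the collector. */
    void onCommitted(List<Object> values) {
        pending.offer(values);
    }

    /** Called from execute() on the executor thread: the only place that emits. */
    int drainAndEmit() {
        int emitted = 0;
        List<Object> values;
        while ((values = pending.poll()) != null) {
            collector.emit(values);
            emitted++;
        }
        return emitted;
    }

    public static void main(String[] args) throws InterruptedException {
        final List<List<Object>> seen = new ArrayList<List<Object>>();
        final SingleThreadEmit bolt = new SingleThreadEmit(new Emitter() {
            public void emit(List<Object> values) {
                seen.add(values);
            }
        });

        // Simulated database thread: it only enqueues results.
        Thread db = new Thread(new Runnable() {
            public void run() {
                bolt.onCommitted(Arrays.<Object>asList("row-1"));
                bolt.onCommitted(Arrays.<Object>asList("row-2"));
            }
        });
        db.start();
        db.join();

        // Back on the calling thread, as execute() would be: drain and emit.
        System.out.println("emitted " + bolt.drainAndEmit() + " tuples");
    }
}
```

In a real bolt, execute() would call something like drainAndEmit() each time
a tuple arrives, so results are only emitted while tuples keep flowing. That
is a trade-off of this sketch, not a recommendation.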

-Taylor

> On May 21, 2014, at 7:24 AM, Irek Khasyanov <[email protected]> wrote:
> 
> Hello.
> 
> I have a strange problem with my topology: sometimes everything crashes with
> this exception:
> 
> java.lang.RuntimeException: java.lang.NullPointerException
>       at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:107)
>       at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:78)
>       at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:77)
>       at backtype.storm.disruptor$consume_loop_STAR_$fn__1577.invoke(disruptor.clj:89)
>       at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433)
>       at clojure.lang.AFn.run(AFn.java:24)
>       at java.lang.Thread.run(Thread.java:662)
> Caused by: java.lang.NullPointerException
>       at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:41)
>       at backtype.storm.daemon.worker$mk_transfer_fn$fn__4217$fn__4221.invoke(worker.clj:123)
>       at backtype.storm.util$fast_list_map.invoke(util.clj:832)
>       at backtype.storm.daemon.worker$mk_transfer_fn$fn__4217.invoke(worker.clj:123)
>       at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3746.invoke(executor.clj:255)
>       at backtype.storm.disruptor$clojure_handler$reify__1560.onEvent(disruptor.clj:58)
>       at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:104)
>       ... 6 more
> 
> First of all, I'm using Storm 0.9.1, Kafka 0.8.1, storm-kafka-0.8-plus 0.4.0,
> ZooKeeper 3.3.6, and Oracle Java 1.6.
> 
> My topology is: kafka spout -> database bolt -> database bolt. The second
> database bolt commits to another table, using data emitted from the first bolt.
> 
> I saw the wiki page
> https://github.com/nathanmarz/storm/wiki/Troubleshooting#nullpointerexception-from-deep-inside-storm
> but I can't see what I'm doing wrong. I ack the tuple as soon as execute()
> is called, for example:
> 
> @Override
> public void execute(Tuple tuple) {
>     this.collector.ack(tuple);
>     this.queue.offer(tuple);
> }
> 
> 
> 
> The queue is then committed to the database from a different thread.
> 
> Everything works well for a varying amount of time: the topology can crash
> after a few minutes or after 10 hours. I didn't see anything wrong in the ZK
> or Kafka logs.
> 
> What could be the problem? Where should I look?
> 
> -- 
> With best regards, Irek Khasyanov.
