Hello,

We are running Storm 1.0.2 and use the Kafka client spout 
(org.apache.storm.kafka.spout.KafkaSpout) to subscribe to Kafka topics and 
retrieve data.  Intermittently, while our topologies are starting up, we get a 
NullPointerException that kills the topology.  We have not been able to 
identify why this happens or what we can do to prevent it.  I'm considering 
raising a ticket with Storm asking for a null check so that the topology does 
not crash.

2016-11-17 23:11:05.366 o.a.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.lang.NullPointerException
    at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) ~[stormjar.jar:?]
    at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
    ... 7 more
2016-11-17 23:11:05.379 o.a.s.d.executor [ERROR]
java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.lang.NullPointerException
    at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) ~[stormjar.jar:?]
    at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
    ... 7 more
2016-11-17 23:11:05.473 o.a.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.2.jar:1.0.2]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.worker$fn__8663$fn__8664.invoke(worker.clj:765) [storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$mk_executor_data$fn__7875$fn__7876.invoke(executor.clj:274) [storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:494) [storm-core-1.0.2.jar:1.0.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]

The method in question, KafkaSpout.ack (KafkaSpout.java:316 in the trace), is 
below:

    @Override
    public void ack(Object messageId) {
        final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
        if (!consumerAutoCommitMode) {  // Only need to keep track of acked tuples if commits are not done automatically
            acked.get(msgId.getTopicPartition()).add(msgId);
        }
        emitted.remove(msgId);
    }
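
As a rough illustration of the null check mentioned above, here is a minimal, 
self-contained sketch.  It assumes, without confirmation, that the 
NullPointerException comes from acked.get(...) returning null because the map 
has no entry for the message's partition.  NullSafeAckSketch and MsgId are 
hypothetical stand-ins, not Storm classes, partitions are plain strings here, 
and the boolean return value exists only so the sketch can report what 
happened.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the spout's ack bookkeeping (not Storm code).
class NullSafeAckSketch {

    // Hypothetical stand-in for KafkaSpoutMessageId: a partition name plus an offset.
    static final class MsgId {
        final String topicPartition;
        final long offset;

        MsgId(String topicPartition, long offset) {
            this.topicPartition = topicPartition;
            this.offset = offset;
        }

        String getTopicPartition() {
            return topicPartition;
        }
    }

    final boolean consumerAutoCommitMode = false;
    // Acked message ids per partition; ASSUMPTION: an entry can be missing for a partition.
    final Map<String, Set<MsgId>> acked = new HashMap<>();
    // Message ids that have been emitted and not yet acked or failed.
    final Set<MsgId> emitted = new HashSet<>();

    // Returns true if the ack was recorded, false if it was skipped.
    boolean ack(Object messageId) {
        final MsgId msgId = (MsgId) messageId;
        boolean recorded = false;
        if (!consumerAutoCommitMode) {
            Set<MsgId> partitionAcks = acked.get(msgId.getTopicPartition());
            if (partitionAcks != null) {
                partitionAcks.add(msgId);
                recorded = true;
            }
            // Otherwise there is no entry for this partition, so the ack is
            // skipped instead of throwing a NullPointerException.
        }
        emitted.remove(msgId);
        return recorded;
    }

    public static void main(String[] args) {
        NullSafeAckSketch spout = new NullSafeAckSketch();
        spout.acked.put("topic-0", new HashSet<>());

        MsgId tracked = new MsgId("topic-0", 1L);
        MsgId untracked = new MsgId("topic-1", 2L);
        spout.emitted.add(tracked);
        spout.emitted.add(untracked);

        System.out.println("tracked partition recorded: " + spout.ack(tracked));
        System.out.println("untracked partition recorded: " + spout.ack(untracked));
        System.out.println("still emitted: " + spout.emitted.size());
    }
}
```

Silently skipping the ack would hide whatever leaves the partition without an 
entry, so a real fix would probably also log the case and address the root 
cause.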

Restarting the topology sometimes fixes the issue and sometimes does not.  I 
don't know whether some kind of race condition is involved.  We have about 15 
topologies, all connecting to Kafka the same way, and the problem does not 
occur consistently across them.

Does anyone have any insight into why this occurs or how we can prevent it on 
our end?


Thanks,

Nick Cuneo  /  Software Engineer, Cloud  /  Enterprise Software
Tel: +1 949 517 4802  /  Mobile: +1 949 243 4952
3 Ada  /  Irvine, CA 92618  /  USA
[email protected]  /  www.tyco.com