Hello,
We are running Storm 1.0.2 and using the Kafka client spout to subscribe to
Kafka topics and retrieve data. Intermittently, while our topologies are
starting up, we receive a NullPointerException that kills the topology. We
have not been able to identify why this happens or what we can do to prevent
it. I'm considering raising a ticket with Storm to add a null check so that
the topology does not crash.
2016-11-17 23:11:05.366 o.a.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.lang.NullPointerException
    at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) ~[stormjar.jar:?]
    at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
    ... 7 more
2016-11-17 23:11:05.379 o.a.s.d.executor [ERROR]
java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) [storm-core-1.0.2.jar:1.0.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.lang.NullPointerException
    at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) ~[stormjar.jar:?]
    at org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40) ~[storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451) ~[storm-core-1.0.2.jar:1.0.2]
    ... 7 more
2016-11-17 23:11:05.473 o.a.s.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.2.jar:1.0.2]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.worker$fn__8663$fn__8664.invoke(worker.clj:765) [storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.daemon.executor$mk_executor_data$fn__7875$fn__7876.invoke(executor.clj:274) [storm-core-1.0.2.jar:1.0.2]
    at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:494) [storm-core-1.0.2.jar:1.0.2]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
The method in question, KafkaSpout.ack (the trace above points at
KafkaSpout.java:316), is below:
    @Override
    public void ack(Object messageId) {
        final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
        // Only need to keep track of acked tuples if commits are not done automatically
        if (!consumerAutoCommitMode) {
            acked.get(msgId.getTopicPartition()).add(msgId);
        }
        emitted.remove(msgId);
    }
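To make the null check I have in mind concrete, here is a minimal, self-contained sketch. It is not the real KafkaSpout: AckGuardSketch, MsgId, assignPartition, emit, and ackedCount are hypothetical stand-ins I made up for illustration, and a plain String stands in for the topic partition. It also rests on an assumption I have not confirmed, namely that the NullPointerException comes from acked.get(...) returning null because the map has no entry for the message's partition when the ack arrives (a null messageId would be another possibility).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the spout's ack bookkeeping; not Storm code. */
final class AckGuardSketch {

    /** Hypothetical stand-in for KafkaSpoutMessageId. */
    static final class MsgId {
        final String topicPartition;
        final long offset;

        MsgId(String topicPartition, long offset) {
            this.topicPartition = topicPartition;
            this.offset = offset;
        }
    }

    // Mirrors the fields used by the quoted ack() method.
    private final Map<String, Set<MsgId>> acked = new HashMap<>();
    private final Set<MsgId> emitted = new HashSet<>();
    private final boolean consumerAutoCommitMode;

    AckGuardSketch(boolean consumerAutoCommitMode) {
        this.consumerAutoCommitMode = consumerAutoCommitMode;
    }

    /** Assumption: an entry exists only for partitions assigned to this instance. */
    void assignPartition(String topicPartition) {
        acked.put(topicPartition, new HashSet<>());
    }

    void emit(MsgId msgId) {
        emitted.add(msgId);
    }

    /** Returns true if the ack was recorded for a later commit, false if it was skipped. */
    boolean ack(Object messageId) {
        final MsgId msgId = (MsgId) messageId;
        boolean recorded = false;
        if (!consumerAutoCommitMode) {
            final Set<MsgId> entries = acked.get(msgId.topicPartition);
            if (entries != null) {
                entries.add(msgId);
                recorded = true;
            } else {
                // Guard: no entry for this partition, so log and skip instead of throwing.
                System.err.println("Ignoring ack for untracked partition " + msgId.topicPartition);
            }
        }
        emitted.remove(msgId);
        return recorded;
    }

    int ackedCount(String topicPartition) {
        final Set<MsgId> entries = acked.get(topicPartition);
        return entries == null ? 0 : entries.size();
    }

    public static void main(String[] args) {
        AckGuardSketch spout = new AckGuardSketch(false);
        spout.assignPartition("events-0");

        MsgId tracked = new MsgId("events-0", 1L);
        MsgId untracked = new MsgId("events-1", 7L);
        spout.emit(tracked);
        spout.emit(untracked);

        System.out.println("tracked recorded: " + spout.ack(tracked));
        System.out.println("untracked recorded: " + spout.ack(untracked));
    }
}
```

A guard like this would only stop the worker from dying. It would not explain why the entry is missing, and silently dropping the ack could mean that offset is never committed, so I would still like to understand the root cause.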
Restarting the topology sometimes fixes the issue, but not always. I suspect
some kind of race condition, but I have not confirmed it. We have about 15
topologies that all connect to Kafka the same way, and the failure does not
occur consistently across them.
Does anyone have any insight into why this occurs, or how we can prevent it
on our end?
Thanks,
Nick Cuneo / Software Engineer, Cloud / Enterprise Software
Tel: +1 949 517 4802 / Mobile: +1 949 243 4952
3 Ada / Irvine, CA 92618 / USA
[email protected] / www.tyco.com