[ 
https://issues.apache.org/jira/browse/STORM-2207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim reopened STORM-2207:
---------------------------------

> Kafka Spout NullPointerException during ack
> -------------------------------------------
>
>                 Key: STORM-2207
>                 URL: https://issues.apache.org/jira/browse/STORM-2207
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.0.2
>            Reporter: Nick Cuneo
>            Assignee: Stig Rohde Døssing
>            Priority: Critical
>
> This occurs on startup of the topology.  There should be some null check 
> safeguards, but i'm not sure what's causing it to occur in the first 
> place...my guess is  the topic partition is not found in the ack map.
> 2016-11-17 23:11:05.366 o.a.s.util [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.NullPointerException
>     at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464)
>  ~[storm-core-1.0.2.jar:1.0.2]
>     at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430)
>  ~[storm-core-1.0.2.jar:1.0.2]
>     at 
> org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) 
> ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) 
> ~[storm-core-1.0.2.jar:1.0.2]
>     at 
> org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628)
>  ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) 
> [storm-core-1.0.2.jar:1.0.2]
>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
> Caused by: java.lang.NullPointerException
>     at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) 
> ~[stormjar.jar:?]
>     at 
> org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) 
> ~[storm-core-1.0.2.jar:1.0.2]
>     at 
> org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536)
>  ~[storm-core-1.0.2.jar:1.0.2]
>     at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464)
>  ~[storm-core-1.0.2.jar:1.0.2]
>     at 
> org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40)
>  ~[storm-core-1.0.2.jar:1.0.2]
>     at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451)
>  ~[storm-core-1.0.2.jar:1.0.2]
>     ... 7 more
> 2016-11-17 23:11:05.379 o.a.s.d.executor [ERROR] 
> java.lang.RuntimeException: java.lang.NullPointerException
>     at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464)
>  ~[storm-core-1.0.2.jar:1.0.2]
>     at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:430)
>  ~[storm-core-1.0.2.jar:1.0.2]
>     at 
> org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:420) 
> ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:69) 
> ~[storm-core-1.0.2.jar:1.0.2]
>     at 
> org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:628)
>  ~[storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484) 
> [storm-core-1.0.2.jar:1.0.2]
>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
> Caused by: java.lang.NullPointerException
>     at org.apache.storm.kafka.spout.KafkaSpout.ack(KafkaSpout.java:316) 
> ~[stormjar.jar:?]
>     at 
> org.apache.storm.daemon.executor$ack_spout_msg.invoke(executor.clj:448) 
> ~[storm-core-1.0.2.jar:1.0.2]
>     at 
> org.apache.storm.daemon.executor$fn__7990$tuple_action_fn__7996.invoke(executor.clj:536)
>  ~[storm-core-1.0.2.jar:1.0.2]
>     at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__7979.invoke(executor.clj:464)
>  ~[storm-core-1.0.2.jar:1.0.2]
>     at 
> org.apache.storm.disruptor$clojure_handler$reify__7492.onEvent(disruptor.clj:40)
>  ~[storm-core-1.0.2.jar:1.0.2]
>     at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451)
>  ~[storm-core-1.0.2.jar:1.0.2]
>     ... 7 more
> 2016-11-17 23:11:05.473 o.a.s.util [ERROR] Halting process: ("Worker died")
> java.lang.RuntimeException: ("Worker died")
>     at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) 
> [storm-core-1.0.2.jar:1.0.2]
>     at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
>     at 
> org.apache.storm.daemon.worker$fn__8663$fn__8664.invoke(worker.clj:765) 
> [storm-core-1.0.2.jar:1.0.2]
>     at 
> org.apache.storm.daemon.executor$mk_executor_data$fn__7875$fn__7876.invoke(executor.clj:274)
>  [storm-core-1.0.2.jar:1.0.2]
>     at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:494) 
> [storm-core-1.0.2.jar:1.0.2]
>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
> The method and line number in question below:
> @Override
>     public void ack(Object messageId) {
>         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
>         if (!consumerAutoCommitMode) {  // Only need to keep track of acked 
> tuples if commits are not done automatically
>             acked.get(msgId.getTopicPartition()).add(msgId);
>         }
>         emitted.remove(msgId);
> }



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to