Re: NullPointerException in KafkaOffsetMetric.getValueAndReset causing worker to die
I don't think beginningOffsets is null. I think it's missing one of the partitions, which would mean the right-hand side of the line is null, which gives an NPE when we try to assign it to a primitive long.

I think this could be due to https://issues.apache.org/jira/browse/KAFKA-7044, going by the commit message for the fix: https://github.com/apache/kafka/commit/e2ec2d79c8d5adefc0c764583cec47144dbc5705#diff-b45245913eaae46aa847d2615d62cde0. Specifically, part 2 sounds a lot like what I think might be happening here:

"`ConsumerGroupCommand.getLogEndOffsets()` and `getLogStartOffsets()` assumed that endOffsets()/beginningOffsets() which eventually call Fetcher.fetchOffsetsByTimes(), would return a map with all the topic partitions passed to endOffsets()/beginningOffsets() and that values are not null. Because of (1), null values were possible if some of the topic partitions were already known (in metadata cache) and some not (metadata cache did not have entries for some of the topic partitions). However, even with fixing (1), endOffsets()/beginningOffsets() may return a map with some topic partitions missing, when list offset request returns a non-retriable error."

Basically, KafkaOffsetMetric also assumes that when beginningOffsets(topics) is called, the returned map will contain a value for every requested topic partition.

Could you try upgrading to Kafka 2.0.1? If necessary, we can also work around this on the Storm side by skipping the metrics if the requested partition isn't in the return values for beginningOffsets/endOffsets. Feel free to raise an issue for this.

On Mon, Nov 12, 2018 at 21:56, Alexandre Vermeerbergen <avermeerber...@gmail.com> wrote:

> Hello,
>
> Using Storm 1.2.3-snapshot of the 3rd of November 2018 with all libs
> (storm-core & storm-kafka-client) taken from same Git, we get the
> following crash coming from a NullPointerException in
> KafkaOffsetMetric.getValueAndReset :
>
> 2018-11-12 19:31:30.496 o.a.s.util Thread-9-metricsFromKafka-executor[13 13] [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>     at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>     at org.apache.storm.daemon.executor$fn__9620$fn__9635$fn__9666.invoke(executor.clj:634) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>     at org.apache.storm.util$async_loop$fn__561.invoke(util.clj:484) [storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_192]
> Caused by: java.lang.NullPointerException
>     at org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:89) ~[stormjar.jar:?]
>     at org.apache.storm.daemon.executor$metrics_tick$fn__9544.invoke(executor.clj:345) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>     at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
>     at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
>     at clojure.core$filter$fn__4580.invoke(core.clj:2679) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]
>     at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]
>     at clojure.core$next__4112.invoke(core.clj:64) ~[clojure-1.7.0.jar:?]
>     at clojure.core.protocols$fn__6523.invoke(protocols.clj:170) ~[clojure-1.7.0.jar:?]
>     at clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19) ~[clojure-1.7.0.jar:?]
>     at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) ~[clojure-1.7.0.jar:?]
>     at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) ~[clojure-1.7.0.jar:?]
>     at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13) ~[clojure-1.7.0.jar:?]
>     at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]
>     at
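
The Storm-side workaround suggested in the reply above (skip the metric for any partition that is missing from the returned maps) could look roughly like the sketch below. This is not the actual KafkaOffsetMetric code or the eventual fix: the class and method names are hypothetical, plain String keys stand in for Kafka's TopicPartition, and the two maps stand in for the results of beginningOffsets()/endOffsets().

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the real KafkaOffsetMetric implementation.
final class OffsetMetricGuardSketch {

    // Computes (end offset - beginning offset) per partition, but only for
    // partitions that have an entry in BOTH maps. Partitions missing from
    // either map are skipped instead of being unboxed from null.
    static Map<String, Long> recordCountsForKnownPartitions(
            List<String> requestedPartitions,
            Map<String, Long> beginningOffsets,
            Map<String, Long> endOffsets) {
        Map<String, Long> recordCounts = new HashMap<>();
        for (String partition : requestedPartitions) {
            // Keep the boxed type so that a missing entry shows up as null.
            Long earliest = beginningOffsets.get(partition);
            Long latest = endOffsets.get(partition);
            if (earliest == null || latest == null) {
                continue; // no offsets known for this partition right now
            }
            recordCounts.put(partition, latest - earliest);
        }
        return recordCounts;
    }

    public static void main(String[] args) {
        // "events-1" is deliberately missing from beginningOffsets.
        Map<String, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put("events-0", 5L);
        Map<String, Long> endOffsets = new HashMap<>();
        endOffsets.put("events-0", 12L);
        endOffsets.put("events-1", 9L);

        System.out.println(recordCountsForKnownPartitions(
                Arrays.asList("events-0", "events-1"), beginningOffsets, endOffsets));
    }
}
```

The point of the design is only that the lookup result stays a Long until it has been null-checked. Whether a skipped partition should be logged, or reported with some placeholder value, is left open here.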
NullPointerException in KafkaOffsetMetric.getValueAndReset causing worker to die
Hello,

Using Storm 1.2.3-snapshot of the 3rd of November 2018 with all libs (storm-core & storm-kafka-client) taken from same Git, we get the following crash coming from a NullPointerException in KafkaOffsetMetric.getValueAndReset:

2018-11-12 19:31:30.496 o.a.s.util Thread-9-metricsFromKafka-executor[13 13] [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
    at org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
    at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
    at org.apache.storm.daemon.executor$fn__9620$fn__9635$fn__9666.invoke(executor.clj:634) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
    at org.apache.storm.util$async_loop$fn__561.invoke(util.clj:484) [storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_192]
Caused by: java.lang.NullPointerException
    at org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:89) ~[stormjar.jar:?]
    at org.apache.storm.daemon.executor$metrics_tick$fn__9544.invoke(executor.clj:345) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
    at clojure.core$map$fn__4553.invoke(core.clj:2622) ~[clojure-1.7.0.jar:?]
    at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
    at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
    at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
    at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]
    at clojure.core$filter$fn__4580.invoke(core.clj:2679) ~[clojure-1.7.0.jar:?]
    at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]
    at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]
    at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]
    at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]
    at clojure.core$next__4112.invoke(core.clj:64) ~[clojure-1.7.0.jar:?]
    at clojure.core.protocols$fn__6523.invoke(protocols.clj:170) ~[clojure-1.7.0.jar:?]
    at clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19) ~[clojure-1.7.0.jar:?]
    at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31) ~[clojure-1.7.0.jar:?]
    at clojure.core.protocols$fn__6506.invoke(protocols.clj:101) ~[clojure-1.7.0.jar:?]
    at clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13) ~[clojure-1.7.0.jar:?]
    at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]
    at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]
    at org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
    at org.apache.storm.daemon.executor$fn__9620$tuple_action_fn__9626.invoke(executor.clj:522) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__9609.invoke(executor.clj:471) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
    at org.apache.storm.disruptor$clojure_handler$reify__9120.onEvent(disruptor.clj:41) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
    ... 7 more

In the source code, the null pointer exception comes from the following line of KafkaOffsetMetric.java:

    long earliestTimeOffset = beginningOffsets.get(topicPartition);

The NullPointerException crashes the worker process hosting the Spout, which leads to countless Netty error messages until the Spout is restored on another worker.

Note: We are using Storm Kafka Client with Kafka Client 2.0.0 and Scala 2.12, on a cluster with 7 Supervisor nodes; the topology that is getting these crashes consumes a very high volume of data on a Kafka topic having 16 partitions. All this is running with Oracle Java 8 update 192 on CentOS 7.

Any idea why beginningOffsets could be null?

Kind regards,
Alexandre Vermeerbergen
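
For the line quoted in the report above, an NPE does not require the map itself to be null: Map.get returns a boxed Long, and assigning a null Long to a primitive long throws during auto-unboxing. A minimal reproduction, assuming a plain HashMap with String keys in place of the real Map<TopicPartition, Long> (the class and method names are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reproduction, independent of Storm and Kafka.
final class UnboxingNpeSketch {

    // Performs the same kind of assignment as the line in the report and
    // describes what happened instead of letting the exception propagate.
    static String readOffset(Map<String, Long> beginningOffsets, String partition) {
        try {
            // Map.get returns a Long (or null when the key is absent); the
            // assignment to a primitive long unboxes it via Long.longValue().
            long earliestTimeOffset = beginningOffsets.get(partition);
            return "offset=" + earliestTimeOffset;
        } catch (NullPointerException e) {
            return "NullPointerException: no entry for " + partition;
        }
    }

    public static void main(String[] args) {
        // The map is non-null but only knows about one of the two partitions.
        Map<String, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put("events-0", 5L);

        System.out.println(readOffset(beginningOffsets, "events-0"));
        System.out.println(readOffset(beginningOffsets, "events-1"));
    }
}
```

The try/catch is only there to make the behaviour visible in a small program; in real code, checking the boxed value for null before unboxing is the more usual approach.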
[GitHub] storm issue #2906: STORM-3123 - add support for Kafka security config in sto...
Github user arunmahadevan commented on the issue:

    https://github.com/apache/storm/pull/2906

    @VipinRathor I pulled in the relevant changes from https://github.com/apache/storm/pull/2760 and created this so that we can take it forward. I need to do some tests and will update once done.

    cc @priyank5485 @HeartSaVioR
[GitHub] storm pull request #2906: STORM-3123 - add support for Kafka security config...
GitHub user arunmahadevan opened a pull request:

    https://github.com/apache/storm/pull/2906

    STORM-3123 - add support for Kafka security config in storm-kafka-monitor

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/arunmahadevan/storm STORM-3123

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2906.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2906

commit b42ff565ea7ed17c309912ed95425c14c62b3a36
Author: Vipin Rathor
Date:   2018-07-12T00:01:36Z

    STORM-3123 - add support for Kafka security config in storm-kafka-monitor

commit 3e30ab2954e4ebb62dd91b91247eb11dbbbff78a
Author: Arun Mahadevan
Date:   2018-11-12T19:19:31Z

    STORM-3123: Changes to return extra properties from KafkaSpout and use it in TopologySpoutLag

    Change-Id: I69e55abbb9c0e84cfd2b2f5fcd07d1ab6ef19dc4
Re: Storm Developer
Hi Ashwin,

Here are some steps I follow. Please skip steps that you already know.

Initial:
1. git clone g...@github.com:apache/storm.git

General steps:
1. Make modifications to the source code.
2. Let’s say the source code is located at ~/Workspace/storm. Recompile and rebuild a Storm binary distribution:

    cd ~/Workspace/storm
    mvn clean install -DskipTests
    cd ~/Workspace/storm/storm-dist/binary
    mvn clean package -Dgpg.skip=true

3. You will find a tarball at ~/Workspace/storm/storm-dist/binary/final-package/target/apache-storm-2.0.1-SNAPSHOT.tar.gz. Uncompress it to a location of your choice to install it.

After the first build, you can just build certain modules, e.g. storm-server or storm-core, and replace the jar files, instead of building the whole binary distribution.

You can also refer to https://github.com/apache/storm/blob/master/DEVELOPER.md#building

Thanks
Ethan

> On Nov 12, 2018, at 12:31 AM, Ashwin Siddharth wrote:
>
> Hi,
>
> I am not able to make modifications to storm source code and build it ,
> It would be very helpful If I could get hold of an example where
> modifications are done to the storm source code, and how the modified
> source code is built and how the changes made get reflected.
>
>
> Please , I want help regarding this issue , I really want it.
>
> Regards,
> Ashwin