Re: NullPointerException in KafkaOffsetMetric.getValueAndReset causing worker to die

2018-11-12 Thread Stig Rohde Døssing
I don't think beginningOffsets is null. I think it's missing one of the
partitions, which would mean the right-hand side of the line is null, which
gives an NPE when the null Long is unboxed into a primitive long.
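
To illustrate the failure mode described above, here is a minimal sketch
in plain Java. It does not use the Kafka or Storm APIs: the class name,
the helper method and the String keys standing in for TopicPartition are
all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; not part of Storm or Kafka.
class UnboxingNpeDemo {

    // Mirrors the failing pattern: Map.get returns null for a missing key,
    // and assigning that null Long to a primitive long throws an NPE.
    static long unsafeLookup(Map<String, Long> offsets, String partition) {
        long offset = offsets.get(partition); // auto-unboxing happens here
        return offset;
    }

    public static void main(String[] args) {
        // The map itself is non-null; it just lacks one partition.
        Map<String, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put("topic-0", 42L);

        System.out.println(unsafeLookup(beginningOffsets, "topic-0"));
        try {
            unsafeLookup(beginningOffsets, "topic-1"); // key is missing
        } catch (NullPointerException e) {
            System.out.println("NPE for missing partition");
        }
    }
}
```

The point is only that a non-null map with a missing key is enough to
trigger the exception on a line of this shape.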

I think this could be due to
https://issues.apache.org/jira/browse/KAFKA-7044, going by the commit
message for the fix
https://github.com/apache/kafka/commit/e2ec2d79c8d5adefc0c764583cec47144dbc5705#diff-b45245913eaae46aa847d2615d62cde0.
Specifically part 2 sounds a lot like what I think might be happening here.

"

`ConsumerGroupCommand.getLogEndOffsets()` and `getLogStartOffsets()`
assumed that endOffsets()/beginningOffsets() which eventually call
Fetcher.fetchOffsetsByTimes(), would return a map with all the topic
partitions passed to endOffsets()/beginningOffsets() and that values
are not null. Because of (1), null values were possible if some of the
topic partitions were already known (in metadata cache) and some not
(metadata cache did not have entries for some of the topic
partitions). However, even with fixing (1),
endOffsets()/beginningOffsets() may return a map with some topic
partitions missing, when list offset request returns a non-retriable
error.

"

Basically, KafkaOffsetMetric also assumes that when beginningOffsets(topics)
is called, the returned map will contain a value for every requested topic
partition. Could you try upgrading to Kafka 2.0.1?

If necessary we can also work around this on the Storm side by skipping the
metrics if the requested partition isn't in the return values for
beginningOffsets/endOffsets. Feel free to raise an issue for this.
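
A rough sketch of what such a workaround could look like, again in plain
Java without the Kafka or Storm classes. The class name, the method, the
String keys standing in for TopicPartition and the metric key format are
assumptions for illustration, not the actual KafkaOffsetMetric code.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; not the real KafkaOffsetMetric implementation.
class OffsetMetricSketch {

    // Builds per-partition metrics, skipping any partition that the
    // offsets map does not contain instead of unboxing a null.
    static Map<String, Long> earliestOffsets(Collection<String> partitions,
                                             Map<String, Long> beginningOffsets) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String partition : partitions) {
            Long earliest = beginningOffsets.get(partition); // keep it boxed
            if (earliest == null) {
                continue; // partition missing from the response: skip it
            }
            result.put(partition + "/earliestTimeOffset", earliest);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put("topic-0", 5L);
        // "topic-1" is deliberately absent from the map.
        System.out.println(
            earliestOffsets(Arrays.asList("topic-0", "topic-1"), beginningOffsets));
    }
}
```

Skipping means the metric for the affected partition is simply absent
for that reporting interval, which seems preferable to killing the worker.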

On Mon, 12 Nov 2018 at 21:56, Alexandre Vermeerbergen <
avermeerber...@gmail.com> wrote:

> Hello,
>
> Using Storm 1.2.3-snapshot of the 3rd of November 2018 with all libs
> (storm-core & storm-kafka-client) taken from same Git, we get the
> following crash coming from a NullPointerException in
> KafkaOffsetMetric.getValueAndReset :
>
> 2018-11-12 19:31:30.496 o.a.s.util
> Thread-9-metricsFromKafka-executor[13 13] [ERROR] Async loop died!
>
> java.lang.RuntimeException: java.lang.NullPointerException
>
>   at
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
> ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>
>   at
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
> ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>
>   at
> org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477)
> ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>
>   at
> org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70)
> ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>
>   at
> org.apache.storm.daemon.executor$fn__9620$fn__9635$fn__9666.invoke(executor.clj:634)
> ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>
>   at org.apache.storm.util$async_loop$fn__561.invoke(util.clj:484)
> [storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>
>   at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>
>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_192]
>
> Caused by: java.lang.NullPointerException
>
>   at
> org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:89)
> ~[stormjar.jar:?]
>
>   at
> org.apache.storm.daemon.executor$metrics_tick$fn__9544.invoke(executor.clj:345)
> ~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]
>
>   at clojure.core$map$fn__4553.invoke(core.clj:2622)
> ~[clojure-1.7.0.jar:?]
>
>   at clojure.lang.LazySeq.sval(LazySeq.java:40)
> ~[clojure-1.7.0.jar:?]
>
>   at clojure.lang.LazySeq.seq(LazySeq.java:49)
> ~[clojure-1.7.0.jar:?]
>
>   at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]
>
>   at clojure.core$seq__4128.invoke(core.clj:137)
> ~[clojure-1.7.0.jar:?]
>
>   at clojure.core$filter$fn__4580.invoke(core.clj:2679)
> ~[clojure-1.7.0.jar:?]
>
>   at clojure.lang.LazySeq.sval(LazySeq.java:40)
> ~[clojure-1.7.0.jar:?]
>
>   at clojure.lang.LazySeq.seq(LazySeq.java:49)
> ~[clojure-1.7.0.jar:?]
>
>   at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]
>
>   at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]
>
>   at clojure.core$next__4112.invoke(core.clj:64)
> ~[clojure-1.7.0.jar:?]
>
>   at clojure.core.protocols$fn__6523.invoke(protocols.clj:170)
> ~[clojure-1.7.0.jar:?]
>
>   at
> clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19)
> ~[clojure-1.7.0.jar:?]
>
>   at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31)
> ~[clojure-1.7.0.jar:?]
>
>   at clojure.core.protocols$fn__6506.invoke(protocols.clj:101)
> ~[clojure-1.7.0.jar:?]
>
>   at
> clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13)
> ~[clojure-1.7.0.jar:?]
>
>   at clojure.core$reduce.invoke(core.clj:6519)
> ~[clojure-1.7.0.jar:?]
>
>   at 

NullPointerException in KafkaOffsetMetric.getValueAndReset causing worker to die

2018-11-12 Thread Alexandre Vermeerbergen
Hello,

Using Storm 1.2.3-snapshot of the 3rd of November 2018 with all libs
(storm-core & storm-kafka-client) taken from same Git, we get the
following crash coming from a NullPointerException in
KafkaOffsetMetric.getValueAndReset :

2018-11-12 19:31:30.496 o.a.s.util
Thread-9-metricsFromKafka-executor[13 13] [ERROR] Async loop died!

java.lang.RuntimeException: java.lang.NullPointerException

  at 
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]

  at 
org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]

  at 
org.apache.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:477)
~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]

  at org.apache.storm.disruptor$consume_batch.invoke(disruptor.clj:70)
~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]

  at 
org.apache.storm.daemon.executor$fn__9620$fn__9635$fn__9666.invoke(executor.clj:634)
~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]

  at org.apache.storm.util$async_loop$fn__561.invoke(util.clj:484)
[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]

  at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]

  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_192]

Caused by: java.lang.NullPointerException

  at 
org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric.getValueAndReset(KafkaOffsetMetric.java:89)
~[stormjar.jar:?]

  at 
org.apache.storm.daemon.executor$metrics_tick$fn__9544.invoke(executor.clj:345)
~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]

  at clojure.core$map$fn__4553.invoke(core.clj:2622)
~[clojure-1.7.0.jar:?]

  at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]

  at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]

  at clojure.lang.RT.seq(RT.java:507) ~[clojure-1.7.0.jar:?]

  at clojure.core$seq__4128.invoke(core.clj:137) ~[clojure-1.7.0.jar:?]

  at clojure.core$filter$fn__4580.invoke(core.clj:2679)
~[clojure-1.7.0.jar:?]

  at clojure.lang.LazySeq.sval(LazySeq.java:40) ~[clojure-1.7.0.jar:?]

  at clojure.lang.LazySeq.seq(LazySeq.java:49) ~[clojure-1.7.0.jar:?]

  at clojure.lang.Cons.next(Cons.java:39) ~[clojure-1.7.0.jar:?]

  at clojure.lang.RT.next(RT.java:674) ~[clojure-1.7.0.jar:?]

  at clojure.core$next__4112.invoke(core.clj:64) ~[clojure-1.7.0.jar:?]

  at clojure.core.protocols$fn__6523.invoke(protocols.clj:170)
~[clojure-1.7.0.jar:?]

  at 
clojure.core.protocols$fn__6478$G__6473__6487.invoke(protocols.clj:19)
~[clojure-1.7.0.jar:?]

  at clojure.core.protocols$seq_reduce.invoke(protocols.clj:31)
~[clojure-1.7.0.jar:?]

  at clojure.core.protocols$fn__6506.invoke(protocols.clj:101)
~[clojure-1.7.0.jar:?]

  at 
clojure.core.protocols$fn__6452$G__6447__6465.invoke(protocols.clj:13)
~[clojure-1.7.0.jar:?]

  at clojure.core$reduce.invoke(core.clj:6519) ~[clojure-1.7.0.jar:?]

  at clojure.core$into.invoke(core.clj:6600) ~[clojure-1.7.0.jar:?]

  at 
org.apache.storm.daemon.executor$metrics_tick.invoke(executor.clj:349)
~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]

  at 
org.apache.storm.daemon.executor$fn__9620$tuple_action_fn__9626.invoke(executor.clj:522)
~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]

  at 
org.apache.storm.daemon.executor$mk_task_receiver$fn__9609.invoke(executor.clj:471)
~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]

  at 
org.apache.storm.disruptor$clojure_handler$reify__9120.onEvent(disruptor.clj:41)
~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]

  at 
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
~[storm-core-1.2.3-SNAPSHOT.jar:1.2.3-SNAPSHOT]

  ... 7 more


In source code, the null pointer exception comes from the following
line of KafkaOffsetMetric.java:

long earliestTimeOffset = beginningOffsets.get(topicPartition);

The NullPointerException crashes the worker process hosting the
Spout, which leads to countless Netty error messages until the Spout
is restored on another worker.

Note: We are using Storm Kafka Client with Kafka Client 2.0.0 and
Scala 2.12, on a cluster with 7 Supervisor nodes; the topology that
is getting these crashes consumes a very high volume of data from a
Kafka topic with 16 partitions.
All of this is running with Oracle Java 8 update 192 on CentOS 7.

Any idea why beginningOffsets could be null?

Kind regards,
Alexandre Vermeerbergen


[GitHub] storm issue #2906: STORM-3123 - add support for Kafka security config in sto...

2018-11-12 Thread arunmahadevan
Github user arunmahadevan commented on the issue:

https://github.com/apache/storm/pull/2906
  
@VipinRathor 
I pulled in the relevant changes from 
https://github.com/apache/storm/pull/2760 and created this so that we can take 
it forward. I need to do some tests and will update once done.

cc @priyank5485 @HeartSaVioR 


---


[GitHub] storm pull request #2906: STORM-3123 - add support for Kafka security config...

2018-11-12 Thread arunmahadevan
GitHub user arunmahadevan opened a pull request:

https://github.com/apache/storm/pull/2906

STORM-3123 - add support for Kafka security config in storm-kafka-monitor



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/arunmahadevan/storm STORM-3123

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2906.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2906


commit b42ff565ea7ed17c309912ed95425c14c62b3a36
Author: Vipin Rathor 
Date:   2018-07-12T00:01:36Z

STORM-3123 - add support for Kafka security config in storm-kafka-monitor

commit 3e30ab2954e4ebb62dd91b91247eb11dbbbff78a
Author: Arun Mahadevan 
Date:   2018-11-12T19:19:31Z

STORM-3123: Changes to return extra properties from KafkaSpout and use it 
in TopologySpoutLag

Change-Id: I69e55abbb9c0e84cfd2b2f5fcd07d1ab6ef19dc4




---


Re: Storm Developer

2018-11-12 Thread Ethan Li
Hi Ashwin,

Here are some steps I follow. Please skip steps that you already know.

Initial:
1. git clone g...@github.com:apache/storm.git 

General steps:
1. Make your modifications to the source code.

2. Let’s say the source code is located at ~/Workspace/storm. Recompile and 
rebuild a Storm binary distribution:
cd ~/Workspace/storm
mvn clean install -DskipTests
cd ~/Workspace/storm/storm-dist/binary
mvn clean package -Dgpg.skip=true

3. You will find a tarball at 
~/Workspace/storm/storm-dist/binary/final-package/target/apache-storm-2.0.1-SNAPSHOT.tar.gz.
 Extract it to the location where you want to install it.


After the first build, you can build only the modules you changed, e.g. 
storm-server or storm-core, and replace the corresponding jar files, instead 
of building the whole binary distribution.

You can also refer to 
https://github.com/apache/storm/blob/master/DEVELOPER.md#building

Thanks
Ethan


> On Nov 12, 2018, at 12:31 AM, Ashwin Siddharth  wrote:
> 
> Hi,
> 
> I am not able to make modifications to the Storm source code and build it.
> It would be very helpful if I could get hold of an example where
> modifications are made to the Storm source code, showing how the modified
> source code is built and how the changes are reflected.
> 
> 
> Please, I would really like help with this issue.
> 
> Regards,
> Ashwin