[jira] [Updated] (STORM-3046) Getting a NPE leading worker to die when starting a topology.
     [ https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jungtaek Lim updated STORM-3046:

    Fix Version/s: 1.2.3

> Getting a NPE leading worker to die when starting a topology.
> -------------------------------------------------------------
>
>                 Key: STORM-3046
>                 URL: https://issues.apache.org/jira/browse/STORM-3046
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client, trident
>    Affects Versions: 1.2.1
>            Reporter: Kush Khandelwal
>            Assignee: Stig Rohde Døssing
>            Priority: Blocker
>              Labels: kafka, pull-request-available, storm-kafka-client, trident
>             Fix For: 2.0.0, 1.2.3
>
>         Attachments: TestTopology.java
>
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients version 1.1.0.
> We have an external Kafka cluster from which we get the messages.
> Whenever I try to run the topology, I get an NPE, which causes the worker to die.
> If I set the first poll offset strategy to EARLIEST and the topic already contains some messages, it works fine.
> I have used a custom record translator, which is working fine.
> Can someone please help me fix the issue?
> Thanks.
>
> Error -
> 10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR o.a.s.util - Async loop died!
> java.lang.RuntimeException: java.lang.NullPointerException
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
>     at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.jar:1.2.1]
>     at org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) ~[storm-core-1.2.1.jar:1.2.1]
>     at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
> Caused by: java.lang.NullPointerException
>     at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) ~[storm-kafka-client-1.2.1.jar:1.2.1]
>     at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) ~[storm-kafka-client-1.2.1.jar:1.2.1]
>     at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) ~[storm-kafka-client-1.2.1.jar:1.2.1]
>     at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) ~[storm-core-1.2.1.jar:1.2.1]
>     at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-1.2.1.jar:1.2.1]
>     at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) ~[storm-core-1.2.1.jar:1.2.1]
>     at org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) ~[storm-core-1.2.1.jar:1.2.1]
>     at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) ~[storm-core-1.2.1.jar:1.2.1]
>     at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
>     ... 6 more
>
> Topology class -
>
> import org.apache.storm.Config;
> import org.apache.storm.LocalCluster;
> import org.apache.storm.StormSubmitter;
> import org.apache.storm.generated.*;
> import org.apache.storm.kafka.spout.KafkaSpoutConfig;
> import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
> import org.apache.storm.trident.Stream;
> import org.apache.storm.trident.TridentState;
> import org.apache.storm.trident.TridentTopology;
> import org.apache.storm.tuple.Fields;
> import java.util.Properties;
>
> public class TestTopology {
>
>     private static StormTopology buildTopology(Properties stormProperties) {
>
>         Properties kafkaProperties = getProperties("/kafka.properties");
>         TridentTopology topology = new TridentTopology();
>         Fields stageArguments = new Fields("test", "issue");
>         KafkaSpoutConfig kafkaSpoutConfig = KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), "test")
>                 .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
>                 .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
>                 .setRecordTranslator(new RecordTranslator(), stageArguments)
>
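When reading a trace like the one above, the frame that actually threw is the first "at" line under the last "Caused by:" line; here that is KafkaTridentSpoutEmitter.seek at line 193. A minimal sketch of a helper that pulls this frame out of a raw (unquoted) trace follows. The class name RootCauseFrame and the method find are hypothetical names chosen for illustration and are not part of Storm.

```java
import java.util.Optional;

/** Hypothetical triage helper: finds the frame that threw the innermost exception. */
final class RootCauseFrame {

    /**
     * Returns the first "at ..." frame under the last "Caused by:" line,
     * or under the top-level exception when the trace has no cause chain.
     */
    static Optional<String> find(String trace) {
        String[] lines = trace.split("\\R");

        // The innermost cause is introduced by the last "Caused by:" line.
        int causeIndex = -1;
        for (int i = 0; i < lines.length; i++) {
            if (lines[i].trim().startsWith("Caused by:")) {
                causeIndex = i;
            }
        }

        // The first frame after that line is where the exception was thrown.
        for (int i = causeIndex + 1; i < lines.length; i++) {
            String line = lines[i].trim();
            if (line.startsWith("at ")) {
                String frame = line.substring(3).trim();
                // Cut off anything after "(File.java:NNN)", such as a jar annotation.
                int end = frame.indexOf(')');
                return Optional.of(end >= 0 ? frame.substring(0, end + 1) : frame);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Shortened copy of the trace from the report.
        String sample = String.join("\n",
            "java.lang.RuntimeException: java.lang.NullPointerException",
            "\tat org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]",
            "Caused by: java.lang.NullPointerException",
            "\tat org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) ~[storm-kafka-client-1.2.1.jar:1.2.1]",
            "\tat org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) ~[storm-kafka-client-1.2.1.jar:1.2.1]");

        // Prints: org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193)
        System.out.println(find(sample).orElse("(no frame found)"));
    }
}
```

The helper expects the trace as logged by the worker, one frame per line; the "> " quoting that the mail notifications add would need to be stripped first.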
[jira] [Updated] (STORM-3046) Getting a NPE leading worker to die when starting a topology.
     [ https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated STORM-3046:

    Labels: kafka pull-request-available storm-kafka-client trident  (was: kafka storm-kafka-client trident)

> Getting a NPE leading worker to die when starting a topology.
> -------------------------------------------------------------
>
>                 Key: STORM-3046
>                 URL: https://issues.apache.org/jira/browse/STORM-3046
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client, trident
>    Affects Versions: 1.2.1
>            Reporter: Kush Khandelwal
>            Assignee: Stig Rohde Døssing
>            Priority: Blocker
>              Labels: kafka, pull-request-available, storm-kafka-client, trident
>         Attachments: TestTopology.java
[jira] [Updated] (STORM-3046) Getting a NPE leading worker to die when starting a topology.
     [ https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stig Rohde Døssing updated STORM-3046:

    Component/s:     (was: rm-kafka-client)
                 storm-kafka-client

> Getting a NPE leading worker to die when starting a topology.
> -------------------------------------------------------------
>
>                 Key: STORM-3046
>                 URL: https://issues.apache.org/jira/browse/STORM-3046
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client, trident
>    Affects Versions: 1.2.1
>            Reporter: Kush Khandelwal
>            Priority: Blocker
>              Labels: kafka, storm-kafka-client, trident
>         Attachments: TestTopology.java
[jira] [Updated] (STORM-3046) Getting a NPE leading worker to die when starting a topology.
     [ https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kush Khandelwal updated STORM-3046:

    Description:
I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients version 1.1.0.
We have an external Kafka cluster from which we get the messages.
Whenever I try to run the topology, I get an NPE, which causes the worker to die.
If I set the first poll offset strategy to EARLIEST and the topic already contains some messages, it works fine.
I have used a custom record translator, which is working fine.
Can someone please help me fix the issue?
Thanks.

Error -
10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR o.a.s.util - Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
Caused by: java.lang.NullPointerException
    at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) ~[storm-kafka-client-1.2.1.jar:1.2.1]
    at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) ~[storm-kafka-client-1.2.1.jar:1.2.1]
    at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) ~[storm-kafka-client-1.2.1.jar:1.2.1]
    at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
    ... 6 more

Topology class -

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.*;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;
import java.util.Properties;

public class TestTopology {

    private static StormTopology buildTopology(Properties stormProperties) {

        Properties kafkaProperties = getProperties("/kafka.properties");
        TridentTopology topology = new TridentTopology();
        Fields stageArguments = new Fields("test", "issue");
        KafkaSpoutConfig kafkaSpoutConfig = KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), "test")
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
                .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
                .setRecordTranslator(new RecordTranslator(), stageArguments)
                .build();
        KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new KafkaTridentSpoutOpaque(kafkaSpoutConfig);
        Grouping partitionGroup = getPartitionGroup("test");
        log.info("Creating Opaque-Trident-Kafka-Spout");
        final Stream kafkaSpout = topology.newStream(stormProperties.getProperty("SPOUT_NAME"), kafkaTridentSpoutOpaque)
                .name("kafkaSpout")
                .parallelismHint(1);
        TridentState testUpdate = kafkaSpout.partition(partitionGroup)
                .name("testUpdate")
                .partitionPersist(new MainMemoryStateFactory(), stageArguments, new MainMemoryStateUpdater(), stageArguments)
                .parallelismHint(1);
        Stream viewUpdate = testUpdate.newValuesStream()
                .name("viewUpdate")
                .partition(partitionGroup)
                .each(stageArguments, new UpdateView(), new Fields())
                .parallelismHint(2);
        return topology.build();
    }

    public static void main(String[] args) {
        Config conf = new Config();
[jira] [Updated] (STORM-3046) Getting a NPE leading worker to die when starting a topology.
     [ https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kush Khandelwal updated STORM-3046:

    Description:
I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients version 1.1.0.
We have an external Kafka cluster from which we get the messages.
Whenever I try to run the topology, I get an NPE, which causes the worker to die.
Can someone please help me fix the issue?
Thanks.

Error -
10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR o.a.s.util - Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
Caused by: java.lang.NullPointerException
    at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) ~[storm-kafka-client-1.2.1.jar:1.2.1]
    at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) ~[storm-kafka-client-1.2.1.jar:1.2.1]
    at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) ~[storm-kafka-client-1.2.1.jar:1.2.1]
    at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
    ... 6 more

Topology class -

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.*;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;
import java.util.Properties;

public class TestTopology {

    private static StormTopology buildTopology(Properties stormProperties) {

        Properties kafkaProperties = getProperties("/kafka.properties");
        TridentTopology topology = new TridentTopology();
        Fields stageArguments = new Fields("test", "issue");
        KafkaSpoutConfig kafkaSpoutConfig = KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), "test")
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
                .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
                .setRecordTranslator(new RecordTranslator(), stageArguments)
                .build();
        // If I set the first poll offset strategy to EARLIEST and the topic already contains some messages, it works fine.
        // I have used a custom record translator, which is working fine.
        KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new KafkaTridentSpoutOpaque(kafkaSpoutConfig);
        Grouping partitionGroup = getPartitionGroup("test");
        log.info("Creating Opaque-Trident-Kafka-Spout");
        final Stream kafkaSpout = topology.newStream(stormProperties.getProperty("SPOUT_NAME"), kafkaTridentSpoutOpaque)
                .name("kafkaSpout")
                .parallelismHint(1);
        TridentState testUpdate = kafkaSpout.partition(partitionGroup)
                .name("testUpdate")
                .partitionPersist(new MainMemoryStateFactory(), stageArguments, new MainMemoryStateUpdater(), stageArguments)
                .parallelismHint(1);
        Stream viewUpdate = testUpdate.newValuesStream()
                .name("viewUpdate")
                .partition(partitionGroup)
                .each(stageArguments, new UpdateView(), new Fields())
                .parallelismHint(2);
        return topology.build();
    }

    public static void main(String[] args) {
        Config conf = new