[jira] [Updated] (STORM-3082) NamedTopicFilter can't handle topics that don't exist yet
[ https://issues.apache.org/jira/browse/STORM-3082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stig Rohde Døssing updated STORM-3082:
--------------------------------------
    Affects Version/s: 1.1.3

> NamedTopicFilter can't handle topics that don't exist yet
> ---------------------------------------------------------
>
>                 Key: STORM-3082
>                 URL: https://issues.apache.org/jira/browse/STORM-3082
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.2.1, 1.1.3
>            Reporter: Stig Rohde Døssing
>            Assignee: Aniket Alhat
>            Priority: Minor
>              Labels: newbie, pull-request-available
>             Fix For: 2.0.0, 1.2.3, 1.1.4
>
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> [~aniket.alhat] reported on the mailing list that he got an NPE when trying
> to start the Trident spout.
> {code}
> 2018-05-22 06:23:02.318 o.a.s.util [ERROR] Async loop died!
> java.lang.NullPointerException: null
>   at org.apache.storm.kafka.spout.NamedTopicFilter.getFilteredTopicPartitions(NamedTopicFilter.java:57) ~[stormjar.jar:?]
>   at org.apache.storm.kafka.spout.ManualPartitionSubscription.refreshAssignment(ManualPartitionSubscription.java:54) ~[stormjar.jar:?]
>   at org.apache.storm.kafka.spout.ManualPartitionSubscription.subscribe(ManualPartitionSubscription.java:49) ~[stormjar.jar:?]
>   at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager.createAndSubscribeKafkaConsumer(KafkaTridentSpoutManager.java:59) ~[stormjar.jar:?]
>   at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:84) ~[stormjar.jar:?]
>   at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:100) ~[stormjar.jar:?]
>   at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque.getEmitter(KafkaTridentSpoutOpaque.java:50) ~[stormjar.jar:?]
>   at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.<init>(OpaquePartitionedTridentSpoutExecutor.java:97) ~[storm-core-1.2.1.jar:1.2.1]
>   at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:221) ~[storm-core-1.2.1.jar:1.2.1]
>   at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:39) ~[storm-core-1.2.1.jar:1.2.1]
>   at org.apache.storm.trident.spout.TridentSpoutExecutor.prepare(TridentSpoutExecutor.java:60) ~[storm-core-1.2.1.jar:1.2.1]
>   at org.apache.storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:245) ~[storm-core-1.2.1.jar:1.2.1]
>   at org.apache.storm.daemon.executor$fn__5043$fn__5056.invoke(executor.clj:803) ~[storm-core-1.2.1.jar:1.2.1]
>   at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:482) [storm-core-1.2.1.jar:1.2.1]
>   at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
> {code}
> It looks to me like the partitionsFor method on the consumer will return null
> if the specified topic doesn't exist. We didn't account for this in the
> filter, because the return type of the method is a List, and we assumed it
> wouldn't be null.
> I think it's reasonable that people should be able to subscribe to topics
> that don't exist yet, and the spout should pick up the new topics eventually.
> We should check for null here
> https://github.com/apache/storm/blob/93ed601425a79759c0189a945c6b46266e5c9ced/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java#L55
> and maybe log a warning if the returned value is null.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
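
The null check proposed in the description can be sketched roughly as follows. This is a minimal, dependency-free illustration and not the actual patch: the class name `NullSafeTopicFilter`, the `Function` parameter standing in for the consumer's partitionsFor method, and the "topic-partition" string labels are all assumptions made for the example, and `java.util.logging` is used only to keep it self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.logging.Logger;

// Hypothetical stand-in for the filter; not the real NamedTopicFilter.
public class NullSafeTopicFilter {
    private static final Logger LOG =
        Logger.getLogger(NullSafeTopicFilter.class.getName());

    /**
     * Collects "topic-partition" labels for the requested topics.
     *
     * @param partitionsFor stand-in for the consumer's partitionsFor lookup;
     *                      assumed to return null for a topic that does not
     *                      exist yet, as described in the issue
     * @param topics        the topic names the filter was configured with
     */
    public static List<String> getFilteredTopicPartitions(
            Function<String, List<Integer>> partitionsFor, List<String> topics) {
        List<String> result = new ArrayList<>();
        for (String topic : topics) {
            List<Integer> partitions = partitionsFor.apply(topic);
            if (partitions == null) {
                // Skip the topic instead of dereferencing null; a later
                // refresh can pick it up once it has been created.
                LOG.warning("Topic " + topic + " not found, skipping it for now");
                continue;
            }
            for (Integer partition : partitions) {
                result.add(topic + "-" + partition);
            }
        }
        return result;
    }
}
```

With a lookup that only knows a topic "existing" with partitions 0 and 1, asking for "existing" and "missing" returns the two partitions of "existing" and logs one warning, rather than throwing a NullPointerException.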
[jira] [Updated] (STORM-3082) NamedTopicFilter can't handle topics that don't exist yet
[ https://issues.apache.org/jira/browse/STORM-3082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated STORM-3082:
----------------------------------
    Labels: newbie pull-request-available  (was: newbie)

> NamedTopicFilter can't handle topics that don't exist yet
> ---------------------------------------------------------
>
>                 Key: STORM-3082
>                 URL: https://issues.apache.org/jira/browse/STORM-3082
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.2.1
>            Reporter: Stig Rohde Døssing
>            Assignee: Aniket Alhat
>            Priority: Minor
>              Labels: newbie, pull-request-available