Pretty sure it's this issue https://issues.apache.org/jira/browse/STORM-3046. There's a link to a patched 1.x branch you can use until we get a fix released.
2018-06-01 9:06 GMT+02:00 Aniket Alhat <[email protected]>: > Hi Stig, all > > I seem to stumble upon one more NPE :) , every time I restart my topology > (making use of trident opaque spout) I get the below stack trace. > > > > *kafka consumer config*2018-06-01 06:27:17.942 o.a.k.c.c.ConsumerConfig > [INFO] ConsumerConfig values: > auto.commit.interval.ms = 5000 > auto.offset.reset = earliest > bootstrap.servers = [*REDACTED*] > check.crcs = true > client.id = *REDACTED* > connections.max.idle.ms = 540000 > enable.auto.commit = false > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > group.id = *REDACTED * > heartbeat.interval.ms = 3000 > interceptor.classes = null > key.deserializer = class org.apache.kafka.common.serialization. > StringDeserializer > max.partition.fetch.bytes = 2097152 > max.poll.interval.ms = 300000 > max.poll.records = 500 > metadata.max.age.ms = 300000 > metric.reporters = [] > metrics.num.samples = 2 > metrics.sample.window.ms = 30000 > partition.assignment.strategy = [class org.apache.kafka.clients. > consumer.RangeAssignor] > receive.buffer.bytes = 65536 > reconnect.backoff.ms = 50 > request.timeout.ms = 305000 > retry.backoff.ms = 100 > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 60000 > sasl.kerberos.service.name = kafka > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.mechanism = GSSAPI > security.protocol = SASL_PLAINTEXT > send.buffer.bytes = 131072 > session.timeout.ms = 10000 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = null > ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null > ssl.truststore.type = JKS > value.deserializer = class org.apache.kafka.common.serialization. > StringDeserializer > > *server.log* > 2018-06-01 06:27:19.622 o.a.s.util [ERROR] Async loop died! > java.lang.RuntimeException: java.lang.NullPointerException > at > org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) > ~[storm-core-1.2.1.jar:1.2.1] > at org.apache.storm.utils.DisruptorQueue. > consumeBatchWhenAvailable(DisruptorQueue.java:487) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) > ~[storm-core-1.2.1.jar:1.2.1] > at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) > [storm-core-1.2.1.jar:1.2.1] > at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151] > Caused by: java.lang.NullPointerException > at org.apache.storm.kafka.spout.trident. > KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) > ~[stormjar.jar:?] > at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter. > emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) ~[stormjar.jar:?] > at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter. > emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) ~[stormjar.jar:?] > at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutE > xecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) > ~[storm-core-1.2.1.jar:1.2.1] > at org.apache.storm.trident.spout.TridentSpoutExecutor. > execute(TridentSpoutExecutor.java:82) ~[storm-core-1.2.1.jar:1.2.1] > at org.apache.storm.trident.topology.TridentBoltExecutor. > execute(TridentBoltExecutor.java:383) ~[storm-core-1.2.1.jar:1.2.1] > at org.apache.storm.daemon.executor$fn__5043$tuple_ > action_fn__5045.invoke(executor.clj:739) ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) > ~[storm-core-1.2.1.jar:1.2.1] > ... 6 more > > On Wed, May 23, 2018 at 12:23 AM, Stig Rohde Døssing <[email protected]> > wrote: > >> No, haven't seen it before. >> >> I took a look at NamedTopicFilter, and I think what's happening is that >> the partitionsFor call here https://github.com/apache/stor >> m/blob/v1.2.1/external/storm-kafka-client/src/main/java/ >> org/apache/storm/kafka/spout/NamedTopicFilter.java#L57 is returning >> null. Did you configure your filter to use topic names that don't exist yet? >> >> 2018-05-22 14:56 GMT+02:00 Aniket Alhat <[email protected]>: >> >>> Hello, >>> >>> I'm using Storm 1.2.1 with new storm-kafka-client and consuming from >>> brokers 0.11.2.0 with opaque trident spout. But unfortunately my topology >>> crashes with below exception. >>> >>> 2018-05-22 06:23:02.318 o.a.s.util [ERROR] Async loop died! >>> java.lang.NullPointerException: null >>> at org.apache.storm.kafka.spout.NamedTopicFilter.getFilteredTop >>> icPartitions(NamedTopicFilter.java:57) ~[stormjar.jar:?] >>> at org.apache.storm.kafka.spout.ManualPartitionSubscription.ref >>> reshAssignment(ManualPartitionSubscription.java:54) ~[stormjar.jar:?] >>> at org.apache.storm.kafka.spout.ManualPartitionSubscription.sub >>> scribe(ManualPartitionSubscription.java:49) ~[stormjar.jar:?] >>> at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManage >>> r.createAndSubscribeKafkaConsumer(KafkaTridentSpoutManager.java:59) >>> ~[stormjar.jar:?] >>> at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitte >>> r.<init>(KafkaTridentSpoutEmitter.java:84) ~[stormjar.jar:?] >>> at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitte >>> r.<init>(KafkaTridentSpoutEmitter.java:100) ~[stormjar.jar:?] >>> at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque >>> .getEmitter(KafkaTridentSpoutOpaque.java:50) ~[stormjar.jar:?] >>> at org.apache.storm.trident.spout.OpaquePartitionedTridentSpout >>> Executor$Emitter.<init>(OpaquePartitionedTridentSpoutExecutor.java:97) >>> ~[storm-core-1.2.1.jar:1.2.1] >>> at org.apache.storm.trident.spout.OpaquePartitionedTridentSpout >>> Executor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:221) >>> ~[storm-core-1.2.1.jar:1.2.1] >>> at org.apache.storm.trident.spout.OpaquePartitionedTridentSpout >>> Executor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:39) >>> ~[storm-core-1.2.1.jar:1.2.1] >>> at >>> org.apache.storm.trident.spout.TridentSpoutExecutor.prepare(TridentSpoutExecutor.java:60) >>> ~[storm-core-1.2.1.jar:1.2.1] >>> at org.apache.storm.trident.topology.TridentBoltExecutor.prepar >>> e(TridentBoltExecutor.java:245) ~[storm-core-1.2.1.jar:1.2.1] >>> at >>> org.apache.storm.daemon.executor$fn__5043$fn__5056.invoke(executor.clj:803) >>> ~[storm-core-1.2.1.jar:1.2.1] >>> at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:482) >>> [storm-core-1.2.1.jar:1.2.1] >>> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] >>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151] >>> >>> >>> Does it look familiar ? Any help will be appreciated. >>> >>> >>> >>> On Wed, May 16, 2018 at 1:56 PM, Aniket Alhat <[email protected]> >>> wrote: >>> >>>> Hi Bobby, >>>> >>>> Thanks for your reply, indeed SimpleConsumer.scala doesn't support >>>> extraction of data from Kafka via. Kerberos in which case I'll start >>>> thinking to upgrade Storm to 1.2.1 and look to use storm-kafka-client, I'm >>>> specifically using Transnational Non Opaque Spout ( >>>> https://issues.apache.org/jira/browse/STORM-2974) which is still under >>>> review it, I'm thinking of using it by building it from the PR as suggested >>>> by Stig but it will be great help if we can think of making this available >>>> in 1.2.x soon. >>>> >>>> On Tue, May 15, 2018 at 11:27 PM, Bobby Evans <[email protected]> wrote: >>>> >>>>> I am not sure you can. storm-kafka 0.10.0 uses kafka version 0.8.1.1, >>>>> and I don't think security was added into kafka yet. >>>>> >>>>> https://kafka.apache.org/081/documentation.html >>>>> >>>>> If you have overridden that version of the kafka client and are >>>>> running with a newer version of kafka, you might be able to, but storm >>>>> itself has no special support for it in those older versions because kafka >>>>> didn't support it yet. Your best bet would be to upgrade to a newer >>>>> version of storm and switch to the storm-kafka-clients package + API. >>>>> >>>>> In older versions of storm like this you might be able to place a >>>>> keytab on each of the nodes, then you will need to configure kafka to use >>>>> those keytabs for authentication. In that version of storm you can set >>>>> kafka broker properties in the kafka.broker.properties storm config >>>>> as a map, but I honestly don't know if there is more you would need to do >>>>> for it. >>>>> >>>>> Thanks, >>>>> >>>>> Bobby >>>>> >>>>> >>>>> On Tue, May 15, 2018 at 2:34 AM Aniket Alhat <[email protected]> >>>>> wrote: >>>>> >>>>>> Hi Hao, Zimmer >>>>>> >>>>>> Where you guys able to get the any feedback on this? How did you get >>>>>> around this issue? >>>>>> >>>>>> On Wed, Jul 13, 2016 at 9:23 PM, Ziemer, Tom <[email protected] >>>>>> > wrote: >>>>>> >>>>>>> Hi everybody, >>>>>>> >>>>>>> >>>>>>> >>>>>>> I am facing the same problem (with Storm 1.0.1 and Kafka 0.9.0.1). >>>>>>> Any ideas? >>>>>>> >>>>>>> >>>>>>> >>>>>>> Thanks, >>>>>>> >>>>>>> Tom >>>>>>> >>>>>>> >>>>>>> >>>>>>> *From:* Hao Chen [mailto:[email protected]] >>>>>>> *Sent:* Dienstag, 12. Juli 2016 02:44 >>>>>>> *To:* [email protected] >>>>>>> *Subject:* How to use KafkaSpout to consume kafka cluster secured >>>>>>> with kerberos >>>>>>> >>>>>>> >>>>>>> >>>>>>> Hi Team, >>>>>>> >>>>>>> >>>>>>> >>>>>>> How to use KafkaSpout to consume kafka cluster secured with >>>>>>> kerberos? I can't find lots of accurate information about the API. >>>>>>> >>>>>>> >>>>>>> >>>>>>> The versions I am using are: >>>>>>> >>>>>>> - storm: 0.9.3 >>>>>>> - storm-kafka: 0.10.0 >>>>>>> >>>>>>> Thanks, >>>>>>> >>>>>>> - Hao >>>>>>> >>>>>> >>>>>> >>>>>> >>>>>> -- >>>>>> >>>>>> *Aniket Alhat* >>>>>> >>>>>> >>>> >>>> >>>> -- >>>> >>>> *Aniket Alhat* >>>> >>>> >>> >>> >>> -- >>> >>> *Aniket Alhat* >>> >>> >> > > > -- > > *Aniket Alhat* > >
