Yes, I faced the same issue with the latest storm-kafka-client. Stig's patch helps to solve this issue.
On Fri, 1 Jun 2018, 20:42 Stig Rohde Døssing, <[email protected]> wrote:

> Pretty sure it's this issue
> https://issues.apache.org/jira/browse/STORM-3046. There's a link to a
> patched 1.x branch you can use until we get a fix released.
>
> 2018-06-01 9:06 GMT+02:00 Aniket Alhat <[email protected]>:
>
>> Hi Stig, all
>>
>> I seem to stumble upon one more NPE :) , every time I restart my topology
>> (making use of trident opaque spout) I get the below stack trace.
>>
>> *kafka consumer config*
>> 2018-06-01 06:27:17.942 o.a.k.c.c.ConsumerConfig [INFO] ConsumerConfig values:
>> auto.commit.interval.ms = 5000
>> auto.offset.reset = earliest
>> bootstrap.servers = [*REDACTED*]
>> check.crcs = true
>> client.id = *REDACTED*
>> connections.max.idle.ms = 540000
>> enable.auto.commit = false
>> exclude.internal.topics = true
>> fetch.max.bytes = 52428800
>> fetch.max.wait.ms = 500
>> fetch.min.bytes = 1
>> group.id = *REDACTED *
>> heartbeat.interval.ms = 3000
>> interceptor.classes = null
>> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>> max.partition.fetch.bytes = 2097152
>> max.poll.interval.ms = 300000
>> max.poll.records = 500
>> metadata.max.age.ms = 300000
>> metric.reporters = []
>> metrics.num.samples = 2
>> metrics.sample.window.ms = 30000
>> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>> receive.buffer.bytes = 65536
>> reconnect.backoff.ms = 50
>> request.timeout.ms = 305000
>> retry.backoff.ms = 100
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> sasl.kerberos.min.time.before.relogin = 60000
>> sasl.kerberos.service.name = kafka
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> sasl.mechanism = GSSAPI
>> security.protocol = SASL_PLAINTEXT
>> send.buffer.bytes = 131072
>> session.timeout.ms = 10000
>> ssl.cipher.suites = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> ssl.endpoint.identification.algorithm = null
>> ssl.key.password = null
>> ssl.keymanager.algorithm = SunX509
>> ssl.keystore.location = null
>> ssl.keystore.password = null
>> ssl.keystore.type = JKS
>> ssl.protocol = TLS
>> ssl.provider = null
>> ssl.secure.random.implementation = null
>> ssl.trustmanager.algorithm = PKIX
>> ssl.truststore.location = null
>> ssl.truststore.password = null
>> ssl.truststore.type = JKS
>> value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>>
>> *server.log*
>> 2018-06-01 06:27:19.622 o.a.s.util [ERROR] Async loop died!
>> java.lang.RuntimeException: java.lang.NullPointerException
>>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
>>     at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
>>     at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.jar:1.2.1]
>>     at org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) ~[storm-core-1.2.1.jar:1.2.1]
>>     at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
>> Caused by: java.lang.NullPointerException
>>     at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) ~[stormjar.jar:?]
>>     at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) ~[stormjar.jar:?]
>>     at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) ~[stormjar.jar:?]
>>     at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) ~[storm-core-1.2.1.jar:1.2.1]
>>     at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-1.2.1.jar:1.2.1]
>>     at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) ~[storm-core-1.2.1.jar:1.2.1]
>>     at org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) ~[storm-core-1.2.1.jar:1.2.1]
>>     at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) ~[storm-core-1.2.1.jar:1.2.1]
>>     at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
>>     at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
>>     ... 6 more
>>
>> On Wed, May 23, 2018 at 12:23 AM, Stig Rohde Døssing <[email protected]> wrote:
>>
>>> No, haven't seen it before.
>>>
>>> I took a look at NamedTopicFilter, and I think what's happening is that
>>> the partitionsFor call here
>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java#L57
>>> is returning null. Did you configure your filter to use topic names that
>>> don't exist yet?
>>>
>>> 2018-05-22 14:56 GMT+02:00 Aniket Alhat <[email protected]>:
>>>
>>>> Hello,
>>>>
>>>> I'm using Storm 1.2.1 with new storm-kafka-client and consuming from
>>>> brokers 0.11.2.0 with opaque trident spout. But unfortunately my topology
>>>> crashes with below exception.
>>>>
>>>> 2018-05-22 06:23:02.318 o.a.s.util [ERROR] Async loop died!
>>>> java.lang.NullPointerException: null
>>>>     at org.apache.storm.kafka.spout.NamedTopicFilter.getFilteredTopicPartitions(NamedTopicFilter.java:57) ~[stormjar.jar:?]
>>>>     at org.apache.storm.kafka.spout.ManualPartitionSubscription.refreshAssignment(ManualPartitionSubscription.java:54) ~[stormjar.jar:?]
>>>>     at org.apache.storm.kafka.spout.ManualPartitionSubscription.subscribe(ManualPartitionSubscription.java:49) ~[stormjar.jar:?]
>>>>     at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager.createAndSubscribeKafkaConsumer(KafkaTridentSpoutManager.java:59) ~[stormjar.jar:?]
>>>>     at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:84) ~[stormjar.jar:?]
>>>>     at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:100) ~[stormjar.jar:?]
>>>>     at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque.getEmitter(KafkaTridentSpoutOpaque.java:50) ~[stormjar.jar:?]
>>>>     at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.<init>(OpaquePartitionedTridentSpoutExecutor.java:97) ~[storm-core-1.2.1.jar:1.2.1]
>>>>     at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:221) ~[storm-core-1.2.1.jar:1.2.1]
>>>>     at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:39) ~[storm-core-1.2.1.jar:1.2.1]
>>>>     at org.apache.storm.trident.spout.TridentSpoutExecutor.prepare(TridentSpoutExecutor.java:60) ~[storm-core-1.2.1.jar:1.2.1]
>>>>     at org.apache.storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:245) ~[storm-core-1.2.1.jar:1.2.1]
>>>>     at org.apache.storm.daemon.executor$fn__5043$fn__5056.invoke(executor.clj:803) ~[storm-core-1.2.1.jar:1.2.1]
>>>>     at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:482) [storm-core-1.2.1.jar:1.2.1]
>>>>     at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
>>>>
>>>> Does it look familiar? Any help will be appreciated.
>>>>
>>>> On Wed, May 16, 2018 at 1:56 PM, Aniket Alhat <[email protected]> wrote:
>>>>
>>>>> Hi Bobby,
>>>>>
>>>>> Thanks for your reply. Indeed, SimpleConsumer.scala doesn't support
>>>>> extracting data from Kafka via Kerberos, so I'll start thinking about
>>>>> upgrading Storm to 1.2.1 and using storm-kafka-client. I'm specifically
>>>>> using the Transactional Non-Opaque Spout
>>>>> (https://issues.apache.org/jira/browse/STORM-2974), which is still under
>>>>> review. I'm thinking of using it by building it from the PR as suggested
>>>>> by Stig, but it would be a great help if we could think of making this
>>>>> available in 1.2.x soon.
>>>>>
>>>>> On Tue, May 15, 2018 at 11:27 PM, Bobby Evans <[email protected]> wrote:
>>>>>
>>>>>> I am not sure you can. storm-kafka 0.10.0 uses kafka version
>>>>>> 0.8.1.1, and I don't think security was added into kafka yet.
>>>>>>
>>>>>> https://kafka.apache.org/081/documentation.html
>>>>>>
>>>>>> If you have overridden that version of the kafka client and are
>>>>>> running with a newer version of kafka, you might be able to, but storm
>>>>>> itself has no special support for it in those older versions because
>>>>>> kafka didn't support it yet. Your best bet would be to upgrade to a newer
>>>>>> version of storm and switch to the storm-kafka-client package + API.
>>>>>>
>>>>>> In older versions of storm like this you might be able to place a
>>>>>> keytab on each of the nodes, then you will need to configure kafka to use
>>>>>> those keytabs for authentication. In that version of storm you can set
>>>>>> kafka broker properties in the kafka.broker.properties storm config
>>>>>> as a map, but I honestly don't know if there is more you would need to do
>>>>>> for it.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Bobby
>>>>>>
>>>>>> On Tue, May 15, 2018 at 2:34 AM Aniket Alhat <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Hao, Ziemer
>>>>>>>
>>>>>>> Were you guys able to get any feedback on this? How did you get
>>>>>>> around this issue?
>>>>>>>
>>>>>>> On Wed, Jul 13, 2016 at 9:23 PM, Ziemer, Tom <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi everybody,
>>>>>>>>
>>>>>>>> I am facing the same problem (with Storm 1.0.1 and Kafka 0.9.0.1).
>>>>>>>> Any ideas?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Tom
>>>>>>>>
>>>>>>>> *From:* Hao Chen [mailto:[email protected]]
>>>>>>>> *Sent:* Tuesday, 12 July 2016 02:44
>>>>>>>> *To:* [email protected]
>>>>>>>> *Subject:* How to use KafkaSpout to consume kafka cluster secured with kerberos
>>>>>>>>
>>>>>>>> Hi Team,
>>>>>>>>
>>>>>>>> How can I use KafkaSpout to consume from a Kafka cluster secured with
>>>>>>>> Kerberos? I can't find much accurate information about the API.
>>>>>>>>
>>>>>>>> The versions I am using are:
>>>>>>>>
>>>>>>>> - storm: 0.9.3
>>>>>>>> - storm-kafka: 0.10.0
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> - Hao
>>>>>>>
>>>>>>> --
>>>>>>> *Aniket Alhat*
>>>>>
>>>>> --
>>>>> *Aniket Alhat*
>>>>
>>>> --
>>>> *Aniket Alhat*
>>
>> --
>> *Aniket Alhat*
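
For anyone landing here from the original Kerberos question: the security-related consumer settings visible in the ConsumerConfig dump quoted above can be collected as in the sketch below. This is only an illustration, not code from the thread. The class name `KerberosConsumerProps`, the broker address and the group id are placeholders (the real values are redacted in the log), and it uses nothing but `java.util.Properties`, so it does not cover how the JAAS login configuration or keytab reaches the workers.

```java
import java.util.Properties;

/**
 * Sketch only: gathers the Kerberos-related Kafka consumer settings that
 * appear in the quoted ConsumerConfig dump into a plain Properties object.
 * The class and method names are hypothetical, not part of Storm or Kafka.
 */
final class KerberosConsumerProps {

    // bootstrapServers and groupId are placeholders; the thread redacts the real ones.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);

        // Security settings, copied from the quoted ConsumerConfig dump.
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "GSSAPI");
        props.setProperty("sasl.kerberos.service.name", "kafka");

        // Non-security settings that also appear in the dump.
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Example values only; substitute your own brokers and consumer group.
        Properties props = build("broker1.example.com:9092", "example-group");
        // Print the settings in a stable (sorted) order for inspection.
        props.stringPropertyNames().stream()
                .sorted()
                .forEach(key -> System.out.println(key + " = " + props.getProperty(key)));
    }
}
```

With storm-kafka-client these key/value pairs would normally be applied to the spout's `KafkaSpoutConfig` builder; please check the builder API of the Storm version you run before relying on that.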
