Yes, I hit the same issue with the latest storm-kafka-client. Stig's
patch resolves it.
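
A note for anyone who finds this thread because of the first NPE quoted further down (NamedTopicFilter.java:57), where Stig suspects that partitionsFor returns null for a topic that doesn't exist yet. Below is a minimal sketch of a null guard around such a lookup. It is not the actual Storm fix and not the STORM-3046 patch. The class and method names are made up for illustration, and `lookup` is a hypothetical stand-in for a call like `consumer.partitionsFor(topic)` so the snippet compiles without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, not part of storm-kafka-client.
class NullSafePartitions {

    // 'lookup' stands in for something like consumer.partitionsFor(topic),
    // which is assumed here to return null when the topic is unknown.
    static <P> List<P> partitionsOrEmpty(Function<String, List<P>> lookup, String topic) {
        List<P> partitions = lookup.apply(topic);
        return partitions == null ? Collections.<P>emptyList() : partitions;
    }

    // Gathers partitions for all topics, skipping topics that yield null.
    static <P> List<P> collect(Function<String, List<P>> lookup, List<String> topics) {
        List<P> all = new ArrayList<>();
        for (String topic : topics) {
            all.addAll(partitionsOrEmpty(lookup, topic));
        }
        return all;
    }

    public static void main(String[] args) {
        // Fake metadata: "events" exists with two partitions, "missing" does not.
        Map<String, List<Integer>> metadata = new HashMap<>();
        metadata.put("events", Arrays.asList(0, 1));
        Function<String, List<Integer>> lookup = metadata::get;
        System.out.println(collect(lookup, Arrays.asList("events", "missing")));
    }
}
```

Whether silently skipping a missing topic is the right behaviour depends on the topology. Failing fast with a clear message may be preferable to an NPE deep inside the spout.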

On Fri, 1 Jun 2018, 20:42 Stig Rohde Døssing, <[email protected]> wrote:

> Pretty sure it's this issue
> https://issues.apache.org/jira/browse/STORM-3046. There's a link to a
> patched 1.x branch you can use until we get a fix released.
>
> 2018-06-01 9:06 GMT+02:00 Aniket Alhat <[email protected]>:
>
>> Hi Stig, all
>>
>> I seem to have stumbled upon one more NPE :). Every time I restart my topology
>> (which uses the Trident opaque spout) I get the stack trace below.
>>
>>
>>
>> *kafka consumer config*
>> 2018-06-01 06:27:17.942 o.a.k.c.c.ConsumerConfig [INFO] ConsumerConfig values:
>>         auto.commit.interval.ms = 5000
>>         auto.offset.reset = earliest
>>         bootstrap.servers = [*REDACTED*]
>>         check.crcs = true
>>         client.id = *REDACTED*
>>         connections.max.idle.ms = 540000
>>         enable.auto.commit = false
>>         exclude.internal.topics = true
>>         fetch.max.bytes = 52428800
>>         fetch.max.wait.ms = 500
>>         fetch.min.bytes = 1
>>         group.id = *REDACTED*
>>         heartbeat.interval.ms = 3000
>>         interceptor.classes = null
>>         key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>>         max.partition.fetch.bytes = 2097152
>>         max.poll.interval.ms = 300000
>>         max.poll.records = 500
>>         metadata.max.age.ms = 300000
>>         metric.reporters = []
>>         metrics.num.samples = 2
>>         metrics.sample.window.ms = 30000
>>         partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>>         receive.buffer.bytes = 65536
>>         reconnect.backoff.ms = 50
>>         request.timeout.ms = 305000
>>         retry.backoff.ms = 100
>>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>         sasl.kerberos.min.time.before.relogin = 60000
>>         sasl.kerberos.service.name = kafka
>>         sasl.kerberos.ticket.renew.jitter = 0.05
>>         sasl.kerberos.ticket.renew.window.factor = 0.8
>>         sasl.mechanism = GSSAPI
>>         security.protocol = SASL_PLAINTEXT
>>         send.buffer.bytes = 131072
>>         session.timeout.ms = 10000
>>         ssl.cipher.suites = null
>>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>         ssl.endpoint.identification.algorithm = null
>>         ssl.key.password = null
>>         ssl.keymanager.algorithm = SunX509
>>         ssl.keystore.location = null
>>         ssl.keystore.password = null
>>         ssl.keystore.type = JKS
>>         ssl.protocol = TLS
>>         ssl.provider = null
>>         ssl.secure.random.implementation = null
>>         ssl.trustmanager.algorithm = PKIX
>>         ssl.truststore.location = null
>>         ssl.truststore.password = null
>>         ssl.truststore.type = JKS
>>         value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>>
>> *server.log*
>> 2018-06-01 06:27:19.622 o.a.s.util [ERROR] Async loop died!
>> java.lang.RuntimeException: java.lang.NullPointerException
>>         at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
>>         at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
>>         at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.jar:1.2.1]
>>         at org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) ~[storm-core-1.2.1.jar:1.2.1]
>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
>> Caused by: java.lang.NullPointerException
>>         at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) ~[stormjar.jar:?]
>>         at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) ~[stormjar.jar:?]
>>         at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) ~[stormjar.jar:?]
>>         at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) ~[storm-core-1.2.1.jar:1.2.1]
>>         at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-1.2.1.jar:1.2.1]
>>         at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) ~[storm-core-1.2.1.jar:1.2.1]
>>         at org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) ~[storm-core-1.2.1.jar:1.2.1]
>>         at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) ~[storm-core-1.2.1.jar:1.2.1]
>>         at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
>>         at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
>>         ... 6 more
>>
>> On Wed, May 23, 2018 at 12:23 AM, Stig Rohde Døssing <[email protected]>
>> wrote:
>>
>>> No, haven't seen it before.
>>>
>>> I took a look at NamedTopicFilter, and I think what's happening is that
>>> the partitionsFor call here
>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java#L57
>>> is returning null. Did you configure your filter to use topic names that
>>> don't exist yet?
>>>
>>> 2018-05-22 14:56 GMT+02:00 Aniket Alhat <[email protected]>:
>>>
>>>> Hello,
>>>>
>>>> I'm using Storm 1.2.1 with the new storm-kafka-client and consuming from
>>>> brokers 0.11.2.0 with the opaque Trident spout. Unfortunately, my topology
>>>> crashes with the exception below.
>>>>
>>>> 2018-05-22 06:23:02.318 o.a.s.util [ERROR] Async loop died!
>>>> java.lang.NullPointerException: null
>>>>         at org.apache.storm.kafka.spout.NamedTopicFilter.getFilteredTopicPartitions(NamedTopicFilter.java:57) ~[stormjar.jar:?]
>>>>         at org.apache.storm.kafka.spout.ManualPartitionSubscription.refreshAssignment(ManualPartitionSubscription.java:54) ~[stormjar.jar:?]
>>>>         at org.apache.storm.kafka.spout.ManualPartitionSubscription.subscribe(ManualPartitionSubscription.java:49) ~[stormjar.jar:?]
>>>>         at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager.createAndSubscribeKafkaConsumer(KafkaTridentSpoutManager.java:59) ~[stormjar.jar:?]
>>>>         at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:84) ~[stormjar.jar:?]
>>>>         at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:100) ~[stormjar.jar:?]
>>>>         at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque.getEmitter(KafkaTridentSpoutOpaque.java:50) ~[stormjar.jar:?]
>>>>         at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.<init>(OpaquePartitionedTridentSpoutExecutor.java:97) ~[storm-core-1.2.1.jar:1.2.1]
>>>>         at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:221) ~[storm-core-1.2.1.jar:1.2.1]
>>>>         at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:39) ~[storm-core-1.2.1.jar:1.2.1]
>>>>         at org.apache.storm.trident.spout.TridentSpoutExecutor.prepare(TridentSpoutExecutor.java:60) ~[storm-core-1.2.1.jar:1.2.1]
>>>>         at org.apache.storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:245) ~[storm-core-1.2.1.jar:1.2.1]
>>>>         at org.apache.storm.daemon.executor$fn__5043$fn__5056.invoke(executor.clj:803) ~[storm-core-1.2.1.jar:1.2.1]
>>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:482) [storm-core-1.2.1.jar:1.2.1]
>>>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
>>>>
>>>>
>>>> Does it look familiar? Any help would be appreciated.
>>>>
>>>>
>>>>
>>>> On Wed, May 16, 2018 at 1:56 PM, Aniket Alhat <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Bobby,
>>>>>
>>>>> Thanks for your reply. Indeed, SimpleConsumer.scala doesn't support
>>>>> reading data from Kafka via Kerberos, so I'll start thinking about
>>>>> upgrading Storm to 1.2.1 and using storm-kafka-client. I'm specifically
>>>>> looking to use the Transactional Non-Opaque Spout (
>>>>> https://issues.apache.org/jira/browse/STORM-2974), which is still
>>>>> under review. I'm thinking of building it from the PR, as suggested by
>>>>> Stig, but it would be a great help if this could be made available in
>>>>> 1.2.x soon.
>>>>>
>>>>> On Tue, May 15, 2018 at 11:27 PM, Bobby Evans <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I am not sure you can.  storm-kafka 0.10.0 uses kafka version
>>>>>> 0.8.1.1, and I don't think security was added into kafka yet.
>>>>>>
>>>>>> https://kafka.apache.org/081/documentation.html
>>>>>>
>>>>>> If you have overridden that version of the kafka client and are
>>>>>> running with a newer version of kafka, you might be able to, but storm
>>>>>> itself has no special support for it in those older versions because
>>>>>> kafka didn't support it yet.  Your best bet would be to upgrade to a newer
>>>>>> version of storm and switch to the storm-kafka-client package + API.
>>>>>>
>>>>>> In older versions of storm like this you might be able to place a
>>>>>> keytab on each of the nodes, then you will need to configure kafka to use
>>>>>> those keytabs for authentication.  In that version of storm you can set
>>>>>> kafka broker properties in the kafka.broker.properties storm config
>>>>>> as a map, but I honestly don't know if there is more you would need to do
>>>>>> for it.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Bobby
>>>>>>
>>>>>>
>>>>>> On Tue, May 15, 2018 at 2:34 AM Aniket Alhat <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Hao, Ziemer
>>>>>>>
>>>>>>> Were you guys able to get any feedback on this? How did you get
>>>>>>> around this issue?
>>>>>>>
>>>>>>> On Wed, Jul 13, 2016 at 9:23 PM, Ziemer, Tom <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi everybody,
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I am facing the same problem (with Storm 1.0.1 and Kafka 0.9.0.1).
>>>>>>>> Any ideas?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Tom
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *From:* Hao Chen [mailto:[email protected]]
>>>>>>>> *Sent:* Tuesday, 12 July 2016 02:44
>>>>>>>> *To:* [email protected]
>>>>>>>> *Subject:* How to use KafkaSpout to consume kafka cluster secured
>>>>>>>> with kerberos
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Hi Team,
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> How do I use KafkaSpout to consume from a Kafka cluster secured with
>>>>>>>> Kerberos? I can't find much accurate information about the API.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> The versions I am using are:
>>>>>>>>
>>>>>>>>    - storm: 0.9.3
>>>>>>>>    - storm-kafka: 0.10.0
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> - Hao
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> *Aniket Alhat*
>>>>>>>
>>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> *Aniket Alhat*
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> *Aniket Alhat*
>>>>
>>>>
>>>
>>
>>
>> --
>>
>> *Aniket Alhat*
>>
>>
>
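
Since the thread started as a Kerberos question: the security-related consumer settings that show up in Aniket's ConsumerConfig dump above can be collected into a plain java.util.Properties object roughly as follows. This is only a sketch. The class name, broker address and group id are placeholders I made up; the property names and values are copied from the dump. I believe the storm-kafka-client spout config builder has a setProp method that accepts such properties in 1.2.x, but please check the javadoc for your version. The worker JVMs will also need a JAAS configuration (keytab or ticket cache), which is not shown here.

```java
import java.util.Properties;

// Hypothetical helper; only the property names/values come from the log above.
class KerberosConsumerProps {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // SASL over plaintext with Kerberos (GSSAPI), as in the quoted config.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "kafka");
        // Other values taken from the same dump.
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker and group id, replace with your own.
        Properties props = build("broker1.example.com:9092", "example-group");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```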
