Pretty sure it's this issue https://issues.apache.org/jira/browse/STORM-3046.
There's a link to a patched 1.x branch you can use until we get a fix
released.

2018-06-01 9:06 GMT+02:00 Aniket Alhat <[email protected]>:

> Hi Stig, all
>
> I seem to have stumbled upon one more NPE :). Every time I restart my
> topology (which uses the Trident opaque spout), I get the stack trace below.
>
>
>
> *kafka consumer config*
> 2018-06-01 06:27:17.942 o.a.k.c.c.ConsumerConfig [INFO] ConsumerConfig values:
>         auto.commit.interval.ms = 5000
>         auto.offset.reset = earliest
>         bootstrap.servers = [*REDACTED*]
>         check.crcs = true
>         client.id = *REDACTED*
>         connections.max.idle.ms = 540000
>         enable.auto.commit = false
>         exclude.internal.topics = true
>         fetch.max.bytes = 52428800
>         fetch.max.wait.ms = 500
>         fetch.min.bytes = 1
>         group.id = *REDACTED*
>         heartbeat.interval.ms = 3000
>         interceptor.classes = null
>         key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>         max.partition.fetch.bytes = 2097152
>         max.poll.interval.ms = 300000
>         max.poll.records = 500
>         metadata.max.age.ms = 300000
>         metric.reporters = []
>         metrics.num.samples = 2
>         metrics.sample.window.ms = 30000
>         partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>         receive.buffer.bytes = 65536
>         reconnect.backoff.ms = 50
>         request.timeout.ms = 305000
>         retry.backoff.ms = 100
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>         sasl.kerberos.min.time.before.relogin = 60000
>         sasl.kerberos.service.name = kafka
>         sasl.kerberos.ticket.renew.jitter = 0.05
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>         sasl.mechanism = GSSAPI
>         security.protocol = SASL_PLAINTEXT
>         send.buffer.bytes = 131072
>         session.timeout.ms = 10000
>         ssl.cipher.suites = null
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>         ssl.endpoint.identification.algorithm = null
>         ssl.key.password = null
>         ssl.keymanager.algorithm = SunX509
>         ssl.keystore.location = null
>         ssl.keystore.password = null
>         ssl.keystore.type = JKS
>         ssl.protocol = TLS
>         ssl.provider = null
>         ssl.secure.random.implementation = null
>         ssl.trustmanager.algorithm = PKIX
>         ssl.truststore.location = null
>         ssl.truststore.password = null
>         ssl.truststore.type = JKS
>         value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>
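
For anyone landing here from the original Kerberos question: the security-related values in the log above can be expressed as plain consumer properties. The following is only a minimal sketch, not a complete setup. The class name, the method name, and the broker and group values are placeholders I made up (the real ones are redacted above), and it assumes a JAAS login configuration with the keytab is supplied separately, which is not shown.

```java
import java.util.Properties;

public class KerberosConsumerProps {

    /**
     * Builds consumer properties mirroring the security-related values
     * from the ConsumerConfig log. Hypothetical helper for illustration only.
     */
    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Kerberos over an unencrypted channel, as in the logged config
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "GSSAPI");
        props.setProperty("sasl.kerberos.service.name", "kafka");
        // Remaining values copied from the same log
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and group id (assumptions, not from the thread)
        Properties props = build("broker1.example.com:9092", "example-group");
        props.list(System.out);
    }
}
```

How these properties reach the spout depends on the storm-kafka-client version in use, so check the builder API of your release before relying on this.
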
> *server.log*
> 2018-06-01 06:27:19.622 o.a.s.util [ERROR] Async loop died!
> java.lang.RuntimeException: java.lang.NullPointerException
>         at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
>         at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
>         at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.jar:1.2.1]
>         at org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) ~[storm-core-1.2.1.jar:1.2.1]
>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
> Caused by: java.lang.NullPointerException
>         at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) ~[stormjar.jar:?]
>         at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) ~[stormjar.jar:?]
>         at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) ~[stormjar.jar:?]
>         at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) ~[storm-core-1.2.1.jar:1.2.1]
>         at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-1.2.1.jar:1.2.1]
>         at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) ~[storm-core-1.2.1.jar:1.2.1]
>         at org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) ~[storm-core-1.2.1.jar:1.2.1]
>         at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) ~[storm-core-1.2.1.jar:1.2.1]
>         at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
>         at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
>         ... 6 more
>
> On Wed, May 23, 2018 at 12:23 AM, Stig Rohde Døssing <[email protected]>
> wrote:
>
>> No, haven't seen it before.
>>
>> I took a look at NamedTopicFilter, and I think what's happening is that
>> the partitionsFor call here
>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java#L57
>> is returning null. Did you configure your filter to use topic names that
>> don't exist yet?
>>
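
To illustrate the failure mode suspected above, here is a rough sketch of a partition lookup that tolerates a null result for a topic that does not exist yet. This is not the NamedTopicFilter code and not the actual fix. The class, the method, and the lookup function are hypothetical stand-ins for a metadata call such as partitionsFor, kept free of Kafka dependencies so it runs on its own.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class NullSafePartitionLookup {

    /**
     * Collects "topic-partition" names for the given topics, skipping any
     * topic whose lookup returns null. The lookup function is a hypothetical
     * stand-in for a metadata call that may return null for unknown topics.
     */
    public static List<String> filteredPartitions(List<String> topics,
                                                  Function<String, List<Integer>> lookup) {
        List<String> result = new ArrayList<>();
        for (String topic : topics) {
            List<Integer> partitions = lookup.apply(topic);
            if (partitions == null) {
                // No metadata for this topic (for example, it does not exist yet):
                // skip it instead of iterating over null and throwing an NPE
                System.err.println("No partition metadata for topic " + topic + ", skipping");
                continue;
            }
            for (Integer partition : partitions) {
                result.add(topic + "-" + partition);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Fake metadata: only one of the two configured topics exists
        Map<String, List<Integer>> metadata = new HashMap<>();
        metadata.put("existing-topic", Arrays.asList(0, 1));

        List<String> assigned = filteredPartitions(
                Arrays.asList("existing-topic", "missing-topic"), metadata::get);
        System.out.println(assigned); // prints [existing-topic-0, existing-topic-1]
    }
}
```

Without the null check, the inner loop would throw a NullPointerException for "missing-topic", which is the kind of crash the trace below shows.
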
>> 2018-05-22 14:56 GMT+02:00 Aniket Alhat <[email protected]>:
>>
>>> Hello,
>>>
>>> I'm using Storm 1.2.1 with the new storm-kafka-client and consuming from
>>> brokers 0.11.2.0 with the opaque Trident spout. Unfortunately, my topology
>>> crashes with the exception below.
>>>
>>> 2018-05-22 06:23:02.318 o.a.s.util [ERROR] Async loop died!
>>> java.lang.NullPointerException: null
>>>         at org.apache.storm.kafka.spout.NamedTopicFilter.getFilteredTopicPartitions(NamedTopicFilter.java:57) ~[stormjar.jar:?]
>>>         at org.apache.storm.kafka.spout.ManualPartitionSubscription.refreshAssignment(ManualPartitionSubscription.java:54) ~[stormjar.jar:?]
>>>         at org.apache.storm.kafka.spout.ManualPartitionSubscription.subscribe(ManualPartitionSubscription.java:49) ~[stormjar.jar:?]
>>>         at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager.createAndSubscribeKafkaConsumer(KafkaTridentSpoutManager.java:59) ~[stormjar.jar:?]
>>>         at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:84) ~[stormjar.jar:?]
>>>         at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:100) ~[stormjar.jar:?]
>>>         at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque.getEmitter(KafkaTridentSpoutOpaque.java:50) ~[stormjar.jar:?]
>>>         at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.<init>(OpaquePartitionedTridentSpoutExecutor.java:97) ~[storm-core-1.2.1.jar:1.2.1]
>>>         at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:221) ~[storm-core-1.2.1.jar:1.2.1]
>>>         at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:39) ~[storm-core-1.2.1.jar:1.2.1]
>>>         at org.apache.storm.trident.spout.TridentSpoutExecutor.prepare(TridentSpoutExecutor.java:60) ~[storm-core-1.2.1.jar:1.2.1]
>>>         at org.apache.storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:245) ~[storm-core-1.2.1.jar:1.2.1]
>>>         at org.apache.storm.daemon.executor$fn__5043$fn__5056.invoke(executor.clj:803) ~[storm-core-1.2.1.jar:1.2.1]
>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:482) [storm-core-1.2.1.jar:1.2.1]
>>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
>>>
>>>
>>> Does it look familiar? Any help would be appreciated.
>>>
>>>
>>>
>>> On Wed, May 16, 2018 at 1:56 PM, Aniket Alhat <[email protected]>
>>> wrote:
>>>
>>>> Hi Bobby,
>>>>
>>>> Thanks for your reply. Indeed, SimpleConsumer.scala doesn't support
>>>> reading data from Kafka via Kerberos, so I'll start planning an upgrade
>>>> to Storm 1.2.1 and a move to storm-kafka-client. I'm specifically using
>>>> the Transactional Non-Opaque Spout (
>>>> https://issues.apache.org/jira/browse/STORM-2974), which is still under
>>>> review. I'm thinking of building it from the PR as suggested by Stig, but
>>>> it would be a great help if this could be made available in 1.2.x soon.
>>>>
>>>> On Tue, May 15, 2018 at 11:27 PM, Bobby Evans <[email protected]> wrote:
>>>>
>>>>> I am not sure you can.  storm-kafka 0.10.0 uses kafka version 0.8.1.1,
>>>>> and I don't think security was added into kafka yet.
>>>>>
>>>>> https://kafka.apache.org/081/documentation.html
>>>>>
>>>>> If you have overridden that version of the kafka client and are
>>>>> running with a newer version of kafka, you might be able to, but storm
>>>>> itself has no special support for it in those older versions because kafka
>>>>> didn't support it yet.  Your best bet would be to upgrade to a newer
>>>>> version of storm and switch to the storm-kafka-client package + API.
>>>>>
>>>>> In older versions of storm like this you might be able to place a
>>>>> keytab on each of the nodes, then you will need to configure kafka to use
>>>>> those keytabs for authentication.  In that version of storm you can set
>>>>> kafka broker properties in the kafka.broker.properties storm config
>>>>> as a map, but I honestly don't know if there is more you would need to do
>>>>> for it.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Bobby
>>>>>
>>>>>
>>>>> On Tue, May 15, 2018 at 2:34 AM Aniket Alhat <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Hao, Zimmer
>>>>>>
>>>>>> Were you guys able to get any feedback on this? How did you get
>>>>>> around this issue?
>>>>>>
>>>>>> On Wed, Jul 13, 2016 at 9:23 PM, Ziemer, Tom <[email protected]
>>>>>> > wrote:
>>>>>>
>>>>>>> Hi everybody,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I am facing the same problem (with Storm 1.0.1 and Kafka 0.9.0.1).
>>>>>>> Any ideas?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Tom
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> *From:* Hao Chen [mailto:[email protected]]
>>>>>>> *Sent:* Tuesday, 12 July 2016 02:44
>>>>>>> *To:* [email protected]
>>>>>>> *Subject:* How to use KafkaSpout to consume kafka cluster secured
>>>>>>> with kerberos
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> How do I use KafkaSpout to consume from a Kafka cluster secured with
>>>>>>> Kerberos? I can't find much accurate information about the API.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> The versions I am using are:
>>>>>>>
>>>>>>>    - storm: 0.9.3
>>>>>>>    - storm-kafka: 0.10.0
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> - Hao
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>>
>>>>>> *Aniket Alhat*
>>>>>>
>>>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> *Aniket Alhat*
>>>>
>>>>
>>>
>>>
>>> --
>>>
>>> *Aniket Alhat*
>>>
>>>
>>
>
>
> --
>
> *Aniket Alhat*
>
>
