Hi Stig, all

I seem to have stumbled upon one more NPE :). Every time I restart my topology
(which uses the trident opaque spout), I get the stack trace below.



*kafka consumer config*
2018-06-01 06:27:17.942 o.a.k.c.c.ConsumerConfig [INFO] ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [*REDACTED*]
        check.crcs = true
        client.id = *REDACTED*
        connections.max.idle.ms = 540000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = *REDACTED*
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 2097152
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = kafka
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = SASL_PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

*server.log*
2018-06-01 06:27:19.622 o.a.s.util [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
Caused by: java.lang.NullPointerException
        at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) ~[stormjar.jar:?]
        at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
        ... 6 more
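For reference, my spout is wired up roughly like this (a minimal sketch, not my exact code; the broker, topic, and group names are placeholders for the redacted values, and the builder calls assume storm-kafka-client 1.2.1):

```java
// Sketch of the opaque trident spout setup that produces the consumer
// config logged above. Broker, topic, and group names are placeholders.
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.TridentTopology;

public class TopologySketch {
    public static void main(String[] args) {
        KafkaSpoutConfig<String, String> spoutConfig =
                KafkaSpoutConfig.builder("broker1:9092", "my-topic")
                        .setProp("group.id", "my-group")
                        .setProp("security.protocol", "SASL_PLAINTEXT")
                        .setProp("sasl.kerberos.service.name", "kafka")
                        .setProp("max.partition.fetch.bytes", 2097152)
                        // start from the earliest offset on first deploy
                        .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
                        .build();

        TridentTopology topology = new TridentTopology();
        topology.newStream("kafka-stream", new KafkaTridentSpoutOpaque<>(spoutConfig));
    }
}
```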

On Wed, May 23, 2018 at 12:23 AM, Stig Rohde Døssing <[email protected]>
wrote:

> No, haven't seen it before.
>
> I took a look at NamedTopicFilter, and I think what's happening is that
> the partitionsFor call here
> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java#L57
> is returning null. Did you configure your filter to use topic names that
> don't exist yet?
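(A quick way to test Stig's theory before submitting the topology is a pre-flight probe along these lines; just a sketch against the plain KafkaConsumer API, and the helper name is mine, not part of Storm:)

```java
// Pre-flight check: KafkaConsumer.partitionsFor() can return null when the
// topic does not exist yet, which is what NamedTopicFilter appears to trip
// over. The helper name is hypothetical.
import java.util.List;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

final class TopicCheck {
    static void assertTopicExists(KafkaConsumer<?, ?> consumer, String topic) {
        List<PartitionInfo> partitions = consumer.partitionsFor(topic);
        if (partitions == null || partitions.isEmpty()) {
            throw new IllegalStateException(
                    "Topic '" + topic + "' has no metadata; create it before submitting");
        }
    }
}
```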
>
> 2018-05-22 14:56 GMT+02:00 Aniket Alhat <[email protected]>:
>
>> Hello,
>>
>> I'm using Storm 1.2.1 with the new storm-kafka-client, consuming from
>> 0.11.2.0 brokers with the opaque trident spout. Unfortunately my topology
>> crashes with the exception below.
>>
>> 2018-05-22 06:23:02.318 o.a.s.util [ERROR] Async loop died!
>> java.lang.NullPointerException: null
>>         at org.apache.storm.kafka.spout.NamedTopicFilter.getFilteredTopicPartitions(NamedTopicFilter.java:57) ~[stormjar.jar:?]
>>         at org.apache.storm.kafka.spout.ManualPartitionSubscription.refreshAssignment(ManualPartitionSubscription.java:54) ~[stormjar.jar:?]
>>         at org.apache.storm.kafka.spout.ManualPartitionSubscription.subscribe(ManualPartitionSubscription.java:49) ~[stormjar.jar:?]
>>         at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutManager.createAndSubscribeKafkaConsumer(KafkaTridentSpoutManager.java:59) ~[stormjar.jar:?]
>>         at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:84) ~[stormjar.jar:?]
>>         at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.<init>(KafkaTridentSpoutEmitter.java:100) ~[stormjar.jar:?]
>>         at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque.getEmitter(KafkaTridentSpoutOpaque.java:50) ~[stormjar.jar:?]
>>         at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.<init>(OpaquePartitionedTridentSpoutExecutor.java:97) ~[storm-core-1.2.1.jar:1.2.1]
>>         at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:221) ~[storm-core-1.2.1.jar:1.2.1]
>>         at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.getEmitter(OpaquePartitionedTridentSpoutExecutor.java:39) ~[storm-core-1.2.1.jar:1.2.1]
>>         at org.apache.storm.trident.spout.TridentSpoutExecutor.prepare(TridentSpoutExecutor.java:60) ~[storm-core-1.2.1.jar:1.2.1]
>>         at org.apache.storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:245) ~[storm-core-1.2.1.jar:1.2.1]
>>         at org.apache.storm.daemon.executor$fn__5043$fn__5056.invoke(executor.clj:803) ~[storm-core-1.2.1.jar:1.2.1]
>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:482) [storm-core-1.2.1.jar:1.2.1]
>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
>>
>>
>> Does this look familiar? Any help would be appreciated.
>>
>>
>>
>> On Wed, May 16, 2018 at 1:56 PM, Aniket Alhat <[email protected]>
>> wrote:
>>
>>> Hi Bobby,
>>>
>>> Thanks for your reply. Indeed, SimpleConsumer.scala doesn't support
>>> extracting data from Kafka via Kerberos, in which case I'll start
>>> thinking about upgrading Storm to 1.2.1 and using storm-kafka-client. I'm
>>> specifically after the Transactional Non-Opaque Spout (
>>> https://issues.apache.org/jira/browse/STORM-2974), which is still under
>>> review. I'm thinking of using it by building it from the PR as Stig
>>> suggested, but it would be a great help if this could be made available
>>> in 1.2.x soon.
>>>
>>> On Tue, May 15, 2018 at 11:27 PM, Bobby Evans <[email protected]> wrote:
>>>
>>>> I am not sure you can.  storm-kafka 0.10.0 uses kafka version 0.8.1.1,
>>>> and I don't think security had been added to kafka yet.
>>>>
>>>> https://kafka.apache.org/081/documentation.html
>>>>
>>>> If you have overridden that version of the kafka client and are running
>>>> with a newer version of kafka, you might be able to, but storm itself has
>>>> no special support for it in those older versions because kafka didn't
>>>> support it yet.  Your best bet would be to upgrade to a newer version of
>>>> storm and switch to the storm-kafka-client package + API.
>>>>
>>>> In older versions of storm like this you might be able to place a
>>>> keytab on each of the nodes; then you will need to configure the Kafka
>>>> client to use those keytabs for authentication.  In that version of storm
>>>> you can set Kafka broker properties in the kafka.broker.properties storm
>>>> config as a map, but I honestly don't know if there is more you would
>>>> need to do for it.
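(For what it's worth, the kafka.broker.properties map Bobby mentions would be set along these lines. This is a sketch only: whether storm-kafka 0.10.0 actually forwards these client properties is exactly the open question, and in that era of Storm the Config class lives under backtype.storm:)

```java
// Sketch: passing Kafka client security properties through the topology
// config as Bobby describes. Assumes the keytab/JAAS setup already exists
// on each worker node; property values are illustrative.
import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;

public class KerberosConfigSketch {
    public static Config build() {
        Map<String, Object> kafkaProps = new HashMap<>();
        kafkaProps.put("security.protocol", "SASL_PLAINTEXT");
        kafkaProps.put("sasl.kerberos.service.name", "kafka");

        Config conf = new Config();
        conf.put("kafka.broker.properties", kafkaProps);
        return conf;
    }
}
```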
>>>>
>>>> Thanks,
>>>>
>>>> Bobby
>>>>
>>>>
>>>> On Tue, May 15, 2018 at 2:34 AM Aniket Alhat <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Hao, Ziemer
>>>>>
>>>>> Were you guys able to get any feedback on this? How did you get
>>>>> around this issue?
>>>>>
>>>>> On Wed, Jul 13, 2016 at 9:23 PM, Ziemer, Tom <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi everybody,
>>>>>>
>>>>>>
>>>>>>
>>>>>> I am facing the same problem (with Storm 1.0.1 and Kafka 0.9.0.1).
>>>>>> Any ideas?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Tom
>>>>>>
>>>>>>
>>>>>>
>>>>>> *From:* Hao Chen [mailto:[email protected]]
>>>>>> *Sent:* Dienstag, 12. Juli 2016 02:44
>>>>>> *To:* [email protected]
>>>>>> *Subject:* How to use KafkaSpout to consume kafka cluster secured
>>>>>> with kerberos
>>>>>>
>>>>>>
>>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>>
>>>>>>
>>>>>> How can I use KafkaSpout to consume from a kafka cluster secured with
>>>>>> kerberos? I can't find much accurate information about the API.
>>>>>>
>>>>>>
>>>>>>
>>>>>> The versions I am using are:
>>>>>>
>>>>>>    - storm: 0.9.3
>>>>>>    - storm-kafka: 0.10.0
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> - Hao
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> *Aniket Alhat*
>>>>>
>>>>>
>>>
>>>
>>> --
>>>
>>> *Aniket Alhat*
>>>
>>>
>>
>>
>> --
>>
>> *Aniket Alhat*
>>
>>
>


-- 

*Aniket Alhat*
