[jira] [Comment Edited] (STORM-3081) Storm kafka client not consuming messages properly

2018-05-23 Thread Kush Khandelwal (JIRA)

[ https://issues.apache.org/jira/browse/STORM-3081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487878#comment-16487878 ]

Kush Khandelwal edited comment on STORM-3081 at 5/23/18 7:30 PM:
-

We debugged this a little more, and we are hitting several situations, seemingly at random.
The spout randomly stops consuming messages from Kafka.

The Kafka spout config looks like this:

KafkaSpoutConfig kafkaSpoutConfig = KafkaSpoutConfig
        .builder(kafkaProperties.getProperty("bootstrap.servers"), stormProperties.getProperty("TOPIC"))
        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
        .setRecordTranslator(new MessageDeserializer(), new Fields("msg"))
        .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
        .setProp(ConsumerConfig.GROUP_ID_CONFIG, "message")
        .build();


* Sometimes a message is not consumed by the spout, but when we kill the topology and restart it, the messages that weren't consumed get consumed automatically.
* Sometimes the same set of messages gets consumed multiple times.
* Sometimes, even after restarting the topology, the previous messages are not consumed at all.
* Sometimes, if we just keep the topology running, some or all (randomly) of the missed messages eventually get processed.
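For what it's worth, the mix of lost and duplicated messages in the list above is what NO_GUARANTEE can produce: the spout periodically commits the offsets it has polled, independent of whether downstream processing finished. A toy model of a crash and restart under that policy (plain Java, not Storm code; all numbers are invented for illustration):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of NO_GUARANTEE offset handling: the spout commits the offset it
// has *polled up to*, regardless of whether the tuples were fully processed.
public class NoGuaranteeSim {
    public static void main(String[] args) {
        int polled = 8;         // consumer has polled records 0..7
        int processed = 5;      // topology has fully processed only 0..4
        int committed = polled; // NO_GUARANTEE: commit the polled position

        // Worker crashes here; on restart the spout resumes from the committed offset.
        int resumeAt = committed;

        List<Integer> lost = new ArrayList<>();
        for (int i = processed; i < resumeAt; i++) lost.add(i); // 5,6,7 were polled, committed, never processed
        System.out.println("lost after restart: " + lost);

        // With AT_LEAST_ONCE, only *acked* offsets are committed, so the same
        // crash replays the unacked records instead of losing them:
        int committedAtLeastOnce = processed;
        List<Integer> replayed = new ArrayList<>();
        for (int i = committedAtLeastOnce; i < polled; i++) replayed.add(i);
        System.out.println("replayed with AT_LEAST_ONCE: " + replayed);
    }
}
```

If losing messages is not acceptable, AT_LEAST_ONCE (committing only acked offsets) trades loss for possible replays, which is also one way the same batch can show up as "consumed multiple times".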

Something related to either committing the offsets or fetching the messages from the partition is going wrong.

I can't figure out why this happens so randomly.

Also, how and where are the offsets stored?

Can you please suggest something that would help in solving this?
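On the offsets question: with storm-kafka-client the consumer offsets live in Kafka itself (the internal __consumer_offsets topic), keyed by the configured group.id ("message" in the snippet above). One way to see what the spout has committed is to read them back with a plain KafkaConsumer. This is an illustrative sketch only: it needs a running broker, and the bootstrap address, topic name, and partition number are placeholders, not values from this issue:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ShowCommittedOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "message");                 // the spout's group.id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // "my-topic" / partition 0 are placeholders; check every partition in practice.
            TopicPartition tp = new TopicPartition("my-topic", 0);
            OffsetAndMetadata committed = consumer.committed(tp); // null if nothing committed yet
            System.out.println(tp + " committed: " + (committed == null ? "none" : committed.offset()));
        }
    }
}
```

Comparing the committed offset against the partition's end offset shows whether the spout's group is lagging or has skipped ahead of what was actually processed.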



> Storm kafka client not consuming messages properly
> --
>
> Key: STORM-3081
> URL: https://issues.apache.org/jira/browse/STORM-3081
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka, storm-kafka-client
>Affects Versions: 1.2.1
>Reporter: Kush Khandelwal
>Priority: Blocker
>
> A single thread is pushing some messages to kafka serially and we consume it 
> at the storm's end using storm-kafka-client.
> After a few requests, the consumer is not able to consume from the queue 
> blocking the thread which waits for the response from storm's end. 
> We added one more consumer with a different consumer group and there the 
> messages are getting read properly.
> So we know there is some problem at the storm kafka client consumer's end.
> The kafka spout config is written like this:
>
> KafkaSpoutConfig kafkaSpoutConfig = KafkaSpoutConfig
>         .builder(kafkaProperties.getProperty("bootstrap.servers"), stormProperties.getProperty("TOPIC"))
>         .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
>         .setRecordTranslator(new MessageDeserializer(), arguments)
>         .build();
> I can't seem to figure out the issue.
> Can someone please help me out?
> Thanks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (STORM-3081) Storm kafka client not consuming messages properly

2018-05-23 Thread Kush Khandelwal (JIRA)

[ https://issues.apache.org/jira/browse/STORM-3081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487878#comment-16487878 ]

Kush Khandelwal edited comment on STORM-3081 at 5/23/18 7:27 PM:
-

We debugged this a little more, and we are hitting several situations, seemingly at random.
The spout randomly stops consuming messages from Kafka.

The Kafka spout config looks like this:

KafkaSpoutConfig kafkaSpoutConfig = KafkaSpoutConfig
        .builder(kafkaProperties.getProperty("bootstrap.servers"), stormProperties.getProperty("TOPIC"))
        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
        .setRecordTranslator(new MessageDeserializer(), new Fields("msg"))
        .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
        .setProp(ConsumerConfig.GROUP_ID_CONFIG, "message")
        .build();


* Sometimes a message is not consumed by the spout, but when we kill the topology and restart it, the messages that weren't consumed get consumed automatically.
* Sometimes the same set of messages gets consumed multiple times.
* Sometimes, even after restarting the topology, the previous messages are not consumed at all.
* Sometimes, if we just keep the topology running, some or all (randomly) of the missed messages eventually get processed.

Something related to either committing the offsets or fetching the messages from the partition is going wrong.

I can't figure out why this happens so randomly.

Also, how does the Kafka commit work in this case?
Would it process another message if the commit is not successful?
And how and where are the offsets stored?

Can you please suggest something that would help in solving this?









[jira] [Commented] (STORM-3081) Storm kafka client not consuming messages properly

2018-05-23 Thread Kush Khandelwal (JIRA)

[ https://issues.apache.org/jira/browse/STORM-3081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487878#comment-16487878 ]

Kush Khandelwal commented on STORM-3081:


We debugged this a little more, and we are hitting several situations, seemingly at random.

The Kafka spout config looks like this:

KafkaSpoutConfig kafkaSpoutConfig = KafkaSpoutConfig
        .builder(kafkaProperties.getProperty("bootstrap.servers"), stormProperties.getProperty("TOPIC"))
        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
        .setRecordTranslator(new MessageDeserializer(), new Fields("msg"))
        .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
        .setProp(ConsumerConfig.GROUP_ID_CONFIG, "message")
        .build();


* Sometimes a message is not consumed by the spout, but when we kill the topology and restart it, the messages that weren't consumed get consumed automatically.
* Sometimes the same set of messages gets consumed multiple times.
* Sometimes, even after restarting the topology, the previous messages are not consumed at all.

My guess is that it is somehow missing some messages because of an offset issue. Something related to either committing the offsets or fetching the messages from the partition is going wrong.

I can't figure out why this happens so randomly.







[jira] [Comment Edited] (STORM-3081) Storm kafka client not consuming messages properly

2018-05-23 Thread Kush Khandelwal (JIRA)

[ https://issues.apache.org/jira/browse/STORM-3081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487330#comment-16487330 ]

Kush Khandelwal edited comment on STORM-3081 at 5/23/18 2:41 PM:
-

The topology has 39 bolts.
There are no tuple failures in the UI.
We tried enabling debug logging, but we didn't see any abnormal behaviour there either. It's polling every 200 ms like it should, and even when we push messages to Kafka, it logs "Polled [0] records from Kafka.".
The problem is not sequential: even if one message is not consumed, a later message is unaffected by it and gets consumed properly in spite of the earlier unconsumed message.
So somehow Storm is missing a message and incrementing the offset.
I changed the first poll offset strategy to UNCOMMITTED_EARLIEST, but there was no change.
Can you please suggest something more that could help with the issue?


Also, how can I set TOPOLOGY_BACKPRESSURE_ENABLE to false for the KafkaSpout?
I tried:

KafkaSpoutConfig kafkaSpoutConfig = KafkaSpoutConfig
        .builder(kafkaProperties.getProperty("bootstrap.servers"), stormProperties.getProperty("TOPIC"))
        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
        .setRecordTranslator(new MessageDeserializer(), arguments)
        .setProp(Config.TOPOLOGY_BACKPRESSURE_ENABLE, false)
        .build();

It didn't seem to work.
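One likely reason that setter had no effect: KafkaSpoutConfig.setProp feeds properties to the underlying KafkaConsumer, while TOPOLOGY_BACKPRESSURE_ENABLE is a Storm topology-level config, so the consumer simply ignores the unknown key. If that reading is right, the flag would instead go on the Config submitted with the topology, roughly like this (a sketch; the topology name and variable are placeholders):

```java
Config conf = new Config();
// Topology-level setting, applied at submit time, not a Kafka consumer property.
conf.put(Config.TOPOLOGY_BACKPRESSURE_ENABLE, false);
StormSubmitter.submitTopology("my-topology", conf, topology);
```

Setting it here applies to the whole topology, spouts included; there is no per-spout variant of this flag as far as the Storm 1.x Config goes.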










[jira] [Commented] (STORM-3081) Storm kafka client not consuming messages properly

2018-05-23 Thread Kush Khandelwal (JIRA)

[ https://issues.apache.org/jira/browse/STORM-3081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16487330#comment-16487330 ]

Kush Khandelwal commented on STORM-3081:


The topology has 39 bolts.
There are no tuple failures in the UI.
We tried enabling debug logging, but we didn't see any abnormal behaviour there either. It's polling every 200 ms like it should, and even when we push messages to Kafka, it logs "Polled [0] records from Kafka.".
The problem is not sequential: even if one message is not consumed, a later message is unaffected by it and gets consumed properly in spite of the earlier unconsumed message.
So somehow Storm is missing a message and incrementing the offset.
I changed the first poll offset strategy to UNCOMMITTED_EARLIEST, but there was no change.
Can you please suggest something more that could help with the issue?







[jira] [Updated] (STORM-3081) Storm kafka client not consuming messages properly

2018-05-22 Thread Kush Khandelwal (JIRA)

[ https://issues.apache.org/jira/browse/STORM-3081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kush Khandelwal updated STORM-3081:
---
Description: 
A single thread is pushing messages to Kafka serially, and we consume them on the Storm end using storm-kafka-client.
After a few requests, the consumer stops consuming from the topic, which blocks the thread that waits for the response from Storm.
We added one more consumer with a different consumer group, and there the messages are read properly.
So we know there is some problem on the storm-kafka-client consumer's end.
The Kafka spout config is written like this:

KafkaSpoutConfig kafkaSpoutConfig = KafkaSpoutConfig
        .builder(kafkaProperties.getProperty("bootstrap.servers"), stormProperties.getProperty("TOPIC"))
        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
        .setRecordTranslator(new MessageDeserializer(), arguments)
        .build();

I can't seem to figure out the issue.
Can someone please help me out?

Thanks.








[jira] [Created] (STORM-3081) Storm kafka client not consuming messages properly

2018-05-22 Thread Kush Khandelwal (JIRA)
Kush Khandelwal created STORM-3081:
--

 Summary: Storm kafka client not consuming messages properly
 Key: STORM-3081
 URL: https://issues.apache.org/jira/browse/STORM-3081
 Project: Apache Storm
  Issue Type: Bug
  Components: storm-kafka, storm-kafka-client
Affects Versions: 1.2.1
Reporter: Kush Khandelwal


A single thread is pushing messages to Kafka serially, and we consume them on the Storm end using storm-kafka-client.
After a few requests, the consumer stops consuming from the topic, which blocks the thread.
We added one more consumer with a different consumer group, and there the messages are read properly.
So we know there is some problem on the storm-kafka-client consumer's end.
The Kafka spout config is written like this:

KafkaSpoutConfig kafkaSpoutConfig = KafkaSpoutConfig
        .builder(kafkaProperties.getProperty("bootstrap.servers"), stormProperties.getProperty("TOPIC"))
        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
        .setRecordTranslator(new MessageDeserializer(), arguments)
        .build();

I can't seem to figure out the issue.
Can someone please help me out?

Thanks.





[jira] [Updated] (STORM-3046) Getting a NPE leading worker to die when starting a topology.

2018-04-27 Thread Kush Khandelwal (JIRA)

[ https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kush Khandelwal updated STORM-3046:
---
Description: 
I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients 
version 1.1.0.

We have an external kafka from where we get the messages.
 Whenever I try to run the topology, I get a NPE, which leads to the worker 
getting died.
If I set poll strategy to earliest and the topic already contains some 
messages, it works fine.
 I have used a custom record translator which is working fine.

 Can someone please help me fix the issue?

Thanks.

 

Error - 

10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR o.a.s.util - Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
Caused by: java.lang.NullPointerException
    at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) ~[storm-kafka-client-1.2.1.jar:1.2.1]
    at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) ~[storm-kafka-client-1.2.1.jar:1.2.1]
    at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) ~[storm-kafka-client-1.2.1.jar:1.2.1]
    at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
    ... 6 more
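The NPE originates in KafkaTridentSpoutEmitter.seek, which runs when the Trident spout decides where to start reading a partition. Consistent with the reporter's own observation that an earliest strategy works, a common workaround (an assumption based on that observation, not a confirmed fix) is to avoid LATEST as the first poll strategy with the Trident spout. A sketch, reusing the names from the reporter's snippet:

```java
KafkaSpoutConfig kafkaSpoutConfig = KafkaSpoutConfig
        .builder(kafkaProperties.getProperty("bootstrap.servers"), "test")
        // EARLIEST (or UNCOMMITTED_EARLIEST) instead of LATEST: with LATEST the
        // emitter may be asked to seek before any batch metadata exists, which
        // matches the observation that it only crashes on an empty/new position.
        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
        .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
        .setRecordTranslator(new RecordTranslator(), stageArguments)
        .build();
```

If LATEST semantics are required, upgrading storm-kafka-client past 1.2.1 is worth testing before working around it in config.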

 

 

Topology class -

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.*;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;

import java.util.Properties;

public class TestTopology {

    private static StormTopology buildTopology(Properties stormProperties) {
        Properties kafkaProperties = getProperties("/kafka.properties");
        TridentTopology topology = new TridentTopology();

        Fields stageArguments = new Fields("test", "issue");

        KafkaSpoutConfig kafkaSpoutConfig = KafkaSpoutConfig
                .builder(kafkaProperties.getProperty("bootstrap.servers"), "test")
                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
                .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
                .setRecordTranslator(new RecordTranslator(), stageArguments)
                .build();

        KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new KafkaTridentSpoutOpaque(kafkaSpoutConfig);

        Grouping partitionGroup = getPartitionGroup("test");

        log.info("Creating Opaque-Trident-Kafka-Spout");

        final Stream kafkaSpout = topology.newStream(stormProperties.getProperty("SPOUT_NAME"),
                kafkaTridentSpoutOpaque).name("kafkaSpout").parallelismHint(1);

        TridentState testUpdate = kafkaSpout.partition(partitionGroup).name("testUpdate")
                .partitionPersist(new MainMemoryStateFactory(), stageArguments,
                        new MainMemoryStateUpdater(), stageArguments).parallelismHint(1);

        Stream viewUpdate = testUpdate.newValuesStream().name("viewUpdate").partition(partitionGroup)
                .each(stageArguments, new UpdateView(), new Fields()).parallelismHint(2);

        return topology.build();
    }

    public static void main(String[] args) {
        Config conf = new Config();
 

[jira] [Updated] (STORM-3046) Getting a NPE leading worker to die when starting a topology.

2018-04-27 Thread Kush Khandelwal (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kush Khandelwal updated STORM-3046:
---
Description: 
I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients 
version 1.1.0.

We have an external kafka from where we get the messages.
 Whenever I try to run the topology, I get a NPE, which leads to the worker 
getting died.
 Can someone please help me fix the issue?

Thanks.

 

Error - 

10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR 
o.a.s.util - Async loop died!
 java.lang.RuntimeException: java.lang.NullPointerException
 at 
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861)
 ~[storm-core-1.2.1.jar:1.2.1]
 at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
[storm-core-1.2.1.jar:1.2.1]
 at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
 Caused by: java.lang.NullPointerException
 at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193)
 ~[storm-kafka-client-1.2.1.jar:1.2.1]
 at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127)
 ~[storm-kafka-client-1.2.1.jar:1.2.1]
 at 
org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51)
 ~[storm-kafka-client-1.2.1.jar:1.2.1]
 at 
org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
 ~[storm-core-1.2.1.jar:1.2.1]
 at 
org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
 ~[storm-core-1.2.1.jar:1.2.1]
 ... 6 more

 

 

Topology class - 

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.*;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;

import java.util.Properties;

public class TestTopology {

    private static StormTopology buildTopology(Properties stormProperties) {
        Properties kafkaProperties = getProperties("/kafka.properties");
        TridentTopology topology = new TridentTopology();

        Fields stageArguments = new Fields("test", "issue");

        KafkaSpoutConfig kafkaSpoutConfig =
                KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), "test")
                        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
                        .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
                        .setRecordTranslator(new RecordTranslator(), stageArguments)
                        .build();

        // If I set poll strategy to earliest and the topic already contains some messages, it works fine.
        // I have used a custom record translator which is working fine.

        KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new KafkaTridentSpoutOpaque(kafkaSpoutConfig);

        Grouping partitionGroup = getPartitionGroup("test");

        log.info("Creating Opaque-Trident-Kafka-Spout");

        final Stream kafkaSpout =
                topology.newStream(stormProperties.getProperty("SPOUT_NAME"), kafkaTridentSpoutOpaque)
                        .name("kafkaSpout")
                        .parallelismHint(1);

        TridentState testUpdate =
                kafkaSpout.partition(partitionGroup)
                        .name("testUpdate")
                        .partitionPersist(new MainMemoryStateFactory(), stageArguments,
                                new MainMemoryStateUpdater(), stageArguments)
                        .parallelismHint(1);

        Stream viewUpdate =
                testUpdate.newValuesStream()
                        .name("viewUpdate")
                        .partition(partitionGroup)
                        .each(stageArguments, new UpdateView(), new Fields())
                        .parallelismHint(2);

        return topology.build();
    }

    public static void main(String[] args) {
        Config conf = new Config();
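Given the inline note that EARLIEST with pre-existing messages works while LATEST dies on the first batch, one thing worth trying (an assumption, not a confirmed fix) is an uncommitted-offset strategy, which the reporter later used in the related STORM-3081 discussion. Only the first-poll strategy changes relative to the builder above:

```java
// Hypothetical variant of the builder above; whether this avoids the
// NPE on storm-kafka-client 1.2.1 is an assumption, not a verified fix.
KafkaSpoutConfig kafkaSpoutConfig =
        KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), "test")
                // Start from the last committed offset; fall back to earliest
                // if nothing has been committed yet for this group.
                .setFirstPollOffsetStrategy(
                        KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
                .setRecordTranslator(new RecordTranslator(), stageArguments)
                .build();
```

This is a config fragment only; it still needs the surrounding topology, the storm-kafka-client 1.2.1 dependency, and the user's custom RecordTranslator to run.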

[jira] [Updated] (STORM-3046) Getting a NPE leading worker to die when starting a topology.

2018-04-27 Thread Kush Khandelwal (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kush Khandelwal updated STORM-3046:
---
Description: 
I am using storm-core and storm-kafka-client version 1.2.1 and kafka-clients version 1.1.0.

We have an external Kafka cluster from which we get the messages.
Whenever I try to run the topology, I get an NPE, which causes the worker to die.
Can someone please help me fix the issue?

Thanks.

 

Error - 

10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR o.a.s.util - Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
	at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
Caused by: java.lang.NullPointerException
	at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) ~[storm-kafka-client-1.2.1.jar:1.2.1]
	at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) ~[storm-kafka-client-1.2.1.jar:1.2.1]
	at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) ~[storm-kafka-client-1.2.1.jar:1.2.1]
	at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1]
	at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1]
	... 6 more
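The Caused by frame points at KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193), i.e. the code that decides where to seek for a partition's first batch. The sketch below is a simplified, hypothetical model of that decision (names and logic are illustrative only, NOT the storm-kafka-client source): the very first batch of a partition has no previous-batch metadata, so any branch that dereferences that metadata before a null guard would NPE exactly as in the trace above.

```java
// Hypothetical model of a Trident spout's "where do I seek?" decision.
// Illustrative only; this is NOT the actual storm-kafka-client code.
public class SeekSketch {
    enum FirstPollStrategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

    // lastBatchEndOffset is null for the very first batch of a partition.
    static long seekOffset(FirstPollStrategy strategy, Long lastBatchEndOffset,
                           long earliestOffset, long latestOffset) {
        if (lastBatchEndOffset != null) {
            // Resume right after the previous batch.
            return lastBatchEndOffset + 1;
        }
        // First batch: fall back to the configured strategy instead of
        // dereferencing the (null) previous-batch metadata.
        switch (strategy) {
            case EARLIEST:
            case UNCOMMITTED_EARLIEST:
                return earliestOffset;
            case LATEST:
            case UNCOMMITTED_LATEST:
                return latestOffset;
            default:
                throw new IllegalStateException("Unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        // First batch ever with LATEST: no metadata yet, seek to the log end.
        System.out.println(seekOffset(FirstPollStrategy.LATEST, null, 0L, 42L));
    }
}
```

If the real seek() at line 193 skips such a guard for some strategies, that would match the observation that the crash depends on the chosen FirstPollOffsetStrategy.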

 

 

Topology class - 

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.*;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;

import java.util.Properties;

public class TestTopology {

    private static StormTopology buildTopology(Properties stormProperties) {
        Properties kafkaProperties = getProperties("/kafka.properties");
        TridentTopology topology = new TridentTopology();

        Fields stageArguments = new Fields("test", "issue");

        KafkaSpoutConfig kafkaSpoutConfig =
                KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), "test")
                        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
                        .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
                        .setRecordTranslator(new RecordTranslator(), stageArguments)
                        .build();

        // If I set poll strategy to earliest and the topic already contains some messages, it works fine.
        // I have used a custom record translator which is working fine.

        KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new KafkaTridentSpoutOpaque(kafkaSpoutConfig);

        Grouping partitionGroup = getPartitionGroup("test");

        log.info("Creating Opaque-Trident-Kafka-Spout");

        final Stream kafkaSpout =
                topology.newStream(stormProperties.getProperty("SPOUT_NAME"), kafkaTridentSpoutOpaque)
                        .name("kafkaSpout")
                        .parallelismHint(1);

        TridentState testUpdate =
                kafkaSpout.partition(partitionGroup)
                        .name("testUpdate")
                        .partitionPersist(new MainMemoryStateFactory(), stageArguments,
                                new MainMemoryStateUpdater(), stageArguments)
                        .parallelismHint(1);

        Stream viewUpdate =
                testUpdate.newValuesStream()
                        .name("viewUpdate")
                        .partition(partitionGroup)
                        .each(stageArguments, new UpdateView(), new Fields())
                        .parallelismHint(2);

        return topology.build();
    }

    public static void main(String[] args) {
        Config conf = new Config();


[jira] [Created] (STORM-3046) Getting a NPE leading worker to die when starting a topology.

2018-04-27 Thread Kush Khandelwal (JIRA)
Kush Khandelwal created STORM-3046:
--

 Summary: Getting a NPE leading worker to die when starting a 
topology.
 Key: STORM-3046
 URL: https://issues.apache.org/jira/browse/STORM-3046
 Project: Apache Storm
  Issue Type: Bug
  Components: storm-kafka-client, trident
Affects Versions: 1.2.1
Reporter: Kush Khandelwal
 Attachments: TestTopology.java
