[jira] [Created] (STORM-3055) never refresh connection

2018-05-01 Thread zhangbiao (JIRA)
zhangbiao created STORM-3055:


 Summary: never refresh connection
 Key: STORM-3055
 URL: https://issues.apache.org/jira/browse/STORM-3055
 Project: Apache Storm
  Issue Type: Bug
  Components: storm-core
Affects Versions: 1.1.1
Reporter: zhangbiao


In our environment, some workers' connections to other workers are being closed and 
never reconnect.

The log shows:

2018-05-02 10:28:49.302 o.a.s.m.n.Client 
Thread-90-disruptor-worker-transfer-queue [ERROR] discarding 1 messages because 
the Netty client to Netty-Client-/192.168.31.1:6800 is being closed

..
2018-05-02 11:00:29.540 o.a.s.m.n.Client 
Thread-90-disruptor-worker-transfer-queue [ERROR] discarding 1 messages because 
the Netty client to Netty-Client-/192.168.31.1:6800 is being closed

The log shows that it never reconnects. I can only fix it by restarting the 
topology.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (STORM-2985) Add jackson-annotations to dependency management

2018-05-01 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-2985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing resolved STORM-2985.
---
Resolution: Fixed

> Add jackson-annotations to dependency management
> 
>
> Key: STORM-2985
> URL: https://issues.apache.org/jira/browse/STORM-2985
> Project: Apache Storm
>  Issue Type: Bug
>Affects Versions: 1.1.2, 1.0.6, 1.2.1
>Reporter: Arun Mahadevan
>Assignee: Arun Mahadevan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.1.3, 1.0.7, 1.2.2
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
>  
> We recently upgraded to jackson version 2.9.4. However, different versions of 
> the jackson-annotations dependency are inherited via transitive dependencies of 
> other jars. It's best to keep them in sync.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (STORM-2985) Add jackson-annotations to dependency management

2018-05-01 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-2985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing updated STORM-2985:
--
Fix Version/s: 1.0.7
   1.1.3

> Add jackson-annotations to dependency management
> 
>
> Key: STORM-2985
> URL: https://issues.apache.org/jira/browse/STORM-2985
> Project: Apache Storm
>  Issue Type: Bug
>Affects Versions: 1.1.2, 1.0.6, 1.2.1
>Reporter: Arun Mahadevan
>Assignee: Arun Mahadevan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.1.3, 1.0.7, 1.2.2
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
>  
> We recently upgraded to jackson version 2.9.4. However, different versions of 
> the jackson-annotations dependency are inherited via transitive dependencies of 
> other jars. It's best to keep them in sync.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (STORM-2985) Add jackson-annotations to dependency management

2018-05-01 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-2985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing updated STORM-2985:
--
Affects Version/s: 1.1.2
   1.0.6
   1.2.1

> Add jackson-annotations to dependency management
> 
>
> Key: STORM-2985
> URL: https://issues.apache.org/jira/browse/STORM-2985
> Project: Apache Storm
>  Issue Type: Bug
>Affects Versions: 1.1.2, 1.0.6, 1.2.1
>Reporter: Arun Mahadevan
>Assignee: Arun Mahadevan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.1.3, 1.0.7, 1.2.2
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
>  
> We recently upgraded to jackson version 2.9.4. However, different versions of 
> the jackson-annotations dependency are inherited via transitive dependencies of 
> other jars. It's best to keep them in sync.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (STORM-3047) Ensure Trident emitter refreshPartitions is only called with partitions assigned to the emitter

2018-05-01 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing updated STORM-3047:
--
Description: 
This is a backport of the changes made to OpaquePartitionedTridentSpoutExecutor 
in https://github.com/apache/storm/pull/2300/files.

The description of the issue is copied here for convenience:

The changes in https://github.com/apache/storm/pull/2009 released in 1.1.0 made 
some changes to the OpaquePartitionedTridentSpoutExecutor that likely broke 
IOpaquePartitionedTridentSpout implementations other than storm-kafka-client. 
The changed code used to request sorted partitions from the spout via 
getOrderedPartitions, do a round-robin partitioning, and assign partitions via 
refreshPartitions 
https://github.com/apache/storm/blob/v1.0.4/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L100.
 The new code just passes the output of getOrderedPartitions into 
refreshPartitions 
https://github.com/apache/storm/blob/v1.1.0/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L120.
 It looks to me like refreshPartitions is passed the list of all partitions 
assigned to any spout task, rather than just the partitions assigned to the 
current task.

The proposed fix will use getOrderedPartitions to get the sorted partitions 
list, pass the list into getPartitionsForTask, and pass the resulting list of 
assigned partitions back into refreshPartitions.
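
For illustration, a rough, self-contained sketch of the call order that fix 
describes; the Emitter interface below is a simplified stand-in for 
IOpaquePartitionedTridentSpout.Emitter, not the actual Storm interface or 
executor code:

import java.util.List;

final class PartitionRefreshSketch {

    // Simplified stand-in for IOpaquePartitionedTridentSpout.Emitter.
    interface Emitter<PartitionsT, PartitionT> {
        List<PartitionT> getOrderedPartitions(PartitionsT allPartitionInfo);
        List<PartitionT> getPartitionsForTask(int taskId, int numTasks, List<PartitionT> allSorted);
        void refreshPartitions(List<PartitionT> myPartitions);
    }

    static <PartitionsT, PartitionT> void refresh(
            Emitter<PartitionsT, PartitionT> emitter,
            PartitionsT allPartitionInfo, int taskIndex, int numTasks) {
        // 1. Get the full partition list in a consistent order.
        List<PartitionT> allSorted = emitter.getOrderedPartitions(allPartitionInfo);
        // 2. Keep only the partitions assigned to this task.
        List<PartitionT> mine = emitter.getPartitionsForTask(taskIndex, numTasks, allSorted);
        // 3. Refresh only the partitions this task owns, not every partition in the topology.
        emitter.refreshPartitions(mine);
    }
}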


  was:This is a backport of the changes made to 
OpaquePartitionedTridentSpoutExecutor in 
https://github.com/apache/storm/pull/2300/files.


> Ensure Trident emitter refreshPartitions is only called with partitions 
> assigned to the emitter
> ---
>
> Key: STORM-3047
> URL: https://issues.apache.org/jira/browse/STORM-3047
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-core
>Affects Versions: 1.1.2, 1.2.1
>Reporter: Stig Rohde Døssing
>Assignee: Stig Rohde Døssing
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> This is a backport of the changes made to 
> OpaquePartitionedTridentSpoutExecutor in 
> https://github.com/apache/storm/pull/2300/files.
> The description of the issue is copied here for convenience:
> The changes in https://github.com/apache/storm/pull/2009 released in 1.1.0 
> made some changes to the OpaquePartitionedTridentSpoutExecutor that likely 
> broke IOpaquePartitionedTridentSpout implementations other than 
> storm-kafka-client. The changed code used to request sorted partitions from 
> the spout via getOrderedPartitions, do a round-robin partitioning, and assign 
> partitions via refreshPartitions 
> https://github.com/apache/storm/blob/v1.0.4/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L100.
>  The new code just passes the output of getOrderedPartitions into 
> refreshPartitions 
> https://github.com/apache/storm/blob/v1.1.0/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L120.
>  It looks to me like refreshPartitions is passed the list of all partitions 
> assigned to any spout task, rather than just the partitions assigned to the 
> current task.
> The proposed fix will use getOrderedPartitions to get the sorted partitions 
> list, pass the list into getPartitionsForTask, and pass the resulting list of 
> assigned partitions back into refreshPartitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (STORM-2915) How could I to get the fail Number in Bolt When I use Kafka Spout

2018-05-01 Thread JIRA

[ 
https://issues.apache.org/jira/browse/STORM-2915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459657#comment-16459657
 ] 

Stig Rohde Døssing commented on STORM-2915:
---

The spout allows you to implement your own failure handling by letting you 
supply an implementation of this interface: 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java#L29.
The default implementation retries a configurable number of times, with a 
configurable backoff between retries: 
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java.

Take a look at the schedule method in that class; it might do what you need. If 
not, please let us know what feature is missing.
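
For reference, a minimal sketch of plugging a custom retry policy into the spout 
configuration; the bootstrap servers, topic name, and the delay/retry values are 
illustrative placeholders, not taken from this issue:

import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
import org.apache.storm.kafka.spout.KafkaSpoutRetryService;

public class RetryConfigExample {
    public static KafkaSpoutConfig<String, String> buildConfig() {
        // Retry failed tuples at most 5 times, with an exponential backoff starting
        // at 500 ms and capped at 10 seconds. All values here are placeholders.
        KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(
            TimeInterval.milliSeconds(500),   // initial delay before the first retry
            TimeInterval.milliSeconds(500),   // delay period used for the backoff
            5,                                // max retries before the tuple is dropped
            TimeInterval.seconds(10));        // upper bound on the retry delay

        return KafkaSpoutConfig.builder("localhost:9092", "my-topic")
            .setRetry(retryService)
            .build();
    }
}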

Regarding the failure reason, as I mentioned, that isn't supported at this time. 
You can work around it, though, since tuple failures are due either to processing 
timeouts/lost messages, to a thrown exception from a bolt causing the worker to 
crash, or to a bolt failing the message explicitly. You can guard against 
exceptions in your bolts, e.g. by dropping tuples to a log (or some dead letter 
queue) if they cause an unexpected exception and acking them so they are not 
reprocessed. You can also explicitly log from your bolt when you fail a tuple 
with OutputCollector.fail. That would let you see the reason for failures in 
your logs.
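
A rough sketch of that workaround against the 1.x bolt API; the bolt, its 
processing logic, and the RecoverableException type are made up for illustration:

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class GuardedBolt extends BaseRichBolt {
    private static final Logger LOG = LoggerFactory.getLogger(GuardedBolt.class);
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            process(tuple);                       // your actual business logic
            collector.ack(tuple);
        } catch (RecoverableException e) {
            // Explicit failure: log the reason so it shows up in the worker log,
            // then fail the tuple so the spout's retry service can replay it.
            LOG.warn("Failing tuple {}: {}", tuple, e.getMessage());
            collector.fail(tuple);
        } catch (Exception e) {
            // Unexpected error: log it (or send it to a dead letter queue) and ack,
            // so the tuple is not reprocessed and the worker does not crash.
            LOG.error("Dropping tuple {} due to unexpected error", tuple, e);
            collector.ack(tuple);
        }
    }

    private void process(Tuple tuple) throws RecoverableException {
        // placeholder for real processing
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output streams in this sketch
    }

    /** Placeholder exception type for failures you want retried. */
    public static class RecoverableException extends Exception {
        public RecoverableException(String msg) { super(msg); }
    }
}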

> How could I to get the fail Number   in Bolt When I use  Kafka Spout
> 
>
> Key: STORM-2915
> URL: https://issues.apache.org/jira/browse/STORM-2915
> Project: Apache Storm
>  Issue Type: New Feature
>  Components: storm-kafka-client
>Affects Versions: 1.0.2, 1.1.0, 1.0.3, 1.0.4, 1.1.1, 1.0.5
>Reporter: Gergo Hong
>Priority: Minor
>
> I want to get the fail count in a bolt; how could I get it?
> If it fails, it retries. I see this code:
> if (!isScheduled || retryService.isReady(msgId)) {
>     final String stream = tuple instanceof KafkaTuple
>         ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;
>     if (!isAtLeastOnceProcessing()) {
>         if (kafkaSpoutConfig.isTupleTrackingEnforced()) {
>             collector.emit(stream, tuple, msgId);
>             LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
>         } else {
>             collector.emit(stream, tuple);
>             LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
>         }
>     } else {
>         emitted.add(msgId);
>         offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
>         if (isScheduled) { // Was scheduled for retry and re-emitted, so remove from schedule.
>             retryService.remove(msgId);
>         }
>         collector.emit(stream, tuple, msgId);
>         tupleListener.onEmit(tuple, msgId);
>         LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
>     }
>     return true;
> }



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (STORM-3046) Getting a NPE leading worker to die when starting a topology.

2018-05-01 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated STORM-3046:
--
Labels: kafka pull-request-available storm-kafka-client trident  (was: 
kafka storm-kafka-client trident)

> Getting a NPE leading worker to die when starting a topology.
> -
>
> Key: STORM-3046
> URL: https://issues.apache.org/jira/browse/STORM-3046
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client, trident
>Affects Versions: 1.2.1
>Reporter: Kush Khandelwal
>Assignee: Stig Rohde Døssing
>Priority: Blocker
>  Labels: kafka, pull-request-available, storm-kafka-client, 
> trident
> Attachments: TestTopology.java
>
>
> I am using storm-core and storm-kafka-client version 1.2.1 and kafka-clients 
> version 1.1.0.
> We have an external Kafka from which we get the messages.
> Whenever I try to run the topology, I get an NPE, which leads to the worker 
> dying.
> If I set the poll strategy to earliest and the topic already contains some 
> messages, it works fine.
> I have used a custom record translator, which is working fine.
> Can someone please help me fix the issue?
> Thanks.
>  
> Error - 
> 10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR 
> o.a.s.util - Async loop died!
>  java.lang.RuntimeException: java.lang.NullPointerException
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
> [storm-core-1.2.1.jar:1.2.1]
>  at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
>  Caused by: java.lang.NullPointerException
>  at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193)
>  ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127)
>  ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51)
>  ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  ... 6 more
>  
>  
> Topology class - 
>  
>  
>  
>  
> import org.apache.storm.Config;
> import org.apache.storm.LocalCluster;
> import org.apache.storm.StormSubmitter;
> import org.apache.storm.generated.*;
> import org.apache.storm.kafka.spout.KafkaSpoutConfig;
> import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
> import org.apache.storm.trident.Stream;
> import org.apache.storm.trident.TridentState;
> import org.apache.storm.trident.TridentTopology;
> import org.apache.storm.tuple.Fields;
> import java.util.Properties;
>  
> public class TestTopology {
>  
> private static StormTopology buildTopology(Properties stormProperties) {
>  
> Properties kafkaProperties = getProperties("/kafka.properties");
>  TridentTopology topology = new TridentTopology();
> Fields stageArguments = new Fields("test", "issue");
> KafkaSpoutConfig kafkaSpoutConfig = 
> KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), 
> "test")
>  .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
>  .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
>  .setRecordTranslator(new RecordTranslator(), 

[jira] [Assigned] (STORM-3046) Getting a NPE leading worker to die when starting a topology.

2018-05-01 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing reassigned STORM-3046:
-

Assignee: Stig Rohde Døssing

> Getting a NPE leading worker to die when starting a topology.
> -
>
> Key: STORM-3046
> URL: https://issues.apache.org/jira/browse/STORM-3046
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client, trident
>Affects Versions: 1.2.1
>Reporter: Kush Khandelwal
>Assignee: Stig Rohde Døssing
>Priority: Blocker
>  Labels: kafka, storm-kafka-client, trident
> Attachments: TestTopology.java
>
>
> I am using storm-core and storm-kafka-client version 1.2.1 and kafka-clients 
> version 1.1.0.
> We have an external Kafka from which we get the messages.
> Whenever I try to run the topology, I get an NPE, which leads to the worker 
> dying.
> If I set the poll strategy to earliest and the topic already contains some 
> messages, it works fine.
> I have used a custom record translator, which is working fine.
> Can someone please help me fix the issue?
> Thanks.
>  
> Error - 
> 10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR 
> o.a.s.util - Async loop died!
>  java.lang.RuntimeException: java.lang.NullPointerException
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
> [storm-core-1.2.1.jar:1.2.1]
>  at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
>  Caused by: java.lang.NullPointerException
>  at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193)
>  ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127)
>  ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51)
>  ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  ... 6 more
>  
>  
> Topology class - 
>  
>  
>  
>  
> import org.apache.storm.Config;
> import org.apache.storm.LocalCluster;
> import org.apache.storm.StormSubmitter;
> import org.apache.storm.generated.*;
> import org.apache.storm.kafka.spout.KafkaSpoutConfig;
> import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
> import org.apache.storm.trident.Stream;
> import org.apache.storm.trident.TridentState;
> import org.apache.storm.trident.TridentTopology;
> import org.apache.storm.tuple.Fields;
> import java.util.Properties;
>  
> public class TestTopology {
>  
> private static StormTopology buildTopology(Properties stormProperties) {
>  
> Properties kafkaProperties = getProperties("/kafka.properties");
>  TridentTopology topology = new TridentTopology();
> Fields stageArguments = new Fields("test", "issue");
> KafkaSpoutConfig kafkaSpoutConfig = 
> KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), 
> "test")
>  .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
>  .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
>  .setRecordTranslator(new RecordTranslator(), stageArguments)
>  .build();
> KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new 
> 

[jira] [Commented] (STORM-3046) Getting a NPE leading worker to die when starting a topology.

2018-05-01 Thread moroseking (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459584#comment-16459584
 ] 

moroseking commented on STORM-3046:
---

If you use the opaque Trident spout, you must change some code to handle the 
condition where the last batch is null.
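
For illustration only, a minimal sketch of the kind of guard being suggested, 
assuming the previous batch metadata is null for the very first batch; the names 
are placeholders, not the actual KafkaTridentSpoutEmitter code:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

final class FirstBatchGuardSketch {
    // lastBatchLastOffset stands in for the previous batch's metadata, which is
    // null for the very first batch on a partition (the case that can otherwise
    // trigger the NPE).
    static void seekForNextBatch(KafkaConsumer<?, ?> consumer, TopicPartition tp,
                                 Long lastBatchLastOffset) {
        if (lastBatchLastOffset != null) {
            // A previous batch exists: continue right after its last emitted offset.
            consumer.seek(tp, lastBatchLastOffset + 1);
        } else {
            // No previous batch yet: leave the consumer at the position chosen by
            // the configured FirstPollOffsetStrategy (e.g. LATEST) instead of
            // dereferencing the missing metadata.
        }
    }
}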

> Getting a NPE leading worker to die when starting a topology.
> -
>
> Key: STORM-3046
> URL: https://issues.apache.org/jira/browse/STORM-3046
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka-client, trident
>Affects Versions: 1.2.1
>Reporter: Kush Khandelwal
>Priority: Blocker
>  Labels: kafka, storm-kafka-client, trident
> Attachments: TestTopology.java
>
>
> I am using storm-core and storm-kafka-client version 1.2.1 and kafka-clients 
> version 1.1.0.
> We have an external Kafka from which we get the messages.
> Whenever I try to run the topology, I get an NPE, which leads to the worker 
> dying.
> If I set the poll strategy to earliest and the topic already contains some 
> messages, it works fine.
> I have used a custom record translator, which is working fine.
> Can someone please help me fix the issue?
> Thanks.
>  
> Error - 
> 10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR 
> o.a.s.util - Async loop died!
>  java.lang.RuntimeException: java.lang.NullPointerException
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
> [storm-core-1.2.1.jar:1.2.1]
>  at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
>  Caused by: java.lang.NullPointerException
>  at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193)
>  ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127)
>  ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51)
>  ~[storm-kafka-client-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  at 
> org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
>  ~[storm-core-1.2.1.jar:1.2.1]
>  ... 6 more
>  
>  
> Topology class - 
>  
>  
>  
>  
> import org.apache.storm.Config;
> import org.apache.storm.LocalCluster;
> import org.apache.storm.StormSubmitter;
> import org.apache.storm.generated.*;
> import org.apache.storm.kafka.spout.KafkaSpoutConfig;
> import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
> import org.apache.storm.trident.Stream;
> import org.apache.storm.trident.TridentState;
> import org.apache.storm.trident.TridentTopology;
> import org.apache.storm.tuple.Fields;
> import java.util.Properties;
>  
> public class TestTopology {
>  
> private static StormTopology buildTopology(Properties stormProperties) {
>  
> Properties kafkaProperties = getProperties("/kafka.properties");
>  TridentTopology topology = new TridentTopology();
> Fields stageArguments = new Fields("test", "issue");
> KafkaSpoutConfig kafkaSpoutConfig = 
> KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), 
> "test")
>  .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
>  .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
>  .setRecordTranslator(new RecordTranslator(), stageArguments)
>  .build();
>