[jira] [Created] (STORM-3055) never refresh connection
zhangbiao created STORM-3055: Summary: never refresh connection Key: STORM-3055 URL: https://issues.apache.org/jira/browse/STORM-3055 Project: Apache Storm Issue Type: Bug Components: storm-core Affects Versions: 1.1.1 Reporter: zhangbiao in our enviroment some worker's connection to other worker being closed and never reconnect, the log show's that 2018-05-02 10:28:49.302 o.a.s.m.n.Client Thread-90-disruptor-worker-transfer-queue [ERROR] discarding 1 messages because the Netty client to Netty-Client-/192.168.31.1:6800 is being closed .. 2018-05-02 11:00:29.540 o.a.s.m.n.Client Thread-90-disruptor-worker-transfer-queue [ERROR] discarding 1 messages because the Netty client to Netty-Client-/192.168.31.1:6800 is being closed the log shows that it never can reconnect again. i can only fix it after restart the topo, -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (STORM-2985) Add jackson-annotations to dependency management
[ https://issues.apache.org/jira/browse/STORM-2985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stig Rohde Døssing resolved STORM-2985. --- Resolution: Fixed > Add jackson-annotations to dependency management > > > Key: STORM-2985 > URL: https://issues.apache.org/jira/browse/STORM-2985 > Project: Apache Storm > Issue Type: Bug >Affects Versions: 1.1.2, 1.0.6, 1.2.1 >Reporter: Arun Mahadevan >Assignee: Arun Mahadevan >Priority: Major > Labels: pull-request-available > Fix For: 1.1.3, 1.0.7, 1.2.2 > > Time Spent: 40m > Remaining Estimate: 0h > > > We recently upgraded to jackson version 2.9.4. However different versions of > jackson-annotation dependencies are inherited via transitive dependencies of > other jars. Its best to keep it in sync. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (STORM-2985) Add jackson-annotations to dependency management
[ https://issues.apache.org/jira/browse/STORM-2985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stig Rohde Døssing updated STORM-2985: -- Fix Version/s: 1.0.7 1.1.3 > Add jackson-annotations to dependency management > > > Key: STORM-2985 > URL: https://issues.apache.org/jira/browse/STORM-2985 > Project: Apache Storm > Issue Type: Bug >Affects Versions: 1.1.2, 1.0.6, 1.2.1 >Reporter: Arun Mahadevan >Assignee: Arun Mahadevan >Priority: Major > Labels: pull-request-available > Fix For: 1.1.3, 1.0.7, 1.2.2 > > Time Spent: 40m > Remaining Estimate: 0h > > > We recently upgraded to jackson version 2.9.4. However different versions of > jackson-annotation dependencies are inherited via transitive dependencies of > other jars. Its best to keep it in sync. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (STORM-2985) Add jackson-annotations to dependency management
[ https://issues.apache.org/jira/browse/STORM-2985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stig Rohde Døssing updated STORM-2985: -- Affects Version/s: 1.1.2 1.0.6 1.2.1 > Add jackson-annotations to dependency management > > > Key: STORM-2985 > URL: https://issues.apache.org/jira/browse/STORM-2985 > Project: Apache Storm > Issue Type: Bug >Affects Versions: 1.1.2, 1.0.6, 1.2.1 >Reporter: Arun Mahadevan >Assignee: Arun Mahadevan >Priority: Major > Labels: pull-request-available > Fix For: 1.1.3, 1.0.7, 1.2.2 > > Time Spent: 40m > Remaining Estimate: 0h > > > We recently upgraded to jackson version 2.9.4. However different versions of > jackson-annotation dependencies are inherited via transitive dependencies of > other jars. Its best to keep it in sync. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (STORM-3047) Ensure Trident emitter refreshPartitions is only called with partitions assigned to the emitter
[ https://issues.apache.org/jira/browse/STORM-3047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stig Rohde Døssing updated STORM-3047: -- Description: This is a backport of the changes made to OpaquePartitionedTridentSpoutExecutor in https://github.com/apache/storm/pull/2300/files. The description of the issue is copied here for convenience: The changes in https://github.com/apache/storm/pull/2009 released in 1.1.0 made some changes to the OpaquePartitionedTridentSpoutExecutor that likely broke IOpaquePartitionedTridentSpout implementations other than storm-kafka-client. The changed code used to request sorted partitions from the spout via getOrderedPartitions, do a round-robin partitioning, and assign partitions via refreshPartitions https://github.com/apache/storm/blob/v1.0.4/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L100. The new code just passes the output of getOrderedPartitions into refreshPartitions https://github.com/apache/storm/blob/v1.1.0/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L120. It looks to me like refreshPartitions is passed the list of all partitions assigned to any spout task, rather than just the partitions assigned to the current task. The proposed fix will use getOrderedPartitions to get the sorted partitions list, pass the list into getPartitionsForTask, and pass the resulting list of assigned partitions back into refreshPartitions. was:This is a backport of the changes made to OpaquePartitionedTridentSpoutExecutor in https://github.com/apache/storm/pull/2300/files. > Ensure Trident emitter refreshPartitions is only called with partitions > assigned to the emitter > --- > > Key: STORM-3047 > URL: https://issues.apache.org/jira/browse/STORM-3047 > Project: Apache Storm > Issue Type: Bug > Components: storm-core >Affects Versions: 1.1.2, 1.2.1 >Reporter: Stig Rohde Døssing >Assignee: Stig Rohde Døssing >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > This is a backport of the changes made to > OpaquePartitionedTridentSpoutExecutor in > https://github.com/apache/storm/pull/2300/files. > The description of the issue is copied here for convenience: > The changes in https://github.com/apache/storm/pull/2009 released in 1.1.0 > made some changes to the OpaquePartitionedTridentSpoutExecutor that likely > broke IOpaquePartitionedTridentSpout implementations other than > storm-kafka-client. The changed code used to request sorted partitions from > the spout via getOrderedPartitions, do a round-robin partitioning, and assign > partitions via refreshPartitions > https://github.com/apache/storm/blob/v1.0.4/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L100. > The new code just passes the output of getOrderedPartitions into > refreshPartitions > https://github.com/apache/storm/blob/v1.1.0/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L120. > It looks to me like refreshPartitions is passed the list of all partitions > assigned to any spout task, rather than just the partitions assigned to the > current task. > The proposed fix will use getOrderedPartitions to get the sorted partitions > list, pass the list into getPartitionsForTask, and pass the resulting list of > assigned partitions back into refreshPartitions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (STORM-2915) How could I to get the fail Number in Bolt When I use Kafka Spout
[ https://issues.apache.org/jira/browse/STORM-2915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459657#comment-16459657 ] Stig Rohde Døssing commented on STORM-2915: --- The spout supports allowing you to implement your own failure handing, by letting you supply an implementation of this interface https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java#L29. The default implementation retries a configurable number of times, with a configurable backoff between retries https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java. Take a look at the schedule method in that class, it might do what you need. If not, please let us know what feature is missing. Regarding the failure reason, as I mentioned that isn't supported at this time. You can work around it though, since tuple failures are either due to processing timeouts/lost messages, a thrown exception from a bolt causing the worker to crash, or because a bolt failed the message explicitly. You can guard against exceptions in your bolts, e.g. by dropping tuples to log (or some dead letter queue) if they cause an unexpected exception, and acking them so they are not reprocessed. Yo can also explicitly log from your bolt when you fail a tuple with OutputCollector.fail. That would let you see the reason for failures in your logs. > How could I to get the fail Number in Bolt When I use Kafka Spout > > > Key: STORM-2915 > URL: https://issues.apache.org/jira/browse/STORM-2915 > Project: Apache Storm > Issue Type: New Feature > Components: storm-kafka-client >Affects Versions: 1.0.2, 1.1.0, 1.0.3, 1.0.4, 1.1.1, 1.0.5 >Reporter: Gergo Hong >Priority: Minor > > I want to get fail num in bolt , how could I to get it? > if fail it retry, I see This > if (!isScheduled || retryService.isReady(msgId)) { > final String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) > tuple).getStream() : Utils.DEFAULT_STREAM_ID; > if (!isAtLeastOnceProcessing()) { > if (kafkaSpoutConfig.isTupleTrackingEnforced()) { > collector.emit(stream, tuple, msgId); > LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, > record, msgId); > } else { > collector.emit(stream, tuple); > LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record); > } > } else { > emitted.add(msgId); > offsetManagers.get(tp).addToEmitMsgs(msgId.offset()); > if (isScheduled) { // Was scheduled for retry and re-emitted, so remove from > schedule. > retryService.remove(msgId); > } > collector.emit(stream, tuple, msgId); > tupleListener.onEmit(tuple, msgId); > LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, > record, msgId); > } > return true; > } -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (STORM-3046) Getting a NPE leading worker to die when starting a topology.
[ https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated STORM-3046: -- Labels: kafka pull-request-available storm-kafka-client trident (was: kafka storm-kafka-client trident) > Getting a NPE leading worker to die when starting a topology. > - > > Key: STORM-3046 > URL: https://issues.apache.org/jira/browse/STORM-3046 > Project: Apache Storm > Issue Type: Bug > Components: storm-kafka-client, trident >Affects Versions: 1.2.1 >Reporter: Kush Khandelwal >Assignee: Stig Rohde Døssing >Priority: Blocker > Labels: kafka, pull-request-available, storm-kafka-client, > trident > Attachments: TestTopology.java > > > I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients > version 1.1.0. > We have an external kafka from where we get the messages. > Whenever I try to run the topology, I get a NPE, which leads to the worker > getting died. > If I set poll strategy to earliest and the topic already contains some > messages, it works fine. > I have used a custom record translator which is working fine. > Can someone please help me fix the issue? > Thanks. > > Error - > 10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR > o.a.s.util - Async loop died! > java.lang.RuntimeException: java.lang.NullPointerException > at > org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) > ~[storm-core-1.2.1.jar:1.2.1] > at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) > [storm-core-1.2.1.jar:1.2.1] > at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131] > Caused by: java.lang.NullPointerException > at > org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) > ~[storm-kafka-client-1.2.1.jar:1.2.1] > at > org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) > ~[storm-kafka-client-1.2.1.jar:1.2.1] > at > org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) > ~[storm-kafka-client-1.2.1.jar:1.2.1] > at > org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) > ~[storm-core-1.2.1.jar:1.2.1] > ... 6 more > > > Topology class - > > > > > import org.apache.storm.Config; > import org.apache.storm.LocalCluster; > import org.apache.storm.StormSubmitter; > import org.apache.storm.generated.*; > import org.apache.storm.kafka.spout.KafkaSpoutConfig; > import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque; > import org.apache.storm.trident.Stream; > import org.apache.storm.trident.TridentState; > import org.apache.storm.trident.TridentTopology; > import org.apache.storm.tuple.Fields; > import java.util.Properties; > > public class TestTopology { > > private static StormTopology buildTopology(Properties stormProperties) { > > Properties kafkaProperties = getProperties("/kafka.properties"); > TridentTopology topology = new TridentTopology(); > Fields stageArguments = new Fields("test", "issue"); > KafkaSpoutConfigkafkaSpoutConfig = > KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), > "test") > .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST) > .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) > .setRecordTranslator(new RecordTranslator(),
[jira] [Assigned] (STORM-3046) Getting a NPE leading worker to die when starting a topology.
[ https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stig Rohde Døssing reassigned STORM-3046: - Assignee: Stig Rohde Døssing > Getting a NPE leading worker to die when starting a topology. > - > > Key: STORM-3046 > URL: https://issues.apache.org/jira/browse/STORM-3046 > Project: Apache Storm > Issue Type: Bug > Components: storm-kafka-client, trident >Affects Versions: 1.2.1 >Reporter: Kush Khandelwal >Assignee: Stig Rohde Døssing >Priority: Blocker > Labels: kafka, storm-kafka-client, trident > Attachments: TestTopology.java > > > I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients > version 1.1.0. > We have an external kafka from where we get the messages. > Whenever I try to run the topology, I get a NPE, which leads to the worker > getting died. > If I set poll strategy to earliest and the topic already contains some > messages, it works fine. > I have used a custom record translator which is working fine. > Can someone please help me fix the issue? > Thanks. > > Error - > 10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR > o.a.s.util - Async loop died! > java.lang.RuntimeException: java.lang.NullPointerException > at > org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) > ~[storm-core-1.2.1.jar:1.2.1] > at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) > [storm-core-1.2.1.jar:1.2.1] > at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131] > Caused by: java.lang.NullPointerException > at > org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) > ~[storm-kafka-client-1.2.1.jar:1.2.1] > at > org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) > ~[storm-kafka-client-1.2.1.jar:1.2.1] > at > org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) > ~[storm-kafka-client-1.2.1.jar:1.2.1] > at > org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) > ~[storm-core-1.2.1.jar:1.2.1] > ... 6 more > > > Topology class - > > > > > import org.apache.storm.Config; > import org.apache.storm.LocalCluster; > import org.apache.storm.StormSubmitter; > import org.apache.storm.generated.*; > import org.apache.storm.kafka.spout.KafkaSpoutConfig; > import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque; > import org.apache.storm.trident.Stream; > import org.apache.storm.trident.TridentState; > import org.apache.storm.trident.TridentTopology; > import org.apache.storm.tuple.Fields; > import java.util.Properties; > > public class TestTopology { > > private static StormTopology buildTopology(Properties stormProperties) { > > Properties kafkaProperties = getProperties("/kafka.properties"); > TridentTopology topology = new TridentTopology(); > Fields stageArguments = new Fields("test", "issue"); > KafkaSpoutConfigkafkaSpoutConfig = > KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), > "test") > .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST) > .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) > .setRecordTranslator(new RecordTranslator(), stageArguments) > .build(); > KafkaTridentSpoutOpaque kafkaTridentSpoutOpaque = new >
[jira] [Commented] (STORM-3046) Getting a NPE leading worker to die when starting a topology.
[ https://issues.apache.org/jira/browse/STORM-3046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16459584#comment-16459584 ] moroseking commented on STORM-3046: --- if you use opaquetrident, you must change some code that handle the condition when the lastbatch is null > Getting a NPE leading worker to die when starting a topology. > - > > Key: STORM-3046 > URL: https://issues.apache.org/jira/browse/STORM-3046 > Project: Apache Storm > Issue Type: Bug > Components: storm-kafka-client, trident >Affects Versions: 1.2.1 >Reporter: Kush Khandelwal >Priority: Blocker > Labels: kafka, storm-kafka-client, trident > Attachments: TestTopology.java > > > I am using storm-core and storm-kafka-client version 1.2.1 and kafka clients > version 1.1.0. > We have an external kafka from where we get the messages. > Whenever I try to run the topology, I get a NPE, which leads to the worker > getting died. > If I set poll strategy to earliest and the topic already contains some > messages, it works fine. > I have used a custom record translator which is working fine. > Can someone please help me fix the issue? > Thanks. > > Error - > 10665 [Thread-58-spout-handle-rule-local-kafka-spout-executor[26 26]] ERROR > o.a.s.util - Async loop died! > java.lang.RuntimeException: java.lang.NullPointerException > at > org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) > ~[storm-core-1.2.1.jar:1.2.1] > at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) > [storm-core-1.2.1.jar:1.2.1] > at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131] > Caused by: java.lang.NullPointerException > at > org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) > ~[storm-kafka-client-1.2.1.jar:1.2.1] > at > org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) > ~[storm-kafka-client-1.2.1.jar:1.2.1] > at > org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) > ~[storm-kafka-client-1.2.1.jar:1.2.1] > at > org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) > ~[storm-core-1.2.1.jar:1.2.1] > at > org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) > ~[storm-core-1.2.1.jar:1.2.1] > ... 6 more > > > Topology class - > > > > > import org.apache.storm.Config; > import org.apache.storm.LocalCluster; > import org.apache.storm.StormSubmitter; > import org.apache.storm.generated.*; > import org.apache.storm.kafka.spout.KafkaSpoutConfig; > import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque; > import org.apache.storm.trident.Stream; > import org.apache.storm.trident.TridentState; > import org.apache.storm.trident.TridentTopology; > import org.apache.storm.tuple.Fields; > import java.util.Properties; > > public class TestTopology { > > private static StormTopology buildTopology(Properties stormProperties) { > > Properties kafkaProperties = getProperties("/kafka.properties"); > TridentTopology topology = new TridentTopology(); > Fields stageArguments = new Fields("test", "issue"); > KafkaSpoutConfigkafkaSpoutConfig = > KafkaSpoutConfig.builder(kafkaProperties.getProperty("bootstrap.servers"), > "test") > .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST) > .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) > .setRecordTranslator(new RecordTranslator(), stageArguments) > .build(); >