It did not get reproduced yesterday either, when I was doing my runs. I see the last release of storm-kafka-client was in May 2018; it would be great if we could release a new version of it, as I see there are many commits after that.
Appreciating your efforts to provide a fix for it, thanks a lot :)

Best Regards

Saurabh Kumar Mimani

On Thu, Dec 20, 2018 at 9:17 AM saurabh mimani <[email protected]> wrote:

> Thanks for the info. I have tried a few runs with the code you mentioned here
> <https://github.com/apache/storm/blob/2dc3d53a11aa3fea621666690d1e44fa8b621466/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L465-L473>,
> putting that back in 1.x-branch, and I have not hit this exception so far. I will run it a few more times to be sure and let you know.
>
> Best Regards
>
> Saurabh Kumar Mimani
>
> On Thu, Dec 13, 2018 at 1:42 PM Stig Rohde Døssing <[email protected]> wrote:
>
>> Sounds good. Keep in mind that the branch I linked doesn't perform the check that was throwing the exception at all. If you want to be sure that the bug is gone, you might want to take the branch and copy in the check
>> https://github.com/apache/storm/blob/2dc3d53a11aa3fea621666690d1e44fa8b621466/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L465-L473.
>>
>> We removed it because it was causing performance issues with newer Kafka versions. If the bug is still there, you won't get the exception anymore, but the spout might emit already processed tuples. Let me know if you'd like to test the spout with the check still in place, and I'll make a branch that still has the check.
>>
>> I don't have a date for 1.2.3, but it could probably be pretty soon. If you'd like to test the spout with the exception check still in place, it would be good to do that first, but otherwise we should be able to start the release process soon.
>>
>> Den tor. 13. dec. 2018 kl. 07.51 skrev saurabh mimani <[email protected]>:
>>
>>> Hey Stig,
>>>
>>> Thanks for looking into this and providing a fix. We have built the storm-kafka-client jar from your branch and it is working fine. We are still testing it and will let you know if we find any issues.
>>>
>>> I see that for STORM-3301 <https://issues.apache.org/jira/browse/STORM-3301> you have set the fix version to 1.2.3; is there an approximate date for when this will be released?
>>>
>>> Thanks for your help :)
>>>
>>> Best Regards
>>>
>>> Saurabh Kumar Mimani
>>>
>>> On Mon, Dec 10, 2018 at 1:12 AM Stig Rohde Døssing <[email protected]> wrote:
>>>
>>>> I'm assuming you applied the fix on top of 1.2.1 or something like that? The exception can't be thrown from the branch I linked, since the check was removed in an earlier commit in 1.x-branch.
>>>>
>>>> Your logs show that the committed offset for partition 6 is 1682098 (98 for short). 98 was emitted, since it shows up in the emitted list. I'm guessing it failed and was replayed. 99 and up are in the acked list, so they are ready to commit as soon as 98 finishes processing.
>>>>
>>>> The log shows that 99 is the tuple encountering the exception, so I'm guessing what happened is that 98 was acked and the spout decided to commit 98, 99 etc. For some reason it then still decides to emit 99. The only reasons I can think of (barring bugs in Kafka/the Kafka client) for that to happen would be that 99 is in waitingToEmit and isn't being cleared out during the commit (this is the bug I tried to fix), somehow 99 is still queued for retry (this should not be possible), or for some reason the consumer position ends up below the committed offset.
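For context, the check being discussed (added as part of STORM-2666 and removed again by STORM-3102) boils down to refusing to emit a record whose offset is already covered by a commit. The following is a rough illustrative sketch of that idea against the Kafka consumer API; it is not the actual Storm source, and the class and method names are made up:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class AlreadyCommittedGuard {

    // Called just before a polled record would be emitted (or re-emitted on retry).
    static <K, V> void assertNotAlreadyCommitted(KafkaConsumer<K, V> consumer, ConsumerRecord<K, V> record) {
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        OffsetAndMetadata committed = consumer.committed(tp); // next offset the group would resume from
        if (committed != null && record.offset() < committed.offset()) {
            // The record sits below the committed offset, i.e. it was already processed and
            // committed, so re-emitting it would break the at-least-once bookkeeping.
            throw new IllegalStateException(
                "Attempting to emit a message that has already been committed.");
        }
    }
}

If the consumer position somehow falls behind the committed offset, or a stale record is left in waitingToEmit, a guard of this kind is what trips.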
I think the best bet >>>> for tracking down why it happens would be logging the contents of the >>>> RetryService, the contents of waitingToEmit and the consumer position both >>>> after commitOffsetsForAckedTuples, and right before the exception is >>>> thrown. >>>> >>>> Could you try logging those? I can add the log statements on top of the >>>> bugfix if needed. >>>> >>>> Den søn. 9. dec. 2018 kl. 18.42 skrev saurabh mimani < >>>> [email protected]>: >>>> >>>>> Hey, I see this is still happening, this time, it seems, as it seemed >>>>> to me, because same offset from different partition was committed(guessing >>>>> from logs), but not sure as that should be handled. >>>>> >>>>> Please find the logs here >>>>> <https://gist.github.com/mimani/ff27b7272482efc91c4d145d59ab59be>. >>>>> >>>>> >>>>> >>>>> Best Regards >>>>> >>>>> Saurabh Kumar Mimani >>>>> >>>>> >>>>> >>>>> >>>>> On Sun, Dec 9, 2018 at 3:19 AM Stig Rohde Døssing < >>>>> [email protected]> wrote: >>>>> >>>>>> I believe I have a fix, your logs were helpful. Please try out the >>>>>> changes in https://github.com/apache/storm/pull/2923/files. >>>>>> >>>>>> Den lør. 8. dec. 2018 kl. 07.25 skrev saurabh mimani < >>>>>> [email protected]>: >>>>>> >>>>>>> Hey, >>>>>>> >>>>>>> Thanks for looking into this. I was not able to produce this earlier >>>>>>> on my local, however I will again try once. I was consistently able to >>>>>>> reproduce it with parallelism of 5 for boltA and parallelism of 200 with >>>>>>> boltB on 2 machines in cluster mode. >>>>>>> >>>>>>> I will try again with your code once. >>>>>>> >>>>>>> These >>>>>>> <https://gist.github.com/mimani/56dd31db34e4356b25c796d78261f7b8> are >>>>>>> logs of Kafka Spout, when I was able to reproduce it in cluster mode >>>>>>> with >>>>>>> my topology, in case these helps. >>>>>>> >>>>>>> >>>>>>> >>>>>>> Best Regards >>>>>>> >>>>>>> Saurabh Kumar Mimani >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> On Wed, Dec 5, 2018 at 11:33 PM Stig Rohde Døssing < >>>>>>> [email protected]> wrote: >>>>>>> >>>>>>>> I can't reproduce this. >>>>>>>> >>>>>>>> I created a test topology similar to the code you posted, based on >>>>>>>> the 1.2.1 release tag >>>>>>>> https://github.com/srdo/storm/commit/f5577f7a773f3d433b90a2670de5329b396f5564 >>>>>>>> . >>>>>>>> >>>>>>>> I set up a local Kafka instance and put enough messages to run the >>>>>>>> topology for 15 minutes or so in the test topic. After populating the >>>>>>>> topic, I started the topology and let it run until it reached the end >>>>>>>> of >>>>>>>> the topic. As expected a lot of messages failed, but after a while it >>>>>>>> managed to successfully process all messages. I didn't see any worker >>>>>>>> crashes, and the logs only show some errors related to moving files >>>>>>>> (expected on Windows). >>>>>>>> >>>>>>>> The topology seems to work fine against both Kafka 0.10.2.2 and >>>>>>>> 1.1.1, though 1.1.1 is slower due to >>>>>>>> https://issues.apache.org/jira/browse/STORM-3102. >>>>>>>> >>>>>>>> The Kafka setup was with the default configuration for both Kafka >>>>>>>> and Zookeeper, and Storm was set up with a local Nimbus, single local >>>>>>>> Supervisor and 4 worker slots. >>>>>>>> >>>>>>>> Saurabh please try to reproduce the issue using the topology I >>>>>>>> linked. If you need to make adjustments in order to provoke the issue, >>>>>>>> please update the code and link it so I can check it out and try it. >>>>>>>> >>>>>>>> Den ons. 5. dec. 2018 kl. 
16.42 skrev saurabh mimani < >>>>>>>> [email protected]>: >>>>>>>> >>>>>>>>> No, I have checked that, there is no other consumer group >>>>>>>>> consuming from the same. >>>>>>>>> >>>>>>>>> Thanks for looking into it, let me know if you need any >>>>>>>>> other information. >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> Best Regards >>>>>>>>> >>>>>>>>> Saurabh Kumar Mimani >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On Wed, Dec 5, 2018 at 9:02 PM Stig Rohde Døssing < >>>>>>>>> [email protected]> wrote: >>>>>>>>> >>>>>>>>>> Ravi, BaseBasicBolt does automatically anchor any emitted tuples >>>>>>>>>> to the input tuple. It's intended for bolts that just need to >>>>>>>>>> receive a >>>>>>>>>> tuple, synchronously process it and emit some new tuples anchored to >>>>>>>>>> the >>>>>>>>>> input tuple. It's there because doing manual acking is tedious and >>>>>>>>>> error-prone in cases where you don't need to be able to e.g. emit new >>>>>>>>>> unachored tuples or ack the input tuple asynchronously. As Peter >>>>>>>>>> mentioned, >>>>>>>>>> the BasicBoltExecutor ( >>>>>>>>>> https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java#L42) >>>>>>>>>> handles acking for you. >>>>>>>>>> >>>>>>>>>> Saurabh, I'll see if I can reproduce your issue. Please also >>>>>>>>>> check that you don't have any other consumers using the same >>>>>>>>>> consumer group >>>>>>>>>> as the spout. >>>>>>>>>> >>>>>>>>>> Den ons. 5. dec. 2018 kl. 11.53 skrev Peter Chamberlain < >>>>>>>>>> [email protected]>: >>>>>>>>>> >>>>>>>>>>> Pretty sure that the ack path is handled by BasicBoltExecutor >>>>>>>>>>> (an implmentation of IRichBolt), which calls >>>>>>>>>>> collector.setContext(input), >>>>>>>>>>> and also does the acking inside it's execute function, and in >>>>>>>>>>> between calls >>>>>>>>>>> the BaseBasicBolt.execute version (which takes the collector as >>>>>>>>>>> well as the >>>>>>>>>>> tuple as parameters). >>>>>>>>>>> So the intention is clearly that it is automatically anchored >>>>>>>>>>> and acknowledged. >>>>>>>>>>> >>>>>>>>>>> On Wed, 5 Dec 2018 at 09:57, Ravi Sharma <[email protected]> >>>>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>>> Hi Saurabh, >>>>>>>>>>>> I checked the BaseBasicBolt which comes with storm, it doesn't >>>>>>>>>>>> do much. >>>>>>>>>>>> Also checked BasicOutputCollector and don't see how it will >>>>>>>>>>>> anchor automatically unless you call >>>>>>>>>>>> BasicOutputCollector.setContext(Tuple tuple), don't see all of >>>>>>>>>>>> your code, >>>>>>>>>>>> but don't see this call in your boltA code. >>>>>>>>>>>> Also it looks like even when you make this setContext call, >>>>>>>>>>>> after that you will have to emit using following emit function >>>>>>>>>>>> >>>>>>>>>>>> BasicOutputCollector.emit(String streamId, List<Object> tuples) >>>>>>>>>>>> >>>>>>>>>>>> Basically just check that whatever emit function is called, it >>>>>>>>>>>> does pass the input tuple. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Once I had exactly same issue for few days, but mine was >>>>>>>>>>>> related to config. I wanted to read from two Kafka topics in one >>>>>>>>>>>> topology >>>>>>>>>>>> and had two different kafkaspout created, mistakenly I copy pasted >>>>>>>>>>>> same >>>>>>>>>>>> config and that caused this issue. Not sure if that applies to your >>>>>>>>>>>> scenario. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> *NOTE*: I checked the latest storm master branch for code. 
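To make the wrapping behaviour Peter and Stig describe concrete, here is a simplified sketch of how a BaseBasicBolt is driven, modeled on the BasicBoltExecutor linked above. It is not the verbatim source, and the free-standing method is only for illustration:

import org.apache.storm.task.OutputCollector;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.FailedException;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.ReportedFailedException;
import org.apache.storm.tuple.Tuple;

final class BasicBoltDriverSketch {

    static void drive(IBasicBolt bolt, BasicOutputCollector basicCollector, OutputCollector rawCollector, Tuple input) {
        basicCollector.setContext(input);        // every emit from the bolt is now anchored to 'input'
        try {
            bolt.execute(input, basicCollector); // the user bolt runs synchronously
            rawCollector.ack(input);             // the input tuple is acked automatically afterwards
        } catch (FailedException e) {
            if (e instanceof ReportedFailedException) {
                rawCollector.reportError(e);
            }
            rawCollector.fail(input);            // throwing FailedException fails the input instead, triggering a replay from the spout
        }
    }
}

So with BaseBasicBolt there is no way to forget to anchor or ack; the trade-off is that the input tuple is acked as soon as execute returns, which is why it only suits synchronous processing.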
>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Wed, 5 Dec 2018, 08:11 saurabh mimani < >>>>>>>>>>>> [email protected] wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hey Ravi, >>>>>>>>>>>>> >>>>>>>>>>>>> I am using *BaseBasicBolt*, which as described here >>>>>>>>>>>>> <http://storm.apache.org/releases/1.0.6/Guaranteeing-message-processing.html> >>>>>>>>>>>>> : Tuples emitted to BasicOutputCollector are automatically >>>>>>>>>>>>> anchored to the input tuple, and the input tuple is acked for you >>>>>>>>>>>>> automatically when the execute method completes. >>>>>>>>>>>>> >>>>>>>>>>>>> What you are saying is applicable for *BaseRichBolt. *The >>>>>>>>>>>>> Kafka spout I am using is from storm-kafka-client >>>>>>>>>>>>> <https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client/1.2.1> >>>>>>>>>>>>> library, so unique ID, etc should be already taken care of. >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> Best Regards >>>>>>>>>>>>> >>>>>>>>>>>>> Saurabh Kumar Mimani >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Wed, Dec 5, 2018 at 12:52 PM Ravi Sharma < >>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi Saurabh, >>>>>>>>>>>>>> I think there is issue with part of code which is emitting >>>>>>>>>>>>>> the tuples. >>>>>>>>>>>>>> >>>>>>>>>>>>>> If you want to use ack mechanism, you need to use anchor >>>>>>>>>>>>>> tuple when you emit from bolts. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Collector.emit(Tuple input, Values data) >>>>>>>>>>>>>> >>>>>>>>>>>>>> Also make sure Kafka spout emits tuple with a unique id. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks >>>>>>>>>>>>>> Ravi >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Wed, 5 Dec 2018, 06:35 saurabh mimani < >>>>>>>>>>>>>> [email protected] wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hey, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks for your reply. What you are saying is correct, >>>>>>>>>>>>>>> However I am able to reproduce it more often and I think it >>>>>>>>>>>>>>> happens when >>>>>>>>>>>>>>> multiple tuples gets failed in first run but all of those gets >>>>>>>>>>>>>>> success on >>>>>>>>>>>>>>> retry, something of that sort is happening. 
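For comparison, the manual pattern Ravi describes is what you need when extending BaseRichBolt, where anchoring and acking are explicit. A minimal, hypothetical Java bolt (not part of the topology under discussion) showing the anchored emit plus explicit ack:

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ManualAckBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // Pass the input tuple as the anchor so a downstream failure replays the whole tree from the spout.
        collector.emit(input, new Values(input.getBinaryByField("value")));
        // Ack the input explicitly once it has been fully handled.
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("value"));
    }
}

Omitting the anchor produces unanchored tuples that never fail back to the spout, and forgetting the ack eventually times the tuple out; BaseBasicBolt exists precisely to take both of those chores away.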
>>>>>>>>>>>>>>> This can be reproduced easily using the following two bolts and kafkaSpout, by running in cluster mode for 3-4 minutes:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *BoltA*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> case class Abc(index: Int, rand: Boolean)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> class BoltA extends BaseBasicBolt {
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>>>>>>>>>>>>>>>     val inp = input.getBinaryByField("value").getObj[someObj]
>>>>>>>>>>>>>>>     val randomGenerator = new Random()
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     var i = 0
>>>>>>>>>>>>>>>     val rand = randomGenerator.nextBoolean()
>>>>>>>>>>>>>>>     1 to 100 foreach { _ =>
>>>>>>>>>>>>>>>       collector.emit(new Values(Abc(i, rand).getJsonBytes))
>>>>>>>>>>>>>>>       i += 1
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>>>>>>>>>>>>>     declarer.declare(new Fields("boltAout"))
>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *BoltB*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> class BoltB extends BaseBasicBolt {
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>>>>>>>>>>>>>>>     val abc = input.getBinaryByField("boltAout").getObj[Abc]
>>>>>>>>>>>>>>>     println(s"Received ${abc.index}th tuple in BoltB")
>>>>>>>>>>>>>>>     if (abc.index >= 97 && abc.rand) {
>>>>>>>>>>>>>>>       println(s"throwing FailedException for ${abc.index}th tuple")
>>>>>>>>>>>>>>>       throw new FailedException()
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> *KafkaSpout:*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> private def getKafkaSpoutConfig(source: Config) =
>>>>>>>>>>>>>>>   KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
>>>>>>>>>>>>>>>     .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
>>>>>>>>>>>>>>>     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>>>>>>>>>>>>>>     .setOffsetCommitPeriodMs(100)
>>>>>>>>>>>>>>>     .setRetry(new KafkaSpoutRetryExponentialBackoff(
>>>>>>>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>>>>>>>>       10,
>>>>>>>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
>>>>>>>>>>>>>>>     ))
>>>>>>>>>>>>>>>     .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", "UNCOMMITTED_EARLIEST")))
>>>>>>>>>>>>>>>     .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", 10000))
>>>>>>>>>>>>>>>     .build()
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Other config:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> messageTimeoutInSeconds: 300
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [image: Screenshot 2018-12-05 at 12.03.08 PM.png]
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Saurabh Kumar Mimani
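For reference, here is the same spout configuration written out in Java with the intent of each setting spelled out. The broker string and topic name are placeholders, and the builder calls follow the storm-kafka-client 1.2.x API already used above:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;

public class SpoutConfigSketch {

    public static KafkaSpoutConfig<String, String> build() {
        return KafkaSpoutConfig.builder("broker1:9092", "queueName") // placeholder bootstrap servers and topic
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
            // The value deserializer is overridden to raw bytes, as in the Scala config above.
            .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer")
            // Acked offsets are committed back to Kafka every 100 ms.
            .setOffsetCommitPeriodMs(100)
            // Failed tuples are retried after roughly 100 ms, backing off to at most 3 s, up to 10 times.
            .setRetry(new KafkaSpoutRetryExponentialBackoff(
                TimeInterval.milliSeconds(100),
                TimeInterval.milliSeconds(100),
                10,
                TimeInterval.milliSeconds(3000)))
            // On (re)deploy, start from the earliest offset that has not yet been committed.
            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
            // Polling pauses once 10000 offsets are emitted but not yet committed.
            .setMaxUncommittedOffsets(10000)
            .build();
    }
}

With a 300 second message timeout, ten retries capped at 3 seconds of backoff each fit comfortably inside the timeout window, so a tuple that keeps failing is exhausted well before the timeout becomes the limiting factor.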
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 9:18 PM Stig Rohde Døssing < >>>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Hi Saurabh, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> The tuple emitted by the spout will only be acked once all >>>>>>>>>>>>>>>> branches of the tuple tree have been acked, i.e. all 100 >>>>>>>>>>>>>>>> tuples are acked. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> The error you're seeing was added as part of >>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/STORM-2666 to try to >>>>>>>>>>>>>>>> avoid having that bug pop up again. Could you try posting your >>>>>>>>>>>>>>>> spout >>>>>>>>>>>>>>>> configuration? Also if possible, it would be helpful if you >>>>>>>>>>>>>>>> could enable >>>>>>>>>>>>>>>> debug logging for org.apache.storm.kafka.spout.KafkaSpout >>>>>>>>>>>>>>>> and maybe also >>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.internal.OffsetManager. >>>>>>>>>>>>>>>> They log when offsets are committed (e.g. >>>>>>>>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546), >>>>>>>>>>>>>>>> and also when the consumer position is changed (e.g. >>>>>>>>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561 >>>>>>>>>>>>>>>> ). It should be possible to track down when/why the consumer >>>>>>>>>>>>>>>> position wound >>>>>>>>>>>>>>>> up behind the committed offset. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Just so you're aware, the check that crashes the spout has >>>>>>>>>>>>>>>> been removed as of >>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/STORM-3102. I'd >>>>>>>>>>>>>>>> still like to know if there's a bug in the spout causing it to >>>>>>>>>>>>>>>> emit tuples >>>>>>>>>>>>>>>> that were already committed though. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Den man. 3. dec. 2018 kl. 11.29 skrev saurabh mimani < >>>>>>>>>>>>>>>> [email protected]>: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Version Info: >>>>>>>>>>>>>>>>> "org.apache.storm" % "storm-core" % "1.2.1" >>>>>>>>>>>>>>>>> "org.apache.storm" % "storm-kafka-client" % "1.2.1" >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I have a storm topology which looks like following: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> boltA -> boltB -> boltC -> boltD >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> boltA just does some formatting of requests and emits >>>>>>>>>>>>>>>>> another tuple. boltB does some processing and emits >>>>>>>>>>>>>>>>> around 100 tuples for each tuple being received. boltC >>>>>>>>>>>>>>>>> and boltD processes these tuples. All the bolts >>>>>>>>>>>>>>>>> implements BaseBasicBolt. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> What I am noticing is whenever boltD marks some tuple as >>>>>>>>>>>>>>>>> fail and marks that for retry by throwing FailedException, >>>>>>>>>>>>>>>>> After a few minutes less than my topology timeout, I get the >>>>>>>>>>>>>>>>> following >>>>>>>>>>>>>>>>> error: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> 2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died! >>>>>>>>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message >>>>>>>>>>>>>>>>> that has already been committed. This should never occur when >>>>>>>>>>>>>>>>> using the at-least-once processing guarantee. >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) >>>>>>>>>>>>>>>>> ~[stormjar.jar:?] 
>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) >>>>>>>>>>>>>>>>> ~[stormjar.jar:?] >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) >>>>>>>>>>>>>>>>> ~[stormjar.jar:?] >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) >>>>>>>>>>>>>>>>> ~[storm-core-1.2.1.jar:1.2.1] >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) >>>>>>>>>>>>>>>>> [storm-core-1.2.1.jar:1.2.1] >>>>>>>>>>>>>>>>> at clojure.lang.AFn.run(AFn.java:22) >>>>>>>>>>>>>>>>> [clojure-1.7.0.jar:?] >>>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60] >>>>>>>>>>>>>>>>> 2018-11-30T20:01:05.262+05:30 executor [ERROR] >>>>>>>>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message >>>>>>>>>>>>>>>>> that has already been committed. This should never occur when >>>>>>>>>>>>>>>>> using the at-least-once processing guarantee. >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) >>>>>>>>>>>>>>>>> ~[stormjar.jar:?] >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) >>>>>>>>>>>>>>>>> ~[stormjar.jar:?] >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) >>>>>>>>>>>>>>>>> ~[stormjar.jar:?] >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) >>>>>>>>>>>>>>>>> ~[storm-core-1.2.1.jar:1.2.1] >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) >>>>>>>>>>>>>>>>> [storm-core-1.2.1.jar:1.2.1] >>>>>>>>>>>>>>>>> at clojure.lang.AFn.run(AFn.java:22) >>>>>>>>>>>>>>>>> [clojure-1.7.0.jar:?] >>>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60] >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> What seems to be happening is this happens when boltB emits >>>>>>>>>>>>>>>>> 100 out of 1 tuple and boltDfails one of the tuples out >>>>>>>>>>>>>>>>> of those 100 tuples, I am getting this error. Not able to >>>>>>>>>>>>>>>>> understand how to >>>>>>>>>>>>>>>>> fix this, ideally it should ack an original tuple when >>>>>>>>>>>>>>>>> all 100 tuples are acked, but probably an original tuple is >>>>>>>>>>>>>>>>> acked before >>>>>>>>>>>>>>>>> all those 100 tuples are acked, which causes this error. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Best Regards >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Saurabh Kumar Mimani >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -- >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> *Peter Chamberlain* | Senior Software Engineer | HTK >>>>>>>>>>> >>>>>>>>>>> T: +44(0)870 600 2311 >>>>>>>>>>> Connect with me: Email <[email protected]> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> [image: htk logo] <http://www.htk.co.uk/> >>>>>>>>>>> >>>>>>>>>>> Connect with HTK: htk.co.uk <http://www.htk.co.uk/> | LinkedIn >>>>>>>>>>> <http://www.linkedin.com/company/htk/> | Twitter >>>>>>>>>>> <http://www.twitter.com/htkhorizon> >>>>>>>>>>> >>>>>>>>>>> HTK Limited, Chapmans Warehouse, Wherry Quay, Ipswich, IP4 1AS, >>>>>>>>>>> UK. 
