Thanks for the info, I have tried few runs with the code you mentioned here <https://github.com/apache/storm/blob/2dc3d53a11aa3fea621666690d1e44fa8b621466/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L465-L473.>, putting that back in 1.x-branch, I did not got this exception so far, I will run it few more times to be sure and let you know.
Best Regards Saurabh Kumar Mimani On Thu, Dec 13, 2018 at 1:42 PM Stig Rohde Døssing <[email protected]> wrote: > Sounds good. Keep in mind that the branch I linked doesn't perform the > check that was throwing the exception at all. If you want to be sure that > the bug is gone, you might want to take the branch and copy in the check > https://github.com/apache/storm/blob/2dc3d53a11aa3fea621666690d1e44fa8b621466/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L465-L473 > . > > We removed it because it was causing performance issues with newer Kafka > versions. If the bug is still there, you won't get the exception anymore, > but the spout might emit already processed tuples. Let me know if you'd > like to test the spout with the check still in place, and I'll make a > branch that still has the check. > > I don't have a date for 1.2.3, but it could probably be pretty soon. If > you'd like to test the spout with the exception check still in place, it > would be good to do first, but otherwise we should be able to start the > release process soon. > > Den tor. 13. dec. 2018 kl. 07.51 skrev saurabh mimani < > [email protected]>: > >> Hey Stig, >> >> Thanks for looking into this and providing a fix. We have build the >> storm-kafka-client jar from your branch and it is working fine on that. We >> are still testing it and will let you know if there are any issues we fix. >> >> I see for STORM-3301 <https://issues.apache.org/jira/browse/STORM-3301>, >> you have put the fix version as 1.2.3, any approximate date when will this >> be released? >> >> Thanks for your help :) >> >> >> >> >> >> Best Regards >> >> Saurabh Kumar Mimani >> >> >> >> >> On Mon, Dec 10, 2018 at 1:12 AM Stig Rohde Døssing < >> [email protected]> wrote: >> >>> I'm assuming you applied the fix on top of 1.2.1 or something like that? >>> The exception can't be thrown from the branch I linked, since it was >>> removed in an earlier commit in 1.x-branch. >>> >>> Your logs show that the committed offset for partition 6 is 1682098 (98 >>> for short). 98 was emitted, since it shows up in the emitted list. I'm >>> guessing it failed and was replayed. 99 and up are in the acked list, so >>> they are ready to commit as soon as 98 finishes processing. >>> >>> The log shows that 99 is the tuple encountering the exception, so I'm >>> guessing what happened is that 98 was acked and the spout decided to commit >>> 98, 99 etc. For some reason it then still decides to emit 99. The only >>> reasons I can think of (barring bugs in Kafka/the Kafka client) for that to >>> happen would be that 99 is in waitingToEmit and isn't being cleared out >>> during the commit (this is the bug I tried to fix), somehow 99 is still >>> queued for retry (this should not be possible) or for some reason the >>> consumer position ends up below the committed offset. I think the best bet >>> for tracking down why it happens would be logging the contents of the >>> RetryService, the contents of waitingToEmit and the consumer position both >>> after commitOffsetsForAckedTuples, and right before the exception is >>> thrown. >>> >>> Could you try logging those? I can add the log statements on top of the >>> bugfix if needed. >>> >>> Den søn. 9. dec. 2018 kl. 18.42 skrev saurabh mimani < >>> [email protected]>: >>> >>>> Hey, I see this is still happening, this time, it seems, as it seemed >>>> to me, because same offset from different partition was committed(guessing >>>> from logs), but not sure as that should be handled. >>>> >>>> Please find the logs here >>>> <https://gist.github.com/mimani/ff27b7272482efc91c4d145d59ab59be>. >>>> >>>> >>>> >>>> Best Regards >>>> >>>> Saurabh Kumar Mimani >>>> >>>> >>>> >>>> >>>> On Sun, Dec 9, 2018 at 3:19 AM Stig Rohde Døssing < >>>> [email protected]> wrote: >>>> >>>>> I believe I have a fix, your logs were helpful. Please try out the >>>>> changes in https://github.com/apache/storm/pull/2923/files. >>>>> >>>>> Den lør. 8. dec. 2018 kl. 07.25 skrev saurabh mimani < >>>>> [email protected]>: >>>>> >>>>>> Hey, >>>>>> >>>>>> Thanks for looking into this. I was not able to produce this earlier >>>>>> on my local, however I will again try once. I was consistently able to >>>>>> reproduce it with parallelism of 5 for boltA and parallelism of 200 with >>>>>> boltB on 2 machines in cluster mode. >>>>>> >>>>>> I will try again with your code once. >>>>>> >>>>>> These >>>>>> <https://gist.github.com/mimani/56dd31db34e4356b25c796d78261f7b8> are >>>>>> logs of Kafka Spout, when I was able to reproduce it in cluster mode with >>>>>> my topology, in case these helps. >>>>>> >>>>>> >>>>>> >>>>>> Best Regards >>>>>> >>>>>> Saurabh Kumar Mimani >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> On Wed, Dec 5, 2018 at 11:33 PM Stig Rohde Døssing < >>>>>> [email protected]> wrote: >>>>>> >>>>>>> I can't reproduce this. >>>>>>> >>>>>>> I created a test topology similar to the code you posted, based on >>>>>>> the 1.2.1 release tag >>>>>>> https://github.com/srdo/storm/commit/f5577f7a773f3d433b90a2670de5329b396f5564 >>>>>>> . >>>>>>> >>>>>>> I set up a local Kafka instance and put enough messages to run the >>>>>>> topology for 15 minutes or so in the test topic. After populating the >>>>>>> topic, I started the topology and let it run until it reached the end of >>>>>>> the topic. As expected a lot of messages failed, but after a while it >>>>>>> managed to successfully process all messages. I didn't see any worker >>>>>>> crashes, and the logs only show some errors related to moving files >>>>>>> (expected on Windows). >>>>>>> >>>>>>> The topology seems to work fine against both Kafka 0.10.2.2 and >>>>>>> 1.1.1, though 1.1.1 is slower due to >>>>>>> https://issues.apache.org/jira/browse/STORM-3102. >>>>>>> >>>>>>> The Kafka setup was with the default configuration for both Kafka >>>>>>> and Zookeeper, and Storm was set up with a local Nimbus, single local >>>>>>> Supervisor and 4 worker slots. >>>>>>> >>>>>>> Saurabh please try to reproduce the issue using the topology I >>>>>>> linked. If you need to make adjustments in order to provoke the issue, >>>>>>> please update the code and link it so I can check it out and try it. >>>>>>> >>>>>>> Den ons. 5. dec. 2018 kl. 16.42 skrev saurabh mimani < >>>>>>> [email protected]>: >>>>>>> >>>>>>>> No, I have checked that, there is no other consumer group consuming >>>>>>>> from the same. >>>>>>>> >>>>>>>> Thanks for looking into it, let me know if you need any >>>>>>>> other information. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Best Regards >>>>>>>> >>>>>>>> Saurabh Kumar Mimani >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Wed, Dec 5, 2018 at 9:02 PM Stig Rohde Døssing < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>>> Ravi, BaseBasicBolt does automatically anchor any emitted tuples >>>>>>>>> to the input tuple. It's intended for bolts that just need to receive >>>>>>>>> a >>>>>>>>> tuple, synchronously process it and emit some new tuples anchored to >>>>>>>>> the >>>>>>>>> input tuple. It's there because doing manual acking is tedious and >>>>>>>>> error-prone in cases where you don't need to be able to e.g. emit new >>>>>>>>> unachored tuples or ack the input tuple asynchronously. As Peter >>>>>>>>> mentioned, >>>>>>>>> the BasicBoltExecutor ( >>>>>>>>> https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java#L42) >>>>>>>>> handles acking for you. >>>>>>>>> >>>>>>>>> Saurabh, I'll see if I can reproduce your issue. Please also check >>>>>>>>> that you don't have any other consumers using the same consumer group >>>>>>>>> as >>>>>>>>> the spout. >>>>>>>>> >>>>>>>>> Den ons. 5. dec. 2018 kl. 11.53 skrev Peter Chamberlain < >>>>>>>>> [email protected]>: >>>>>>>>> >>>>>>>>>> Pretty sure that the ack path is handled by BasicBoltExecutor (an >>>>>>>>>> implmentation of IRichBolt), which calls >>>>>>>>>> collector.setContext(input), and >>>>>>>>>> also does the acking inside it's execute function, and in between >>>>>>>>>> calls the >>>>>>>>>> BaseBasicBolt.execute version (which takes the collector as well as >>>>>>>>>> the >>>>>>>>>> tuple as parameters). >>>>>>>>>> So the intention is clearly that it is automatically anchored and >>>>>>>>>> acknowledged. >>>>>>>>>> >>>>>>>>>> On Wed, 5 Dec 2018 at 09:57, Ravi Sharma <[email protected]> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Hi Saurabh, >>>>>>>>>>> I checked the BaseBasicBolt which comes with storm, it doesn't >>>>>>>>>>> do much. >>>>>>>>>>> Also checked BasicOutputCollector and don't see how it will >>>>>>>>>>> anchor automatically unless you call >>>>>>>>>>> BasicOutputCollector.setContext(Tuple tuple), don't see all of >>>>>>>>>>> your code, >>>>>>>>>>> but don't see this call in your boltA code. >>>>>>>>>>> Also it looks like even when you make this setContext call, >>>>>>>>>>> after that you will have to emit using following emit function >>>>>>>>>>> >>>>>>>>>>> BasicOutputCollector.emit(String streamId, List<Object> tuples) >>>>>>>>>>> >>>>>>>>>>> Basically just check that whatever emit function is called, it >>>>>>>>>>> does pass the input tuple. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> Once I had exactly same issue for few days, but mine was related >>>>>>>>>>> to config. I wanted to read from two Kafka topics in one topology >>>>>>>>>>> and had >>>>>>>>>>> two different kafkaspout created, mistakenly I copy pasted same >>>>>>>>>>> config and >>>>>>>>>>> that caused this issue. Not sure if that applies to your scenario. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> *NOTE*: I checked the latest storm master branch for code. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Wed, 5 Dec 2018, 08:11 saurabh mimani < >>>>>>>>>>> [email protected] wrote: >>>>>>>>>>> >>>>>>>>>>>> Hey Ravi, >>>>>>>>>>>> >>>>>>>>>>>> I am using *BaseBasicBolt*, which as described here >>>>>>>>>>>> <http://storm.apache.org/releases/1.0.6/Guaranteeing-message-processing.html> >>>>>>>>>>>> : Tuples emitted to BasicOutputCollector are automatically >>>>>>>>>>>> anchored to the input tuple, and the input tuple is acked for you >>>>>>>>>>>> automatically when the execute method completes. >>>>>>>>>>>> >>>>>>>>>>>> What you are saying is applicable for *BaseRichBolt. *The >>>>>>>>>>>> Kafka spout I am using is from storm-kafka-client >>>>>>>>>>>> <https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client/1.2.1> >>>>>>>>>>>> library, so unique ID, etc should be already taken care of. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Best Regards >>>>>>>>>>>> >>>>>>>>>>>> Saurabh Kumar Mimani >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Wed, Dec 5, 2018 at 12:52 PM Ravi Sharma < >>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi Saurabh, >>>>>>>>>>>>> I think there is issue with part of code which is emitting the >>>>>>>>>>>>> tuples. >>>>>>>>>>>>> >>>>>>>>>>>>> If you want to use ack mechanism, you need to use anchor tuple >>>>>>>>>>>>> when you emit from bolts. >>>>>>>>>>>>> >>>>>>>>>>>>> Collector.emit(Tuple input, Values data) >>>>>>>>>>>>> >>>>>>>>>>>>> Also make sure Kafka spout emits tuple with a unique id. >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks >>>>>>>>>>>>> Ravi >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Wed, 5 Dec 2018, 06:35 saurabh mimani < >>>>>>>>>>>>> [email protected] wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hey, >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks for your reply. What you are saying is correct, >>>>>>>>>>>>>> However I am able to reproduce it more often and I think it >>>>>>>>>>>>>> happens when >>>>>>>>>>>>>> multiple tuples gets failed in first run but all of those gets >>>>>>>>>>>>>> success on >>>>>>>>>>>>>> retry, something of that sort is happening. >>>>>>>>>>>>>> >>>>>>>>>>>>>> This can be reproduced using following two bolts and >>>>>>>>>>>>>> kafkaSpout easily, by running in cluster more with 3/4 minutes: >>>>>>>>>>>>>> >>>>>>>>>>>>>> *BoltA* >>>>>>>>>>>>>> >>>>>>>>>>>>>> case class Abc(index: Int, rand: Boolean) >>>>>>>>>>>>>> >>>>>>>>>>>>>> class BoltA extends BaseBasicBolt { >>>>>>>>>>>>>> >>>>>>>>>>>>>> override def execute(input: Tuple, collector: >>>>>>>>>>>>>> BasicOutputCollector): Unit = { >>>>>>>>>>>>>> val inp = input.getBinaryByField("value").getObj[someObj] >>>>>>>>>>>>>> val randomGenerator = new Random() >>>>>>>>>>>>>> >>>>>>>>>>>>>> var i = 0 >>>>>>>>>>>>>> val rand = randomGenerator.nextBoolean() >>>>>>>>>>>>>> 1 to 100 foreach { >>>>>>>>>>>>>> collector.emit(new Values(Abc(i, rand).getJsonBytes)) >>>>>>>>>>>>>> i += 1 >>>>>>>>>>>>>> } >>>>>>>>>>>>>> } >>>>>>>>>>>>>> >>>>>>>>>>>>>> override def declareOutputFields(declarer: >>>>>>>>>>>>>> OutputFieldsDeclarer): Unit = { >>>>>>>>>>>>>> declarer.declare(new Fields("boltAout")) >>>>>>>>>>>>>> } >>>>>>>>>>>>>> >>>>>>>>>>>>>> } >>>>>>>>>>>>>> >>>>>>>>>>>>>> *BoltB* >>>>>>>>>>>>>> >>>>>>>>>>>>>> class BoltB extends BaseBasicBolt { >>>>>>>>>>>>>> >>>>>>>>>>>>>> override def execute(input: Tuple, collector: >>>>>>>>>>>>>> BasicOutputCollector): Unit = { >>>>>>>>>>>>>> val abc = input.getBinaryByField("boltAout").getObj[Abc] >>>>>>>>>>>>>> println(s"Received ${abc.index}th tuple in BoltB") >>>>>>>>>>>>>> if(abc.index >= 97 && abc.rand){ >>>>>>>>>>>>>> println(s"throwing FailedException for ${abc.index}th >>>>>>>>>>>>>> tuple for") >>>>>>>>>>>>>> throw new FailedException() >>>>>>>>>>>>>> } >>>>>>>>>>>>>> } >>>>>>>>>>>>>> >>>>>>>>>>>>>> override def declareOutputFields(declarer: >>>>>>>>>>>>>> OutputFieldsDeclarer): Unit = { >>>>>>>>>>>>>> } >>>>>>>>>>>>>> } >>>>>>>>>>>>>> >>>>>>>>>>>>>> *KafkaSpout:* >>>>>>>>>>>>>> >>>>>>>>>>>>>> private def getKafkaSpoutConfig(source: Config) = >>>>>>>>>>>>>> KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", >>>>>>>>>>>>>> "queueName") >>>>>>>>>>>>>> .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp") >>>>>>>>>>>>>> .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, >>>>>>>>>>>>>> "org.apache.kafka.common.serialization.ByteArrayDeserializer") >>>>>>>>>>>>>> .setOffsetCommitPeriodMs(100) >>>>>>>>>>>>>> .setRetry(new KafkaSpoutRetryExponentialBackoff( >>>>>>>>>>>>>> >>>>>>>>>>>>>> KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100), >>>>>>>>>>>>>> >>>>>>>>>>>>>> KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100), >>>>>>>>>>>>>> 10, >>>>>>>>>>>>>> >>>>>>>>>>>>>> KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000) >>>>>>>>>>>>>> )) >>>>>>>>>>>>>> >>>>>>>>>>>>>> .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", >>>>>>>>>>>>>> "UNCOMMITTED_EARLIEST"))) >>>>>>>>>>>>>> >>>>>>>>>>>>>> .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", >>>>>>>>>>>>>> 10000)) >>>>>>>>>>>>>> .build() >>>>>>>>>>>>>> >>>>>>>>>>>>>> Other config: >>>>>>>>>>>>>> >>>>>>>>>>>>>> messageTimeoutInSecons: 300 >>>>>>>>>>>>>> >>>>>>>>>>>>>> [image: Screenshot 2018-12-05 at 12.03.08 PM.png] >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> Best Regards >>>>>>>>>>>>>> >>>>>>>>>>>>>> Saurabh Kumar Mimani >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 9:18 PM Stig Rohde Døssing < >>>>>>>>>>>>>> [email protected]> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Saurabh, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> The tuple emitted by the spout will only be acked once all >>>>>>>>>>>>>>> branches of the tuple tree have been acked, i.e. all 100 tuples >>>>>>>>>>>>>>> are acked. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> The error you're seeing was added as part of >>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/STORM-2666 to try to >>>>>>>>>>>>>>> avoid having that bug pop up again. Could you try posting your >>>>>>>>>>>>>>> spout >>>>>>>>>>>>>>> configuration? Also if possible, it would be helpful if you >>>>>>>>>>>>>>> could enable >>>>>>>>>>>>>>> debug logging for org.apache.storm.kafka.spout.KafkaSpout >>>>>>>>>>>>>>> and maybe also >>>>>>>>>>>>>>> org.apache.storm.kafka.spout.internal.OffsetManager. >>>>>>>>>>>>>>> They log when offsets are committed (e.g. >>>>>>>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546), >>>>>>>>>>>>>>> and also when the consumer position is changed (e.g. >>>>>>>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561 >>>>>>>>>>>>>>> ). It should be possible to track down when/why the consumer >>>>>>>>>>>>>>> position wound >>>>>>>>>>>>>>> up behind the committed offset. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Just so you're aware, the check that crashes the spout has >>>>>>>>>>>>>>> been removed as of >>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/STORM-3102. I'd still >>>>>>>>>>>>>>> like to know if there's a bug in the spout causing it to emit >>>>>>>>>>>>>>> tuples that >>>>>>>>>>>>>>> were already committed though. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Den man. 3. dec. 2018 kl. 11.29 skrev saurabh mimani < >>>>>>>>>>>>>>> [email protected]>: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Version Info: >>>>>>>>>>>>>>>> "org.apache.storm" % "storm-core" % "1.2.1" >>>>>>>>>>>>>>>> "org.apache.storm" % "storm-kafka-client" % "1.2.1" >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> I have a storm topology which looks like following: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> boltA -> boltB -> boltC -> boltD >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> boltA just does some formatting of requests and emits >>>>>>>>>>>>>>>> another tuple. boltB does some processing and emits around >>>>>>>>>>>>>>>> 100 tuples for each tuple being received. boltC and boltD >>>>>>>>>>>>>>>> processes >>>>>>>>>>>>>>>> these tuples. All the bolts implements BaseBasicBolt. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> What I am noticing is whenever boltD marks some tuple as >>>>>>>>>>>>>>>> fail and marks that for retry by throwing FailedException, >>>>>>>>>>>>>>>> After a few minutes less than my topology timeout, I get the >>>>>>>>>>>>>>>> following >>>>>>>>>>>>>>>> error: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died! >>>>>>>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message >>>>>>>>>>>>>>>> that has already been committed. This should never occur when >>>>>>>>>>>>>>>> using the at-least-once processing guarantee. >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) >>>>>>>>>>>>>>>> ~[stormjar.jar:?] >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) >>>>>>>>>>>>>>>> ~[stormjar.jar:?] >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) >>>>>>>>>>>>>>>> ~[stormjar.jar:?] >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) >>>>>>>>>>>>>>>> ~[storm-core-1.2.1.jar:1.2.1] >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) >>>>>>>>>>>>>>>> [storm-core-1.2.1.jar:1.2.1] >>>>>>>>>>>>>>>> at clojure.lang.AFn.run(AFn.java:22) >>>>>>>>>>>>>>>> [clojure-1.7.0.jar:?] >>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60] >>>>>>>>>>>>>>>> 2018-11-30T20:01:05.262+05:30 executor [ERROR] >>>>>>>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message >>>>>>>>>>>>>>>> that has already been committed. This should never occur when >>>>>>>>>>>>>>>> using the at-least-once processing guarantee. >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) >>>>>>>>>>>>>>>> ~[stormjar.jar:?] >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) >>>>>>>>>>>>>>>> ~[stormjar.jar:?] >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) >>>>>>>>>>>>>>>> ~[stormjar.jar:?] >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) >>>>>>>>>>>>>>>> ~[storm-core-1.2.1.jar:1.2.1] >>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>> org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) >>>>>>>>>>>>>>>> [storm-core-1.2.1.jar:1.2.1] >>>>>>>>>>>>>>>> at clojure.lang.AFn.run(AFn.java:22) >>>>>>>>>>>>>>>> [clojure-1.7.0.jar:?] >>>>>>>>>>>>>>>> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60] >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> What seems to be happening is this happens when boltB emits >>>>>>>>>>>>>>>> 100 out of 1 tuple and boltDfails one of the tuples out of >>>>>>>>>>>>>>>> those 100 tuples, I am getting this error. Not able to >>>>>>>>>>>>>>>> understand how to >>>>>>>>>>>>>>>> fix this, ideally it should ack an original tuple when all >>>>>>>>>>>>>>>> 100 tuples are acked, but probably an original tuple is acked >>>>>>>>>>>>>>>> before all >>>>>>>>>>>>>>>> those 100 tuples are acked, which causes this error. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Best Regards >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Saurabh Kumar Mimani >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> -- >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> *Peter Chamberlain* | Senior Software Engineer | HTK >>>>>>>>>> >>>>>>>>>> T: +44(0)870 600 2311 >>>>>>>>>> Connect with me: Email <[email protected]> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> [image: htk logo] <http://www.htk.co.uk/> >>>>>>>>>> >>>>>>>>>> Connect with HTK: htk.co.uk <http://www.htk.co.uk/> | LinkedIn >>>>>>>>>> <http://www.linkedin.com/company/htk/> | Twitter >>>>>>>>>> <http://www.twitter.com/htkhorizon> >>>>>>>>>> >>>>>>>>>> HTK Limited, Chapmans Warehouse, Wherry Quay, Ipswich, IP4 1AS, >>>>>>>>>> UK. >>>>>>>>>> Company Registered in England and Wales as 3191677, VAT Number >>>>>>>>>> 675 9467 71 >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> PLEASE CONSIDER THE ENVIRONMENT BEFORE PRINTING THIS EMAIL. >>>>>>>>>> This email is only for the use of the intended recipients and may >>>>>>>>>> contain privileged information. If you’ve received this email in >>>>>>>>>> error, >>>>>>>>>> please let the sender know; then delete the message. The views >>>>>>>>>> expressed in this email represent those of the sender and not >>>>>>>>>> necessarily >>>>>>>>>> of HTK. >>>>>>>>>> >>>>>>>>>
