Sounds good. We're likely going to release 2.0.0 soon, so we can probably run a concurrent release for 1.x.

On Sat, Dec 22, 2018 at 5:53 AM saurabh mimani <[email protected]> wrote:

It did not get reproduced yesterday either, when I was doing my runs.

I see the last release of storm-kafka-client was in May 2018. It would be great if we could release a new version of it, as I see there are many commits after that.

Appreciating your efforts to provide a fix for it, thanks a lot :)

Best Regards
Saurabh Kumar Mimani

On Thu, Dec 20, 2018 at 9:17 AM saurabh mimani <[email protected]> wrote:

Thanks for the info. I have tried a few runs with the code you mentioned here <https://github.com/apache/storm/blob/2dc3d53a11aa3fea621666690d1e44fa8b621466/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L465-L473>, putting that check back into 1.x-branch, and I have not seen this exception so far. I will run it a few more times to be sure and let you know.

Best Regards
Saurabh Kumar Mimani

On Thu, Dec 13, 2018 at 1:42 PM Stig Rohde Døssing <[email protected]> wrote:

Sounds good. Keep in mind that the branch I linked doesn't perform the check that was throwing the exception at all. If you want to be sure that the bug is gone, you might want to take the branch and copy in the check https://github.com/apache/storm/blob/2dc3d53a11aa3fea621666690d1e44fa8b621466/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L465-L473.

We removed it because it was causing performance issues with newer Kafka versions. If the bug is still there, you won't get the exception anymore, but the spout might emit already processed tuples. Let me know if you'd like to test the spout with the check still in place, and I'll make a branch that still has the check.

I don't have a date for 1.2.3, but it could probably be pretty soon. If you'd like to test the spout with the exception check still in place, it would be good to do that first, but otherwise we should be able to start the release process soon.

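The check being discussed works roughly as follows: before a record is emitted, its offset is compared against the committed offset for its partition, and a record already covered by a commit trips the IllegalStateException quoted later in this thread. A rough paraphrase in Scala (illustrative only, not the actual Java at the linked lines; the names are made up):

    // Illustrative paraphrase of the sanity check discussed above, not the exact
    // code at the linked lines. nextCommittedOffset is assumed to be the committed
    // offset for the record's partition, i.e. the offset of the next record to be
    // consumed (None if nothing has been committed yet).
    def ensureNotAlreadyCommitted(recordOffset: Long, nextCommittedOffset: Option[Long]): Unit =
      nextCommittedOffset.foreach { committed =>
        if (recordOffset < committed) {
          // With at-least-once processing a record below the committed offset should
          // never be emitted again, so reaching this point indicates the bug in question.
          throw new IllegalStateException(
            "Attempting to emit a message that has already been committed. " +
              "This should never occur when using the at-least-once processing guarantee.")
        }
      }

Copying the real check back in, as suggested above, turns a silent re-emit of already-processed records back into a loud failure, which is what makes it useful for verifying that the bug is actually gone.
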
On Thu, Dec 13, 2018 at 7:51 AM saurabh mimani <[email protected]> wrote:

Hey Stig,

Thanks for looking into this and providing a fix. We have built the storm-kafka-client jar from your branch and it is working fine with it. We are still testing it and will let you know if we find any issues.

I see that for STORM-3301 <https://issues.apache.org/jira/browse/STORM-3301> you have set the fix version to 1.2.3; is there an approximate date for when this will be released?

Thanks for your help :)

Best Regards
Saurabh Kumar Mimani

On Mon, Dec 10, 2018 at 1:12 AM Stig Rohde Døssing <[email protected]> wrote:

I'm assuming you applied the fix on top of 1.2.1 or something like that? The exception can't be thrown from the branch I linked, since the check was removed in an earlier commit in 1.x-branch.

Your logs show that the committed offset for partition 6 is 1682098 (98 for short). 98 was emitted, since it shows up in the emitted list. I'm guessing it failed and was replayed. 99 and up are in the acked list, so they are ready to commit as soon as 98 finishes processing.

The log shows that 99 is the tuple encountering the exception, so I'm guessing what happened is that 98 was acked and the spout decided to commit 98, 99 etc. For some reason it then still decides to emit 99. The only reasons I can think of (barring bugs in Kafka/the Kafka client) for that to happen would be that 99 is in waitingToEmit and isn't being cleared out during the commit (this is the bug I tried to fix), that 99 is somehow still queued for retry (this should not be possible), or that for some reason the consumer position ends up below the committed offset. I think the best bet for tracking down why it happens would be logging the contents of the RetryService, the contents of waitingToEmit and the consumer position, both after commitOffsetsForAckedTuples and right before the exception is thrown.

Could you try logging those? I can add the log statements on top of the bugfix if needed.

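The diagnostics being asked for could be sketched along these lines. Dumping the retry state, waitingToEmit and the consumer position at the two points mentioned comes straight from the message above; the helper itself, its name and its parameter types are assumptions rather than actual storm-kafka-client code:

    import org.apache.kafka.clients.consumer.KafkaConsumer
    import org.apache.kafka.common.TopicPartition
    import org.slf4j.LoggerFactory

    import scala.collection.JavaConverters._

    // Illustrative helper: log the consumer position per assigned partition, the
    // records still waiting to be emitted, and the retry-service state. It would be
    // called once after commitOffsetsForAckedTuples and once right before the
    // IllegalStateException is thrown.
    object SpoutStateDump {
      private val log = LoggerFactory.getLogger(getClass)

      def dump(where: String,
               consumer: KafkaConsumer[_, _],
               waitingToEmit: java.util.Map[TopicPartition, _],
               retryServiceState: AnyRef): Unit = {
        consumer.assignment().asScala.foreach { tp =>
          log.debug("[{}] consumer position for {} = {}", where, tp, Long.box(consumer.position(tp)))
        }
        log.debug("[{}] waitingToEmit = {}", where, waitingToEmit)
        log.debug("[{}] retryService = {}", where, retryServiceState)
      }
    }

Comparing the two dumps should show which of the three suspects (waitingToEmit, the retry service, or the consumer position) still references the already-committed offset.
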
On Sun, Dec 9, 2018 at 6:42 PM saurabh mimani <[email protected]> wrote:

Hey, I see this is still happening. This time it seems to me to be because the same offset from a different partition was committed (guessing from the logs), but I am not sure, as that should be handled.

Please find the logs here <https://gist.github.com/mimani/ff27b7272482efc91c4d145d59ab59be>.

Best Regards
Saurabh Kumar Mimani

On Sun, Dec 9, 2018 at 3:19 AM Stig Rohde Døssing <[email protected]> wrote:

I believe I have a fix, your logs were helpful. Please try out the changes in https://github.com/apache/storm/pull/2923/files.

On Sat, Dec 8, 2018 at 7:25 AM saurabh mimani <[email protected]> wrote:

Hey,

Thanks for looking into this. I was not able to reproduce this earlier on my local setup, but I will try once more. I was consistently able to reproduce it with a parallelism of 5 for boltA and a parallelism of 200 for boltB, on 2 machines in cluster mode.

I will try again with your code.

These <https://gist.github.com/mimani/56dd31db34e4356b25c796d78261f7b8> are the logs of the Kafka spout from when I was able to reproduce it in cluster mode with my topology, in case they help.

Best Regards
Saurabh Kumar Mimani

On Wed, Dec 5, 2018 at 11:33 PM Stig Rohde Døssing <[email protected]> wrote:

I can't reproduce this.

I created a test topology similar to the code you posted, based on the 1.2.1 release tag: https://github.com/srdo/storm/commit/f5577f7a773f3d433b90a2670de5329b396f5564.

I set up a local Kafka instance and put enough messages in the test topic to run the topology for 15 minutes or so. After populating the topic, I started the topology and let it run until it reached the end of the topic. As expected a lot of messages failed, but after a while it managed to successfully process all messages. I didn't see any worker crashes, and the logs only show some errors related to moving files (expected on Windows).

The topology seems to work fine against both Kafka 0.10.2.2 and 1.1.1, though 1.1.1 is slower due to https://issues.apache.org/jira/browse/STORM-3102.

The Kafka setup was with the default configuration for both Kafka and Zookeeper, and Storm was set up with a local Nimbus, a single local Supervisor and 4 worker slots.

Saurabh, please try to reproduce the issue using the topology I linked. If you need to make adjustments in order to provoke the issue, please update the code and link it so I can check it out and try it.

On Wed, Dec 5, 2018 at 4:42 PM saurabh mimani <[email protected]> wrote:

No, I have checked that; there is no other consumer group consuming from the same topic.

Thanks for looking into it, let me know if you need any other information.

Best Regards
Saurabh Kumar Mimani

On Wed, Dec 5, 2018 at 9:02 PM Stig Rohde Døssing <[email protected]> wrote:

Ravi, BaseBasicBolt does automatically anchor any emitted tuples to the input tuple. It's intended for bolts that just need to receive a tuple, synchronously process it and emit some new tuples anchored to the input tuple. It's there because doing manual acking is tedious and error-prone in cases where you don't need to be able to e.g. emit new unanchored tuples or ack the input tuple asynchronously. As Peter mentioned, the BasicBoltExecutor (https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java#L42) handles acking for you.

Saurabh, I'll see if I can reproduce your issue. Please also check that you don't have any other consumers using the same consumer group as the spout.

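To make the contrast concrete, the sketch below shows roughly what BoltA would have to do if it extended BaseRichBolt instead of BaseBasicBolt, i.e. the anchoring and acking that BasicBoltExecutor otherwise performs spelled out by hand. The class name is made up, the signature follows the storm-core 1.2.x API, and the emitted value is simplified to just an index:

    import java.util.{Map => JMap}

    import org.apache.storm.task.{OutputCollector, TopologyContext}
    import org.apache.storm.topology.OutputFieldsDeclarer
    import org.apache.storm.topology.base.BaseRichBolt
    import org.apache.storm.tuple.{Fields, Tuple, Values}

    // Illustrative only: the manual equivalent of what BaseBasicBolt plus
    // BasicBoltExecutor do automatically for BoltA.
    class ManuallyAnchoredBoltA extends BaseRichBolt {
      private var collector: OutputCollector = _

      override def prepare(conf: JMap[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
        this.collector = collector
      }

      override def execute(input: Tuple): Unit = {
        (0 until 100).foreach { i =>
          // Anchoring: passing the input tuple ties each emitted tuple into the same
          // tuple tree, so a failure anywhere downstream replays the spout tuple.
          collector.emit(input, new Values(Integer.valueOf(i)))
        }
        // The input tuple must be acked (or failed) explicitly.
        collector.ack(input)
      }

      override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
        declarer.declare(new Fields("boltAout"))
      }
    }

With BaseBasicBolt none of this is needed: the emit is anchored and the input tuple is acked automatically once execute returns, which is the behaviour described above.
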
On Wed, Dec 5, 2018 at 11:53 AM Peter Chamberlain <[email protected]> wrote:

Pretty sure that the ack path is handled by BasicBoltExecutor (an implementation of IRichBolt), which calls collector.setContext(input), and also does the acking inside its execute function, and in between calls the BaseBasicBolt.execute version (which takes the collector as well as the tuple as parameters). So the intention is clearly that it is automatically anchored and acknowledged.

On Wed, 5 Dec 2018 at 09:57, Ravi Sharma <[email protected]> wrote:

Hi Saurabh,
I checked the BaseBasicBolt which comes with Storm; it doesn't do much. I also checked BasicOutputCollector and don't see how it will anchor automatically unless you call BasicOutputCollector.setContext(Tuple tuple). I don't see all of your code, but I don't see this call in your boltA code.

Also, it looks like even when you make this setContext call, after that you will have to emit using the following emit function:

    BasicOutputCollector.emit(String streamId, List<Object> tuples)

Basically, just check that whatever emit function is called, it does pass the input tuple.

Once I had exactly the same issue for a few days, but mine was related to config. I wanted to read from two Kafka topics in one topology and had two different KafkaSpouts created; I mistakenly copy-pasted the same config, and that caused this issue. Not sure if that applies to your scenario.

NOTE: I checked the latest Storm master branch for the code.

On Wed, 5 Dec 2018 at 08:11, saurabh mimani <[email protected]> wrote:

Hey Ravi,

I am using BaseBasicBolt, which, as described here <http://storm.apache.org/releases/1.0.6/Guaranteeing-message-processing.html>, works as follows: tuples emitted to BasicOutputCollector are automatically anchored to the input tuple, and the input tuple is acked for you automatically when the execute method completes.

What you are saying is applicable to BaseRichBolt. The Kafka spout I am using is from the storm-kafka-client <https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client/1.2.1> library, so the unique ID etc. should already be taken care of.

Best Regards
Saurabh Kumar Mimani

On Wed, Dec 5, 2018 at 12:52 PM Ravi Sharma <[email protected]> wrote:

Hi Saurabh,
I think there is an issue with the part of the code which is emitting the tuples.

If you want to use the ack mechanism, you need to use an anchor tuple when you emit from bolts:

    Collector.emit(Tuple input, Values data)

Also make sure the Kafka spout emits tuples with a unique id.

Thanks
Ravi

On Wed, 5 Dec 2018 at 06:35, saurabh mimani <[email protected]> wrote:

Hey,

Thanks for your reply. What you are saying is correct. However, I am able to reproduce it fairly often, and I think it happens when multiple tuples fail in the first run but all of them succeed on retry, something of that sort.
This can be reproduced easily with the following two bolts and the KafkaSpout, by running in cluster mode for 3-4 minutes:

BoltA:

    case class Abc(index: Int, rand: Boolean)

    class BoltA extends BaseBasicBolt {

      override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
        val inp = input.getBinaryByField("value").getObj[someObj]
        val randomGenerator = new Random()

        var i = 0
        val rand = randomGenerator.nextBoolean()
        1 to 100 foreach { _ =>
          collector.emit(new Values(Abc(i, rand).getJsonBytes))
          i += 1
        }
      }

      override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
        declarer.declare(new Fields("boltAout"))
      }
    }

BoltB:

    class BoltB extends BaseBasicBolt {

      override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
        val abc = input.getBinaryByField("boltAout").getObj[Abc]
        println(s"Received ${abc.index}th tuple in BoltB")
        if (abc.index >= 97 && abc.rand) {
          println(s"throwing FailedException for ${abc.index}th tuple")
          throw new FailedException()
        }
      }

      override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
      }
    }

KafkaSpout:

    private def getKafkaSpoutConfig(source: Config) =
      KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
        .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
        .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
        .setOffsetCommitPeriodMs(100)
        .setRetry(new KafkaSpoutRetryExponentialBackoff(
          KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
          KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
          10,
          KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
        ))
        .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", "UNCOMMITTED_EARLIEST")))
        .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", 10000))
        .build()

Other config:

    messageTimeoutInSecons: 300

[image: Screenshot 2018-12-05 at 12.03.08 PM.png]

Best Regards
Saurabh Kumar Mimani

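For completeness, wiring the snippets above into a topology would look roughly like this. The component and topology names are made up; the parallelism values (5 for boltA, 200 for boltB) and the 300 second message timeout are the ones mentioned elsewhere in this thread, and the spout config is expected to come from the getKafkaSpoutConfig helper shown above:

    import org.apache.storm.{Config, StormSubmitter}
    import org.apache.storm.kafka.spout.{KafkaSpout, KafkaSpoutConfig}
    import org.apache.storm.topology.TopologyBuilder

    // Hypothetical wiring for the reproduction described above.
    object ReproTopology {
      def submit[K, V](spoutConfig: KafkaSpoutConfig[K, V]): Unit = {
        val builder = new TopologyBuilder
        builder.setSpout("kafka-spout", new KafkaSpout[K, V](spoutConfig), 1)
        builder.setBolt("boltA", new BoltA, 5).shuffleGrouping("kafka-spout")
        builder.setBolt("boltB", new BoltB, 200).shuffleGrouping("boltA")

        val conf = new Config()
        conf.setNumWorkers(2)
        conf.setMessageTimeoutSecs(300) // the messageTimeoutInSecons: 300 setting above

        StormSubmitter.submitTopology("repro-topology", conf, builder.createTopology())
      }
    }
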
On Mon, Dec 3, 2018 at 9:18 PM Stig Rohde Døssing <[email protected]> wrote:

Hi Saurabh,

The tuple emitted by the spout will only be acked once all branches of the tuple tree have been acked, i.e. all 100 tuples are acked.

The error you're seeing was added as part of https://issues.apache.org/jira/browse/STORM-2666 to try to avoid having that bug pop up again. Could you try posting your spout configuration? Also, if possible, it would be helpful if you could enable debug logging for org.apache.storm.kafka.spout.KafkaSpout and maybe also org.apache.storm.kafka.spout.internal.OffsetManager. They log when offsets are committed (e.g. https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546) and also when the consumer position is changed (e.g. https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561). It should be possible to track down when/why the consumer position wound up behind the committed offset.

Just so you're aware, the check that crashes the spout has been removed as of https://issues.apache.org/jira/browse/STORM-3102. I'd still like to know if there's a bug in the spout causing it to emit tuples that were already committed, though.

On Mon, Dec 3, 2018 at 11:29 AM saurabh mimani <[email protected]> wrote:

Version info:

    "org.apache.storm" % "storm-core" % "1.2.1"
    "org.apache.storm" % "storm-kafka-client" % "1.2.1"

I have a Storm topology which looks like the following:

    boltA -> boltB -> boltC -> boltD

boltA just does some formatting of requests and emits another tuple. boltB does some processing and emits around 100 tuples for each tuple it receives. boltC and boltD process these tuples. All the bolts implement BaseBasicBolt.

What I am noticing is that whenever boltD marks some tuple as failed and marks it for retry by throwing a FailedException, then after a few minutes (less than my topology timeout) I get the following error:

    2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
    java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
        at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
    2018-11-30T20:01:05.262+05:30 executor [ERROR]
    java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
        at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]

What seems to be happening is that when boltB emits 100 tuples out of 1 tuple and boltD fails one of those 100 tuples, I get this error. I am not able to understand how to fix this; ideally it should ack the original tuple only when all 100 tuples are acked, but probably the original tuple is acked before all those 100 tuples are acked, which causes this error.

Best Regards
Saurabh Kumar Mimani

--
Peter Chamberlain | Senior Software Engineer | HTK
T: +44(0)870 600 2311
Connect with HTK: htk.co.uk <http://www.htk.co.uk/> | LinkedIn <http://www.linkedin.com/company/htk/> | Twitter <http://www.twitter.com/htkhorizon>
HTK Limited, Chapmans Warehouse, Wherry Quay, Ipswich, IP4 1AS, UK. Company Registered in England and Wales as 3191677, VAT Number 675 9467 71
