Thanks for the info, I have tried few runs with the code you mentioned here
<https://github.com/apache/storm/blob/2dc3d53a11aa3fea621666690d1e44fa8b621466/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L465-L473.>,
putting that back in 1.x-branch, I did not got this exception so far, I
will run it few more times to be sure and let you know.




Best Regards

Saurabh Kumar Mimani




On Thu, Dec 13, 2018 at 1:42 PM Stig Rohde Døssing <[email protected]>
wrote:

> Sounds good. Keep in mind that the branch I linked doesn't perform the
> check that was throwing the exception at all. If you want to be sure that
> the bug is gone, you might want to take the branch and copy in the check
> https://github.com/apache/storm/blob/2dc3d53a11aa3fea621666690d1e44fa8b621466/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L465-L473
> .
>
> We removed it because it was causing performance issues with newer Kafka
> versions. If the bug is still there, you won't get the exception anymore,
> but the spout might emit already processed tuples. Let me know if you'd
> like to test the spout with the check still in place, and I'll make a
> branch that still has the check.
>
> I don't have a date for 1.2.3, but it could probably be pretty soon. If
> you'd like to test the spout with the exception check still in place, it
> would be good to do first, but otherwise we should be able to start the
> release process soon.
>
> Den tor. 13. dec. 2018 kl. 07.51 skrev saurabh mimani <
> [email protected]>:
>
>> Hey Stig,
>>
>> Thanks for looking into this and providing a fix. We have build the
>> storm-kafka-client jar from your branch and it is working fine on that. We
>> are still testing it and will let you know if there are any issues we fix.
>>
>> I see for STORM-3301 <https://issues.apache.org/jira/browse/STORM-3301>,
>> you have put the fix version as 1.2.3, any approximate date when will this
>> be released?
>>
>> Thanks for your help :)
>>
>>
>>
>>
>>
>> Best Regards
>>
>> Saurabh Kumar Mimani
>>
>>
>>
>>
>> On Mon, Dec 10, 2018 at 1:12 AM Stig Rohde Døssing <
>> [email protected]> wrote:
>>
>>> I'm assuming you applied the fix on top of 1.2.1 or something like that?
>>> The exception can't be thrown from the branch I linked, since it was
>>> removed in an earlier commit in 1.x-branch.
>>>
>>> Your logs show that the committed offset for partition 6 is 1682098 (98
>>> for short). 98 was emitted, since it shows up in the emitted list. I'm
>>> guessing it failed and was replayed. 99 and up are in the acked list, so
>>> they are ready to commit as soon as 98 finishes processing.
>>>
>>> The log shows that 99 is the tuple encountering the exception, so I'm
>>> guessing what happened is that 98 was acked and the spout decided to commit
>>> 98, 99 etc. For some reason it then still decides to emit 99. The only
>>> reasons I can think of (barring bugs in Kafka/the Kafka client) for that to
>>> happen would be that 99 is in waitingToEmit and isn't being cleared out
>>> during the commit (this is the bug I tried to fix), somehow 99 is still
>>> queued for retry (this should not be possible) or for some reason the
>>> consumer position ends up below the committed offset. I think the best bet
>>> for tracking down why it happens would be logging the contents of the
>>> RetryService, the contents of waitingToEmit and the consumer position both
>>> after commitOffsetsForAckedTuples, and right before the exception is
>>> thrown.
>>>
>>> Could you try logging those? I can add the log statements on top of the
>>> bugfix if needed.
>>>
>>> Den søn. 9. dec. 2018 kl. 18.42 skrev saurabh mimani <
>>> [email protected]>:
>>>
>>>> Hey, I see this is still happening, this time, it seems, as it seemed
>>>> to me, because same offset from different partition was committed(guessing
>>>> from logs), but not sure as that should be handled.
>>>>
>>>> Please find the logs here
>>>> <https://gist.github.com/mimani/ff27b7272482efc91c4d145d59ab59be>.
>>>>
>>>>
>>>>
>>>> Best Regards
>>>>
>>>> Saurabh Kumar Mimani
>>>>
>>>>
>>>>
>>>>
>>>> On Sun, Dec 9, 2018 at 3:19 AM Stig Rohde Døssing <
>>>> [email protected]> wrote:
>>>>
>>>>> I believe I have a fix, your logs were helpful. Please try out the
>>>>> changes in https://github.com/apache/storm/pull/2923/files.
>>>>>
>>>>> Den lør. 8. dec. 2018 kl. 07.25 skrev saurabh mimani <
>>>>> [email protected]>:
>>>>>
>>>>>> Hey,
>>>>>>
>>>>>> Thanks for looking into this. I was not able to produce this earlier
>>>>>> on my local, however I will again try once. I was consistently able to
>>>>>> reproduce it with parallelism of 5 for boltA and parallelism of 200 with
>>>>>> boltB on 2 machines in cluster mode.
>>>>>>
>>>>>> I will try again with your code once.
>>>>>>
>>>>>> These
>>>>>> <https://gist.github.com/mimani/56dd31db34e4356b25c796d78261f7b8> are
>>>>>> logs of Kafka Spout, when I was able to reproduce it in cluster mode with
>>>>>> my topology, in case these helps.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Best Regards
>>>>>>
>>>>>> Saurabh Kumar Mimani
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Dec 5, 2018 at 11:33 PM Stig Rohde Døssing <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> I can't reproduce this.
>>>>>>>
>>>>>>> I created a test topology similar to the code you posted, based on
>>>>>>> the 1.2.1 release tag
>>>>>>> https://github.com/srdo/storm/commit/f5577f7a773f3d433b90a2670de5329b396f5564
>>>>>>> .
>>>>>>>
>>>>>>> I set up a local Kafka instance and put enough messages to run the
>>>>>>> topology for 15 minutes or so in the test topic. After populating the
>>>>>>> topic, I started the topology and let it run until it reached the end of
>>>>>>> the topic. As expected a lot of messages failed, but after a while it
>>>>>>> managed to successfully process all messages. I didn't see any worker
>>>>>>> crashes, and the logs only show some errors related to moving files
>>>>>>> (expected on Windows).
>>>>>>>
>>>>>>> The topology seems to work fine against both Kafka 0.10.2.2 and
>>>>>>> 1.1.1, though 1.1.1 is slower due to
>>>>>>> https://issues.apache.org/jira/browse/STORM-3102.
>>>>>>>
>>>>>>> The Kafka setup was with the default configuration for both Kafka
>>>>>>> and Zookeeper, and Storm was set up with a local Nimbus, single local
>>>>>>> Supervisor and 4 worker slots.
>>>>>>>
>>>>>>> Saurabh please try to reproduce the issue using the topology I
>>>>>>> linked. If you need to make adjustments in order to provoke the issue,
>>>>>>> please update the code and link it so I can check it out and try it.
>>>>>>>
>>>>>>> Den ons. 5. dec. 2018 kl. 16.42 skrev saurabh mimani <
>>>>>>> [email protected]>:
>>>>>>>
>>>>>>>> No, I have checked that, there is no other consumer group consuming
>>>>>>>> from the same.
>>>>>>>>
>>>>>>>> Thanks for looking into it, let me know if you need any
>>>>>>>> other information.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Dec 5, 2018 at 9:02 PM Stig Rohde Døssing <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Ravi, BaseBasicBolt does automatically anchor any emitted tuples
>>>>>>>>> to the input tuple. It's intended for bolts that just need to receive 
>>>>>>>>> a
>>>>>>>>> tuple, synchronously process it and emit some new tuples anchored to 
>>>>>>>>> the
>>>>>>>>> input tuple. It's there because doing manual acking is tedious and
>>>>>>>>> error-prone in cases where you don't need to be able to e.g. emit new
>>>>>>>>> unachored tuples or ack the input tuple asynchronously. As Peter 
>>>>>>>>> mentioned,
>>>>>>>>> the BasicBoltExecutor (
>>>>>>>>> https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java#L42)
>>>>>>>>> handles acking for you.
>>>>>>>>>
>>>>>>>>> Saurabh, I'll see if I can reproduce your issue. Please also check
>>>>>>>>> that you don't have any other consumers using the same consumer group 
>>>>>>>>> as
>>>>>>>>> the spout.
>>>>>>>>>
>>>>>>>>> Den ons. 5. dec. 2018 kl. 11.53 skrev Peter Chamberlain <
>>>>>>>>> [email protected]>:
>>>>>>>>>
>>>>>>>>>> Pretty sure that the ack path is handled by BasicBoltExecutor (an
>>>>>>>>>> implmentation of IRichBolt), which calls 
>>>>>>>>>> collector.setContext(input), and
>>>>>>>>>> also does the acking inside it's execute function, and in between 
>>>>>>>>>> calls the
>>>>>>>>>> BaseBasicBolt.execute version (which takes the collector as well as 
>>>>>>>>>> the
>>>>>>>>>> tuple as parameters).
>>>>>>>>>> So the intention is clearly that it is automatically anchored and
>>>>>>>>>> acknowledged.
>>>>>>>>>>
>>>>>>>>>> On Wed, 5 Dec 2018 at 09:57, Ravi Sharma <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Saurabh,
>>>>>>>>>>> I checked the BaseBasicBolt which comes with storm, it doesn't
>>>>>>>>>>> do much.
>>>>>>>>>>> Also checked BasicOutputCollector and don't see how it will
>>>>>>>>>>> anchor automatically unless you call
>>>>>>>>>>>  BasicOutputCollector.setContext(Tuple tuple), don't see all of 
>>>>>>>>>>> your code,
>>>>>>>>>>> but don't see this call in your boltA code.
>>>>>>>>>>> Also it looks like even when you make this setContext call,
>>>>>>>>>>> after that you will have to emit using following emit function
>>>>>>>>>>>
>>>>>>>>>>> BasicOutputCollector.emit(String streamId, List<Object> tuples)
>>>>>>>>>>>
>>>>>>>>>>> Basically just check that whatever emit function is called, it
>>>>>>>>>>> does pass the input tuple.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Once I had exactly same issue for few days, but mine was related
>>>>>>>>>>> to config. I wanted to read from two Kafka topics in one topology 
>>>>>>>>>>> and had
>>>>>>>>>>> two different kafkaspout created, mistakenly I copy pasted same 
>>>>>>>>>>> config and
>>>>>>>>>>> that caused this issue. Not sure if that applies to your scenario.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> *NOTE*: I checked the latest storm master branch for code.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, 5 Dec 2018, 08:11 saurabh mimani <
>>>>>>>>>>> [email protected] wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey Ravi,
>>>>>>>>>>>>
>>>>>>>>>>>> I am using *BaseBasicBolt*, which as described here
>>>>>>>>>>>> <http://storm.apache.org/releases/1.0.6/Guaranteeing-message-processing.html>
>>>>>>>>>>>> : Tuples emitted to BasicOutputCollector are automatically
>>>>>>>>>>>> anchored to the input tuple, and the input tuple is acked for you
>>>>>>>>>>>> automatically when the execute method completes.
>>>>>>>>>>>>
>>>>>>>>>>>> What you are saying is applicable for *BaseRichBolt. *The
>>>>>>>>>>>> Kafka spout I am using is from storm-kafka-client
>>>>>>>>>>>> <https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client/1.2.1>
>>>>>>>>>>>> library, so unique ID, etc should be already taken care of.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>
>>>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Dec 5, 2018 at 12:52 PM Ravi Sharma <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Saurabh,
>>>>>>>>>>>>> I think there is issue with part of code which is emitting the
>>>>>>>>>>>>> tuples.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If you want to use ack mechanism, you need to use anchor tuple
>>>>>>>>>>>>> when you emit from bolts.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Collector.emit(Tuple input, Values data)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Also make sure Kafka spout emits tuple with a unique id.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Ravi
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, 5 Dec 2018, 06:35 saurabh mimani <
>>>>>>>>>>>>> [email protected] wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hey,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for your reply. What you are saying is correct,
>>>>>>>>>>>>>> However I am able to reproduce it more often and I think it 
>>>>>>>>>>>>>> happens when
>>>>>>>>>>>>>> multiple tuples gets failed in first run but all of those gets 
>>>>>>>>>>>>>> success on
>>>>>>>>>>>>>> retry, something of that sort is happening.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This can be reproduced using following two bolts and
>>>>>>>>>>>>>> kafkaSpout easily, by running in cluster more with 3/4 minutes:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *BoltA*
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> case class Abc(index: Int, rand: Boolean)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> class BoltA  extends BaseBasicBolt {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   override def execute(input: Tuple, collector: 
>>>>>>>>>>>>>> BasicOutputCollector): Unit = {
>>>>>>>>>>>>>>     val inp = input.getBinaryByField("value").getObj[someObj]
>>>>>>>>>>>>>>     val randomGenerator = new Random()
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     var i = 0
>>>>>>>>>>>>>>     val rand = randomGenerator.nextBoolean()
>>>>>>>>>>>>>>     1 to 100 foreach {
>>>>>>>>>>>>>>       collector.emit(new Values(Abc(i, rand).getJsonBytes))
>>>>>>>>>>>>>>       i += 1
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   override def declareOutputFields(declarer: 
>>>>>>>>>>>>>> OutputFieldsDeclarer): Unit = {
>>>>>>>>>>>>>>     declarer.declare(new Fields("boltAout"))
>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *BoltB*
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> class BoltB  extends BaseBasicBolt {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   override def execute(input: Tuple, collector: 
>>>>>>>>>>>>>> BasicOutputCollector): Unit = {
>>>>>>>>>>>>>>     val abc = input.getBinaryByField("boltAout").getObj[Abc]
>>>>>>>>>>>>>>     println(s"Received ${abc.index}th tuple in BoltB")
>>>>>>>>>>>>>>     if(abc.index >= 97 && abc.rand){
>>>>>>>>>>>>>>       println(s"throwing FailedException for ${abc.index}th 
>>>>>>>>>>>>>> tuple for")
>>>>>>>>>>>>>>       throw new FailedException()
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   override def declareOutputFields(declarer: 
>>>>>>>>>>>>>> OutputFieldsDeclarer): Unit = {
>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> *KafkaSpout:*
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> private def getKafkaSpoutConfig(source: Config) = 
>>>>>>>>>>>>>> KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list",
>>>>>>>>>>>>>>  "queueName")
>>>>>>>>>>>>>>     .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
>>>>>>>>>>>>>>     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
>>>>>>>>>>>>>> "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>>>>>>>>>>>>>     .setOffsetCommitPeriodMs(100)
>>>>>>>>>>>>>>     .setRetry(new KafkaSpoutRetryExponentialBackoff(
>>>>>>>>>>>>>>       
>>>>>>>>>>>>>> KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>>>>>>>       
>>>>>>>>>>>>>> KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>>>>>>>       10,
>>>>>>>>>>>>>>       
>>>>>>>>>>>>>> KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
>>>>>>>>>>>>>>     ))
>>>>>>>>>>>>>>     
>>>>>>>>>>>>>> .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy",
>>>>>>>>>>>>>>  "UNCOMMITTED_EARLIEST")))
>>>>>>>>>>>>>>     
>>>>>>>>>>>>>> .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset",
>>>>>>>>>>>>>>  10000))
>>>>>>>>>>>>>>     .build()
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Other config:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> messageTimeoutInSecons: 300
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [image: Screenshot 2018-12-05 at 12.03.08 PM.png]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 9:18 PM Stig Rohde Døssing <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Saurabh,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The tuple emitted by the spout will only be acked once all
>>>>>>>>>>>>>>> branches of the tuple tree have been acked, i.e. all 100 tuples 
>>>>>>>>>>>>>>> are acked.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The error you're seeing was added as part of
>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/STORM-2666 to try to
>>>>>>>>>>>>>>> avoid having that bug pop up again. Could you try posting your 
>>>>>>>>>>>>>>> spout
>>>>>>>>>>>>>>> configuration? Also if possible, it would be helpful if you 
>>>>>>>>>>>>>>> could enable
>>>>>>>>>>>>>>> debug logging for org.apache.storm.kafka.spout.KafkaSpout
>>>>>>>>>>>>>>> and maybe also 
>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.internal.OffsetManager.
>>>>>>>>>>>>>>> They log when offsets are committed (e.g.
>>>>>>>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546),
>>>>>>>>>>>>>>> and also when the consumer position is changed (e.g.
>>>>>>>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561
>>>>>>>>>>>>>>> ). It should be possible to track down when/why the consumer 
>>>>>>>>>>>>>>> position wound
>>>>>>>>>>>>>>> up behind the committed offset.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Just so you're aware, the check that crashes the spout has
>>>>>>>>>>>>>>> been removed as of
>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/STORM-3102. I'd still
>>>>>>>>>>>>>>> like to know if there's a bug in the spout causing it to emit 
>>>>>>>>>>>>>>> tuples that
>>>>>>>>>>>>>>> were already committed though.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Den man. 3. dec. 2018 kl. 11.29 skrev saurabh mimani <
>>>>>>>>>>>>>>> [email protected]>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Version Info:
>>>>>>>>>>>>>>>>    "org.apache.storm" % "storm-core" % "1.2.1"
>>>>>>>>>>>>>>>>    "org.apache.storm" % "storm-kafka-client" % "1.2.1"
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I have a storm topology which looks like following:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> boltA -> boltB -> boltC -> boltD
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> boltA just does some formatting of requests and emits
>>>>>>>>>>>>>>>> another tuple. boltB does some processing and emits around
>>>>>>>>>>>>>>>> 100 tuples for each tuple being received. boltC and boltD 
>>>>>>>>>>>>>>>> processes
>>>>>>>>>>>>>>>> these tuples. All the bolts implements BaseBasicBolt.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What I am noticing is whenever boltD marks some tuple as
>>>>>>>>>>>>>>>> fail and marks that for retry by throwing FailedException,
>>>>>>>>>>>>>>>> After a few minutes less than my topology timeout, I get the 
>>>>>>>>>>>>>>>> following
>>>>>>>>>>>>>>>> error:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
>>>>>>>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message 
>>>>>>>>>>>>>>>> that has already been committed. This should never occur when 
>>>>>>>>>>>>>>>> using the at-least-once processing guarantee.
>>>>>>>>>>>>>>>>         at 
>>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471)
>>>>>>>>>>>>>>>>  ~[stormjar.jar:?]
>>>>>>>>>>>>>>>>         at 
>>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440)
>>>>>>>>>>>>>>>>  ~[stormjar.jar:?]
>>>>>>>>>>>>>>>>         at 
>>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308)
>>>>>>>>>>>>>>>>  ~[stormjar.jar:?]
>>>>>>>>>>>>>>>>         at 
>>>>>>>>>>>>>>>> org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654)
>>>>>>>>>>>>>>>>  ~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>>>         at 
>>>>>>>>>>>>>>>> org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
>>>>>>>>>>>>>>>> [storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22) 
>>>>>>>>>>>>>>>> [clojure-1.7.0.jar:?]
>>>>>>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>>>>>>>>>>>>>>> 2018-11-30T20:01:05.262+05:30 executor [ERROR]
>>>>>>>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message 
>>>>>>>>>>>>>>>> that has already been committed. This should never occur when 
>>>>>>>>>>>>>>>> using the at-least-once processing guarantee.
>>>>>>>>>>>>>>>>         at 
>>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471)
>>>>>>>>>>>>>>>>  ~[stormjar.jar:?]
>>>>>>>>>>>>>>>>         at 
>>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440)
>>>>>>>>>>>>>>>>  ~[stormjar.jar:?]
>>>>>>>>>>>>>>>>         at 
>>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308)
>>>>>>>>>>>>>>>>  ~[stormjar.jar:?]
>>>>>>>>>>>>>>>>         at 
>>>>>>>>>>>>>>>> org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654)
>>>>>>>>>>>>>>>>  ~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>>>         at 
>>>>>>>>>>>>>>>> org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) 
>>>>>>>>>>>>>>>> [storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22) 
>>>>>>>>>>>>>>>> [clojure-1.7.0.jar:?]
>>>>>>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What seems to be happening is this happens when boltB emits
>>>>>>>>>>>>>>>> 100 out of 1 tuple and boltDfails one of the tuples out of
>>>>>>>>>>>>>>>> those 100 tuples, I am getting this error. Not able to 
>>>>>>>>>>>>>>>> understand how to
>>>>>>>>>>>>>>>> fix this, ideally it should ack an original tuple when all
>>>>>>>>>>>>>>>> 100 tuples are acked, but probably an original tuple is acked 
>>>>>>>>>>>>>>>> before all
>>>>>>>>>>>>>>>> those 100 tuples are acked, which causes this error.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *Peter Chamberlain* | Senior Software Engineer | HTK
>>>>>>>>>>
>>>>>>>>>> T: +44(0)870 600 2311
>>>>>>>>>> Connect with me: Email <[email protected]>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> [image: htk logo] <http://www.htk.co.uk/>
>>>>>>>>>>
>>>>>>>>>> Connect with HTK: htk.co.uk <http://www.htk.co.uk/> | LinkedIn
>>>>>>>>>> <http://www.linkedin.com/company/htk/> | Twitter
>>>>>>>>>> <http://www.twitter.com/htkhorizon>
>>>>>>>>>>
>>>>>>>>>> HTK Limited, Chapmans Warehouse, Wherry Quay, Ipswich, IP4 1AS,
>>>>>>>>>> UK.
>>>>>>>>>> Company Registered in England and Wales as 3191677, VAT Number
>>>>>>>>>> 675 9467 71
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> PLEASE CONSIDER THE ENVIRONMENT BEFORE PRINTING THIS EMAIL.
>>>>>>>>>> This email is only for the use of the intended recipients and may
>>>>>>>>>> contain privileged information. If you’ve received this email in 
>>>>>>>>>> error,
>>>>>>>>>> please let the sender know; then delete the message. The views
>>>>>>>>>> expressed in this email represent those of the sender and not 
>>>>>>>>>> necessarily
>>>>>>>>>> of HTK.
>>>>>>>>>>
>>>>>>>>>

Reply via email to