Sounds good. We're likely going to release 2.0.0 soon, so we can probably run
a concurrent release for 1.x.

On Sat, Dec 22, 2018 at 05:53, saurabh mimani <
[email protected]> wrote:

> It was not reproduced yesterday either, when I was doing my runs.
>
> I see the last release of storm-kafka-client was in May-18. It would be great
> if we could release a new version of it, as I see there are many commits after that.
>
> I appreciate your efforts to provide a fix for it. Thanks a lot :)
>
>
>
>
> Best Regards
>
> Saurabh Kumar Mimani
>
>
>
>
> On Thu, Dec 20, 2018 at 9:17 AM saurabh mimani <[email protected]>
> wrote:
>
>> Thanks for the info. I have tried a few runs with the code you mentioned
>> here
>> <https://github.com/apache/storm/blob/2dc3d53a11aa3fea621666690d1e44fa8b621466/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L465-L473>,
>> put back into 1.x-branch, and I have not gotten this exception so far. I
>> will run it a few more times to be sure and let you know.
>>
>>
>>
>>
>> Best Regards
>>
>> Saurabh Kumar Mimani
>>
>>
>>
>>
>> On Thu, Dec 13, 2018 at 1:42 PM Stig Rohde Døssing <
>> [email protected]> wrote:
>>
>>> Sounds good. Keep in mind that the branch I linked doesn't perform the
>>> check that was throwing the exception at all. If you want to be sure that
>>> the bug is gone, you might want to take the branch and copy in the check
>>> https://github.com/apache/storm/blob/2dc3d53a11aa3fea621666690d1e44fa8b621466/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L465-L473
>>> .
>>>
>>> We removed it because it was causing performance issues with newer Kafka
>>> versions. If the bug is still there, you won't get the exception anymore,
>>> but the spout might emit already processed tuples. Let me know if you'd
>>> like to test the spout with the check still in place, and I'll make a
>>> branch that still has the check.
>>>
>>> I don't have a date for 1.2.3, but it could probably be pretty soon. If
>>> you'd like to test the spout with the exception check still in place, it
>>> would be good to do first, but otherwise we should be able to start the
>>> release process soon.
>>>
>>> On Thu, Dec 13, 2018 at 07:51, saurabh mimani <
>>> [email protected]> wrote:
>>>
>>>> Hey Stig,
>>>>
>>>> Thanks for looking into this and providing a fix. We have built the
>>>> storm-kafka-client jar from your branch and it is working fine. We
>>>> are still testing it and will let you know if we find any issues.
>>>>
>>>> I see that for STORM-3301 <https://issues.apache.org/jira/browse/STORM-3301>
>>>> you have set the fix version to 1.2.3. Is there an approximate date when it
>>>> will be released?
>>>>
>>>> Thanks for your help :)
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Best Regards
>>>>
>>>> Saurabh Kumar Mimani
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Dec 10, 2018 at 1:12 AM Stig Rohde Døssing <
>>>> [email protected]> wrote:
>>>>
>>>>> I'm assuming you applied the fix on top of 1.2.1 or something like
>>>>> that? The exception can't be thrown from the branch I linked, since it was
>>>>> removed in an earlier commit in 1.x-branch.
>>>>>
>>>>> Your logs show that the committed offset for partition 6 is 1682098
>>>>> (98 for short). 98 was emitted, since it shows up in the emitted list. I'm
>>>>> guessing it failed and was replayed. 99 and up are in the acked list, so
>>>>> they are ready to commit as soon as 98 finishes processing.
>>>>>
>>>>> The log shows that 99 is the tuple encountering the exception, so I'm
>>>>> guessing what happened is that 98 was acked and the spout decided to
>>>>> commit 98, 99 etc. For some reason it then still decides to emit 99. The
>>>>> only reasons I can think of (barring bugs in Kafka/the Kafka client) for
>>>>> that to happen would be that 99 is in waitingToEmit and isn't being cleared
>>>>> out during the commit (this is the bug I tried to fix), that 99 is somehow
>>>>> still queued for retry (this should not be possible), or that for some
>>>>> reason the consumer position ends up below the committed offset. I think
>>>>> the best bet for tracking down why it happens would be logging the contents
>>>>> of the RetryService, the contents of waitingToEmit and the consumer
>>>>> position, both after commitOffsetsForAckedTuples and right before the
>>>>> exception is thrown.
>>>>>
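The scenario described above (committed offset at 98, 99 and up already acked, then a stale 99 being emitted) can be sketched with a small standalone model. This is a simplified illustration only, not the KafkaSpout or OffsetManager code: the class `OffsetCommitSketch` and its methods are hypothetical names, and it assumes the committed offset is the next offset still waiting to be processed.

```java
import java.util.TreeSet;

public class OffsetCommitSketch {
    // Assumption: the committed offset is the next offset that still has to be processed.
    private long committedOffset;
    // Offsets that were acked but could not be committed yet.
    private final TreeSet<Long> acked = new TreeSet<>();

    OffsetCommitSketch(long committedOffset) {
        this.committedOffset = committedOffset;
    }

    void ack(long offset) {
        acked.add(offset);
    }

    /** Advances the committed offset past every contiguous acked offset. */
    long commitAckedOffsets() {
        while (acked.remove(committedOffset)) {
            committedOffset++;
        }
        return committedOffset;
    }

    /** Same idea as the check discussed in this thread: refuse to emit below the committed offset. */
    void checkEmit(long offset) {
        if (offset < committedOffset) {
            throw new IllegalStateException(
                "offset " + offset + " is below committed offset " + committedOffset);
        }
    }

    public static void main(String[] args) {
        OffsetCommitSketch partition = new OffsetCommitSketch(98);
        partition.ack(99);
        partition.ack(100);
        // 98 is still pending, so nothing can be committed yet.
        System.out.println(partition.commitAckedOffsets());
        partition.ack(98);
        // Now 98, 99 and 100 are committed in one go.
        System.out.println(partition.commitAckedOffsets());
        try {
            // A stale copy of 99 left in some queue would end up here.
            partition.checkEmit(99);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model the first commit attempt stays at 98 and the second moves to 101, after which an attempt to emit 99 is rejected. Any queue that still holds 99 at that point would trigger the check.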
>>>>> Could you try logging those? I can add the log statements on top of
>>>>> the bugfix if needed.
>>>>>
>>>>> On Sun, Dec 9, 2018 at 18:42, saurabh mimani <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hey, I see this is still happening. This time it seems to me, guessing
>>>>>> from the logs, that it is because the same offset from a different
>>>>>> partition was committed, but I am not sure, as that should be handled.
>>>>>>
>>>>>> Please find the logs here
>>>>>> <https://gist.github.com/mimani/ff27b7272482efc91c4d145d59ab59be>.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Best Regards
>>>>>>
>>>>>> Saurabh Kumar Mimani
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sun, Dec 9, 2018 at 3:19 AM Stig Rohde Døssing <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> I believe I have a fix, your logs were helpful. Please try out the
>>>>>>> changes in https://github.com/apache/storm/pull/2923/files.
>>>>>>>
>>>>>>> On Sat, Dec 8, 2018 at 07:25, saurabh mimani <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hey,
>>>>>>>>
>>>>>>>> Thanks for looking into this. I was not able to reproduce this
>>>>>>>> earlier on my local machine, but I will try again. I was consistently
>>>>>>>> able to reproduce it with a parallelism of 5 for boltA and a parallelism
>>>>>>>> of 200 for boltB on 2 machines in cluster mode.
>>>>>>>>
>>>>>>>> I will try again with your code.
>>>>>>>>
>>>>>>>> These
>>>>>>>> <https://gist.github.com/mimani/56dd31db34e4356b25c796d78261f7b8> are
>>>>>>>> the logs of the Kafka spout from when I was able to reproduce it in
>>>>>>>> cluster mode with my topology, in case they help.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Dec 5, 2018 at 11:33 PM Stig Rohde Døssing <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> I can't reproduce this.
>>>>>>>>>
>>>>>>>>> I created a test topology similar to the code you posted, based on
>>>>>>>>> the 1.2.1 release tag
>>>>>>>>> https://github.com/srdo/storm/commit/f5577f7a773f3d433b90a2670de5329b396f5564
>>>>>>>>> .
>>>>>>>>>
>>>>>>>>> I set up a local Kafka instance and put enough messages to run the
>>>>>>>>> topology for 15 minutes or so in the test topic. After populating the
>>>>>>>>> topic, I started the topology and let it run until it reached the end
>>>>>>>>> of the topic. As expected a lot of messages failed, but after a while it
>>>>>>>>> managed to successfully process all messages. I didn't see any worker
>>>>>>>>> crashes, and the logs only show some errors related to moving files
>>>>>>>>> (expected on Windows).
>>>>>>>>>
>>>>>>>>> The topology seems to work fine against both Kafka 0.10.2.2 and
>>>>>>>>> 1.1.1, though 1.1.1 is slower due to
>>>>>>>>> https://issues.apache.org/jira/browse/STORM-3102.
>>>>>>>>>
>>>>>>>>> The Kafka setup was with the default configuration for both Kafka
>>>>>>>>> and Zookeeper, and Storm was set up with a local Nimbus, single local
>>>>>>>>> Supervisor and 4 worker slots.
>>>>>>>>>
>>>>>>>>> Saurabh please try to reproduce the issue using the topology I
>>>>>>>>> linked. If you need to make adjustments in order to provoke the issue,
>>>>>>>>> please update the code and link it so I can check it out and try it.
>>>>>>>>>
>>>>>>>>> On Wed, Dec 5, 2018 at 16:42, saurabh mimani <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> No, I have checked that, there is no other consumer group
>>>>>>>>>> consuming from the same.
>>>>>>>>>>
>>>>>>>>>> Thanks for looking into it, let me know if you need any
>>>>>>>>>> other information.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best Regards
>>>>>>>>>>
>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Dec 5, 2018 at 9:02 PM Stig Rohde Døssing <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ravi, BaseBasicBolt does automatically anchor any emitted tuples
>>>>>>>>>>> to the input tuple. It's intended for bolts that just need to
>>>>>>>>>>> receive a tuple, synchronously process it and emit some new tuples
>>>>>>>>>>> anchored to the input tuple. It's there because doing manual acking is
>>>>>>>>>>> tedious and error-prone in cases where you don't need to be able to e.g.
>>>>>>>>>>> emit new unanchored tuples or ack the input tuple asynchronously. As
>>>>>>>>>>> Peter mentioned, the BasicBoltExecutor (
>>>>>>>>>>> https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java#L42)
>>>>>>>>>>> handles acking for you.
>>>>>>>>>>>
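The wrapper behaviour described above can be illustrated with a small standalone model. It does not use Storm's classes: `BasicBolt`, `Collector` and `executeWithAutoAck` are hypothetical stand-ins, and plain strings stand in for tuples. The wrapper sets the input as the anchoring context, runs the bolt logic, acks the input on normal return and fails it if a FailedException is thrown.

```java
import java.util.ArrayList;
import java.util.List;

public class BasicBoltSketch {
    /** Stand-in for Storm's FailedException: signals that the input should be failed. */
    static class FailedException extends RuntimeException {}

    /** Stand-in for a BaseBasicBolt: user logic only, no explicit ack or fail calls. */
    interface BasicBolt {
        void execute(String input, Collector collector);
    }

    /** Stand-in collector that anchors every emit to the current input. */
    static class Collector {
        private String context;
        final List<String> log = new ArrayList<>();

        void setContext(String input) { this.context = input; }
        void emit(String value) { log.add("emit " + value + " anchored to " + context); }
        void ack(String input) { log.add("ack " + input); }
        void fail(String input) { log.add("fail " + input); }
    }

    /** Plays the role of the executor wrapper: anchor, run, then ack or fail. */
    static void executeWithAutoAck(BasicBolt bolt, String input, Collector collector) {
        collector.setContext(input);
        try {
            bolt.execute(input, collector);
            collector.ack(input);
        } catch (FailedException e) {
            collector.fail(input);
        }
    }

    public static void main(String[] args) {
        Collector collector = new Collector();
        // A bolt that emits one child tuple and returns normally.
        executeWithAutoAck((in, out) -> out.emit(in + "-child"), "t1", collector);
        // A bolt that asks for a retry by throwing FailedException.
        executeWithAutoAck((in, out) -> { throw new FailedException(); }, "t2", collector);
        collector.log.forEach(System.out::println);
    }
}
```

The log ends up with an anchored emit and an ack for `t1`, and a fail for `t2`, without the bolt logic calling ack or fail itself.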
>>>>>>>>>>> Saurabh, I'll see if I can reproduce your issue. Please also
>>>>>>>>>>> check that you don't have any other consumers using the same 
>>>>>>>>>>> consumer group
>>>>>>>>>>> as the spout.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Dec 5, 2018 at 11:53, Peter Chamberlain <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Pretty sure that the ack path is handled by BasicBoltExecutor
>>>>>>>>>>>> (an implementation of IRichBolt), which calls
>>>>>>>>>>>> collector.setContext(input) and also does the acking inside its
>>>>>>>>>>>> execute function, and in between calls the BaseBasicBolt.execute
>>>>>>>>>>>> version (which takes the collector as well as the tuple as parameters).
>>>>>>>>>>>> So the intention is clearly that it is automatically anchored
>>>>>>>>>>>> and acknowledged.
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, 5 Dec 2018 at 09:57, Ravi Sharma <[email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Saurabh,
>>>>>>>>>>>>> I checked the BaseBasicBolt which comes with Storm; it doesn't
>>>>>>>>>>>>> do much.
>>>>>>>>>>>>> I also checked BasicOutputCollector and don't see how it will
>>>>>>>>>>>>> anchor automatically unless you call
>>>>>>>>>>>>> BasicOutputCollector.setContext(Tuple tuple). I can't see all of
>>>>>>>>>>>>> your code, but I don't see this call in your boltA code.
>>>>>>>>>>>>> Also it looks like even when you make this setContext call,
>>>>>>>>>>>>> after that you will have to emit using the following emit function:
>>>>>>>>>>>>>
>>>>>>>>>>>>> BasicOutputCollector.emit(String streamId, List<Object> tuples)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Basically just check that whatever emit function is called, it
>>>>>>>>>>>>> does pass the input tuple.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> I once had exactly the same issue for a few days, but mine was
>>>>>>>>>>>>> related to config. I wanted to read from two Kafka topics in one
>>>>>>>>>>>>> topology and had created two different KafkaSpouts. I mistakenly
>>>>>>>>>>>>> copy-pasted the same config, and that caused this issue. Not sure
>>>>>>>>>>>>> if that applies to your scenario.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> *NOTE*: I checked the latest storm master branch for code.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, 5 Dec 2018, 08:11 saurabh mimani <
>>>>>>>>>>>>> [email protected] wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hey Ravi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am using *BaseBasicBolt*, which as described here
>>>>>>>>>>>>>> <http://storm.apache.org/releases/1.0.6/Guaranteeing-message-processing.html>
>>>>>>>>>>>>>> : Tuples emitted to BasicOutputCollector are automatically
>>>>>>>>>>>>>> anchored to the input tuple, and the input tuple is acked for you
>>>>>>>>>>>>>> automatically when the execute method completes.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What you are saying is applicable to *BaseRichBolt*. The
>>>>>>>>>>>>>> Kafka spout I am using is from the storm-kafka-client
>>>>>>>>>>>>>> <https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client/1.2.1>
>>>>>>>>>>>>>> library, so the unique ID etc. should already be taken care of.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Dec 5, 2018 at 12:52 PM Ravi Sharma <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Saurabh,
>>>>>>>>>>>>>>> I think there is an issue with the part of the code which is
>>>>>>>>>>>>>>> emitting the tuples.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If you want to use the ack mechanism, you need to use an anchor
>>>>>>>>>>>>>>> tuple when you emit from bolts.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Collector.emit(Tuple input, Values data)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Also make sure Kafka spout emits tuple with a unique id.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Ravi
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, 5 Dec 2018, 06:35 saurabh mimani <
>>>>>>>>>>>>>>> [email protected] wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hey,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for your reply. What you are saying is correct.
>>>>>>>>>>>>>>>> However, I am able to reproduce it more often, and I think it
>>>>>>>>>>>>>>>> happens when multiple tuples fail in the first run but all of
>>>>>>>>>>>>>>>> them succeed on retry, or something of that sort.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This can easily be reproduced using the following two bolts and
>>>>>>>>>>>>>>>> kafkaSpout, by running in cluster mode for 3/4 minutes:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *BoltA*
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> case class Abc(index: Int, rand: Boolean)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> class BoltA extends BaseBasicBolt {
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>>>>>>>>>>>>>>>>     val inp = input.getBinaryByField("value").getObj[someObj]
>>>>>>>>>>>>>>>>     val randomGenerator = new Random()
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>     // One random flag per input tuple, shared by all 100 emitted tuples.
>>>>>>>>>>>>>>>>     val rand = randomGenerator.nextBoolean()
>>>>>>>>>>>>>>>>     // Emit 100 tuples with indexes 0 to 99 for each input tuple.
>>>>>>>>>>>>>>>>     (0 until 100).foreach { i =>
>>>>>>>>>>>>>>>>       collector.emit(new Values(Abc(i, rand).getJsonBytes))
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>>>>>>>>>>>>>>     declarer.declare(new Fields("boltAout"))
>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *BoltB*
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> class BoltB extends BaseBasicBolt {
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>   override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
>>>>>>>>>>>>>>>>     val abc = input.getBinaryByField("boltAout").getObj[Abc]
>>>>>>>>>>>>>>>>     println(s"Received ${abc.index}th tuple in BoltB")
>>>>>>>>>>>>>>>>     // Fail the last three tuples (indexes 97 to 99) when the random flag is set.
>>>>>>>>>>>>>>>>     if (abc.index >= 97 && abc.rand) {
>>>>>>>>>>>>>>>>       println(s"throwing FailedException for ${abc.index}th tuple")
>>>>>>>>>>>>>>>>       throw new FailedException()
>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>   override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *KafkaSpout:*
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> private def getKafkaSpoutConfig(source: Config) =
>>>>>>>>>>>>>>>>   KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
>>>>>>>>>>>>>>>>     .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
>>>>>>>>>>>>>>>>     .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
>>>>>>>>>>>>>>>>     .setOffsetCommitPeriodMs(100)
>>>>>>>>>>>>>>>>     .setRetry(new KafkaSpoutRetryExponentialBackoff(
>>>>>>>>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
>>>>>>>>>>>>>>>>       10,
>>>>>>>>>>>>>>>>       KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
>>>>>>>>>>>>>>>>     ))
>>>>>>>>>>>>>>>>     .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", "UNCOMMITTED_EARLIEST")))
>>>>>>>>>>>>>>>>     .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", 10000))
>>>>>>>>>>>>>>>>     .build()
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Other config:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> messageTimeoutInSecons: 300
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 9:18 PM Stig Rohde Døssing <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Saurabh,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The tuple emitted by the spout will only be acked once all
>>>>>>>>>>>>>>>>> branches of the tuple tree have been acked, i.e. all 100 tuples are acked.
>>>>>>>>>>>>>>>>>
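The rule stated above (the spout tuple is acked only once every tuple in its tree is acked) can be sketched with a small standalone model. This is a deliberate simplification with hypothetical names (`TupleTreeSketch`, `anchor`, `ack`, `fail`); Storm's own acker does this bookkeeping more compactly, and a plain set of pending ids is used here only for readability.

```java
import java.util.HashSet;
import java.util.Set;

public class TupleTreeSketch {
    // Ids of child tuples that were anchored to the spout tuple and not yet acked.
    private final Set<Integer> pending = new HashSet<>();
    private boolean failed = false;

    void anchor(int childId) { pending.add(childId); }

    void ack(int childId) { pending.remove(childId); }

    /** A single failed child fails the whole tree, so the spout tuple is retried. */
    void fail(int childId) { failed = true; }

    boolean spoutTupleAcked() { return !failed && pending.isEmpty(); }

    boolean spoutTupleFailed() { return failed; }

    public static void main(String[] args) {
        TupleTreeSketch tree = new TupleTreeSketch();
        // One spout tuple fans out into 100 anchored child tuples.
        for (int id = 0; id < 100; id++) {
            tree.anchor(id);
        }
        // 99 of them are acked: the spout tuple is still pending.
        for (int id = 0; id < 99; id++) {
            tree.ack(id);
        }
        System.out.println(tree.spoutTupleAcked());
        // Only the last ack completes the tree.
        tree.ack(99);
        System.out.println(tree.spoutTupleAcked());
    }
}
```

The first check prints `false` and the second prints `true`. Calling `fail` for any child instead marks the whole tree as failed, which is what a FailedException in a downstream bolt leads to.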
>>>>>>>>>>>>>>>>> The error you're seeing was added as part of
>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/STORM-2666 to try
>>>>>>>>>>>>>>>>> to avoid having that bug pop up again. Could you try posting
>>>>>>>>>>>>>>>>> your spout configuration? Also if possible, it would be helpful
>>>>>>>>>>>>>>>>> if you could enable debug logging for
>>>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout and maybe also
>>>>>>>>>>>>>>>>> org.apache.storm.kafka.spout.internal.OffsetManager.
>>>>>>>>>>>>>>>>> They log when offsets are committed (e.g.
>>>>>>>>>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546),
>>>>>>>>>>>>>>>>> and also when the consumer position is changed (e.g.
>>>>>>>>>>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561
>>>>>>>>>>>>>>>>> ). It should be possible to track down when/why the consumer
>>>>>>>>>>>>>>>>> position wound up behind the committed offset.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Just so you're aware, the check that crashes the spout has
>>>>>>>>>>>>>>>>> been removed as of
>>>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/STORM-3102. I'd
>>>>>>>>>>>>>>>>> still like to know if there's a bug in the spout causing it
>>>>>>>>>>>>>>>>> to emit tuples that were already committed though.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mon, Dec 3, 2018 at 11:29, saurabh mimani <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Version Info:
>>>>>>>>>>>>>>>>>>    "org.apache.storm" % "storm-core" % "1.2.1"
>>>>>>>>>>>>>>>>>>    "org.apache.storm" % "storm-kafka-client" % "1.2.1"
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I have a storm topology which looks like following:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> boltA -> boltB -> boltC -> boltD
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> boltA just does some formatting of requests and emits
>>>>>>>>>>>>>>>>>> another tuple. boltB does some processing and emits
>>>>>>>>>>>>>>>>>> around 100 tuples for each tuple received. boltC
>>>>>>>>>>>>>>>>>> and boltD process these tuples. All the bolts
>>>>>>>>>>>>>>>>>> extend BaseBasicBolt.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What I am noticing is that whenever boltD marks some tuple as
>>>>>>>>>>>>>>>>>> failed and marks it for retry by throwing FailedException,
>>>>>>>>>>>>>>>>>> then after a few minutes (less than my topology timeout) I get
>>>>>>>>>>>>>>>>>> the following error:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
>>>>>>>>>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>>>>>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>>>>>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>>>>>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>>>>>>>>>>>>>>>>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>>>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>>>>>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>>>>>>>>>>>>>>>>> 2018-11-30T20:01:05.262+05:30 executor [ERROR]
>>>>>>>>>>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
>>>>>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
>>>>>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
>>>>>>>>>>>>>>>>>>         at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
>>>>>>>>>>>>>>>>>>         at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>>>>>         at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
>>>>>>>>>>>>>>>>>>         at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
>>>>>>>>>>>>>>>>>>         at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What seems to be happening is that when boltB emits
>>>>>>>>>>>>>>>>>> 100 tuples from 1 tuple and boltD fails one of
>>>>>>>>>>>>>>>>>> those 100 tuples, I get this error. I am not able to understand
>>>>>>>>>>>>>>>>>> how to fix this. Ideally the original tuple should be acked only
>>>>>>>>>>>>>>>>>> when all 100 tuples are acked, but probably the original tuple
>>>>>>>>>>>>>>>>>> is acked before all those 100 tuples are acked, which causes
>>>>>>>>>>>>>>>>>> this error.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Saurabh Kumar Mimani
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> *Peter Chamberlain* | Senior Software Engineer | HTK
>>>>>>>>>>>>
>>>>>>>>>>>> T: +44(0)870 600 2311
>>>>>>>>>>>> Connect with me: Email <[email protected]>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> [image: htk logo] <http://www.htk.co.uk/>
>>>>>>>>>>>>
>>>>>>>>>>>> Connect with HTK: htk.co.uk <http://www.htk.co.uk/> | LinkedIn
>>>>>>>>>>>> <http://www.linkedin.com/company/htk/> | Twitter
>>>>>>>>>>>> <http://www.twitter.com/htkhorizon>
>>>>>>>>>>>>
>>>>>>>>>>>> HTK Limited, Chapmans Warehouse, Wherry Quay, Ipswich, IP4 1AS,
>>>>>>>>>>>> UK.
>>>>>>>>>>>> Company Registered in England and Wales as 3191677, VAT Number
>>>>>>>>>>>> 675 9467 71
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
