Hey, thanks for looking into this. I was not able to reproduce this earlier on my local machine; however, I will try once again. I was consistently able to reproduce it with a parallelism of 5 for boltA and a parallelism of 200 for boltB on 2 machines in cluster mode.
I will try once again with your code. These <https://gist.github.com/mimani/56dd31db34e4356b25c796d78261f7b8> are the Kafka Spout logs from when I was able to reproduce it in cluster mode with my topology, in case they help. Best Regards Saurabh Kumar Mimani On Wed, Dec 5, 2018 at 11:33 PM Stig Rohde Døssing <[email protected]> wrote: > I can't reproduce this. > > I created a test topology similar to the code you posted, based on the > 1.2.1 release tag > https://github.com/srdo/storm/commit/f5577f7a773f3d433b90a2670de5329b396f5564 > . > > I set up a local Kafka instance and put enough messages in the test topic to > run the topology for 15 minutes or so. After populating the > topic, I started the topology and let it run until it reached the end of > the topic. As expected, a lot of messages failed, but after a while it > managed to successfully process all messages. I didn't see any worker > crashes, and the logs only show some errors related to moving files > (expected on Windows). > > The topology seems to work fine against both Kafka 0.10.2.2 and 1.1.1, > though 1.1.1 is slower due to > https://issues.apache.org/jira/browse/STORM-3102. > > The Kafka setup used the default configuration for both Kafka and > Zookeeper, and Storm was set up with a local Nimbus, a single local > Supervisor and 4 worker slots. > > Saurabh, please try to reproduce the issue using the topology I linked. If > you need to make adjustments in order to provoke the issue, please update > the code and link it so I can check it out and try it. > > On Wed, 5 Dec 2018 at 16:42, saurabh mimani < > [email protected]> wrote: > >> No, I have checked that; there is no other consumer group consuming from >> the same topic. >> >> Thanks for looking into it, let me know if you need any other information.
>> >> >> >> Best Regards >> >> Saurabh Kumar Mimani >> >> >> >> >> On Wed, Dec 5, 2018 at 9:02 PM Stig Rohde Døssing <[email protected]> >> wrote: >> >>> Ravi, BaseBasicBolt does automatically anchor any emitted tuples to the >>> input tuple. It's intended for bolts that just need to receive a tuple, >>> synchronously process it and emit some new tuples anchored to the input >>> tuple. It's there because doing manual acking is tedious and error-prone in >>> cases where you don't need to be able to e.g. emit new unanchored tuples or >>> ack the input tuple asynchronously. As Peter mentioned, the >>> BasicBoltExecutor ( >>> https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java#L42) >>> handles acking for you. >>> >>> Saurabh, I'll see if I can reproduce your issue. Please also check that >>> you don't have any other consumers using the same consumer group as the >>> spout. >>> >>> On Wed, 5 Dec 2018 at 11:53, Peter Chamberlain < >>> [email protected]> wrote: >>> >>>> Pretty sure that the ack path is handled by BasicBoltExecutor (an >>>> implementation of IRichBolt), which calls collector.setContext(input), and >>>> also does the acking inside its execute function, and in between calls the >>>> BaseBasicBolt.execute version (which takes the collector as well as the >>>> tuple as parameters). >>>> So the intention is clearly that it is automatically anchored and >>>> acknowledged. >>>> >>>> On Wed, 5 Dec 2018 at 09:57, Ravi Sharma <[email protected]> wrote: >>>> >>>>> Hi Saurabh, >>>>> I checked the BaseBasicBolt that comes with Storm; it doesn't do much. >>>>> I also checked BasicOutputCollector and don't see how it will anchor >>>>> automatically unless you call BasicOutputCollector.setContext(Tuple >>>>> tuple). I don't see all of your code, but I don't see this call in your boltA >>>>> code.
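[Editor's note: the auto-anchoring/auto-acking behavior discussed above can be sketched in plain Java. This is a simplified stand-in, not Storm's actual classes: the names (`BasicBoltExecutor`, `BasicOutputCollector`, `setContext`) mirror the real ones, but the types and implementation here are hypothetical.]

```java
import java.util.ArrayList;
import java.util.List;

class FailedException extends RuntimeException {}

class Tuple {
    final String value;
    Tuple(String value) { this.value = value; }
}

// Collector stand-in: remembers the current input tuple and anchors every emit to it.
class BasicOutputCollector {
    private Tuple context;                           // the input tuple to anchor to
    final List<Tuple[]> emitted = new ArrayList<>(); // (anchor, emitted tuple) pairs
    void setContext(Tuple input) { context = input; }
    void emit(Tuple out) { emitted.add(new Tuple[]{context, out}); } // auto-anchored
}

interface BasicBolt {
    void execute(Tuple input, BasicOutputCollector collector);
}

// Executor stand-in: anchors, delegates, then acks or fails automatically.
// This wrapping is what makes BaseBasicBolt "hands-off" for the bolt author.
class BasicBoltExecutor {
    private final BasicBolt delegate;
    final List<Tuple> acked = new ArrayList<>();
    final List<Tuple> failed = new ArrayList<>();
    BasicBoltExecutor(BasicBolt delegate) { this.delegate = delegate; }

    void execute(Tuple input, BasicOutputCollector collector) {
        collector.setContext(input);     // every emit is now anchored to input
        try {
            delegate.execute(input, collector);
            acked.add(input);            // automatic ack on normal return
        } catch (FailedException e) {
            failed.add(input);           // automatic fail -> the spout will replay
        }
    }
}
```

So the bolt author never calls setContext, ack, or fail; the executor does all three around each execute call.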
>>>>> Also, it looks like even when you make this setContext call, >>>>> you will then have to emit using the following emit function: >>>>> >>>>> BasicOutputCollector.emit(String streamId, List<Object> tuples) >>>>> >>>>> Basically, just check that whatever emit function is called, it does >>>>> pass the input tuple. >>>>> >>>>> >>>>> I once had exactly the same issue for a few days, but mine was related to >>>>> config. I wanted to read from two Kafka topics in one topology and had two >>>>> different KafkaSpouts created; by mistake I copy-pasted the same config, and >>>>> that caused this issue. Not sure if that applies to your scenario. >>>>> >>>>> >>>>> *NOTE*: I checked the latest Storm master branch for the code. >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> On Wed, 5 Dec 2018, 08:11 saurabh mimani <[email protected] >>>>> wrote: >>>>> >>>>>> Hey Ravi, >>>>>> >>>>>> I am using *BaseBasicBolt*, which, as described here >>>>>> <http://storm.apache.org/releases/1.0.6/Guaranteeing-message-processing.html>: >>>>>> Tuples emitted to BasicOutputCollector are automatically anchored >>>>>> to the input tuple, and the input tuple is acked for you automatically >>>>>> when the execute method completes. >>>>>> >>>>>> What you are saying is applicable to *BaseRichBolt*. The Kafka >>>>>> spout I am using is from the storm-kafka-client >>>>>> <https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client/1.2.1> >>>>>> library, so unique IDs, etc. should already be taken care of. >>>>>> >>>>>> >>>>>> Best Regards >>>>>> >>>>>> Saurabh Kumar Mimani >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> On Wed, Dec 5, 2018 at 12:52 PM Ravi Sharma <[email protected]> >>>>>> wrote: >>>>>> >>>>>>> Hi Saurabh, >>>>>>> I think there is an issue with the part of the code which is emitting the >>>>>>> tuples. >>>>>>> >>>>>>> If you want to use the ack mechanism, you need to use an anchor tuple when >>>>>>> you emit from bolts.
>>>>>>> >>>>>>> Collector.emit(Tuple input, Values data) >>>>>>> >>>>>>> Also make sure the Kafka spout emits tuples with a unique ID. >>>>>>> >>>>>>> Thanks >>>>>>> Ravi >>>>>>> >>>>>>> >>>>>>> On Wed, 5 Dec 2018, 06:35 saurabh mimani <[email protected] >>>>>>> wrote: >>>>>>> >>>>>>>> Hey, >>>>>>>> >>>>>>>> Thanks for your reply. What you are saying is correct; however, I am >>>>>>>> able to reproduce it more often, and I think it happens when multiple >>>>>>>> tuples fail in the first run but all of them succeed on retry, something >>>>>>>> of that sort is happening. >>>>>>>> >>>>>>>> This can be reproduced easily using the following two bolts and KafkaSpout, >>>>>>>> by running in cluster mode for 3-4 minutes: >>>>>>>> >>>>>>>> *BoltA* >>>>>>>> >>>>>>>> case class Abc(index: Int, rand: Boolean) >>>>>>>> >>>>>>>> class BoltA extends BaseBasicBolt { >>>>>>>> >>>>>>>> override def execute(input: Tuple, collector: BasicOutputCollector): >>>>>>>> Unit = { >>>>>>>> val inp = input.getBinaryByField("value").getObj[someObj] >>>>>>>> val randomGenerator = new Random() >>>>>>>> >>>>>>>> var i = 0 >>>>>>>> val rand = randomGenerator.nextBoolean() >>>>>>>> (1 to 100).foreach { _ => >>>>>>>> collector.emit(new Values(Abc(i, rand).getJsonBytes)) >>>>>>>> i += 1 >>>>>>>> } >>>>>>>> } >>>>>>>> >>>>>>>> override def declareOutputFields(declarer: OutputFieldsDeclarer): >>>>>>>> Unit = { >>>>>>>> declarer.declare(new Fields("boltAout")) >>>>>>>> } >>>>>>>> >>>>>>>> } >>>>>>>> >>>>>>>> *BoltB* >>>>>>>> >>>>>>>> class BoltB extends BaseBasicBolt { >>>>>>>> >>>>>>>> override def execute(input: Tuple, collector: BasicOutputCollector): >>>>>>>> Unit = { >>>>>>>> val abc = input.getBinaryByField("boltAout").getObj[Abc] >>>>>>>> println(s"Received ${abc.index}th tuple in BoltB") >>>>>>>> if(abc.index >= 97 && abc.rand){ >>>>>>>> println(s"throwing FailedException for ${abc.index}th tuple") >>>>>>>> throw new FailedException() >>>>>>>> } >>>>>>>> } >>>>>>>> >>>>>>>> override
def declareOutputFields(declarer: OutputFieldsDeclarer): >>>>>>>> Unit = { >>>>>>>> } >>>>>>>> } >>>>>>>> >>>>>>>> *KafkaSpout:* >>>>>>>> >>>>>>>> private def getKafkaSpoutConfig(source: Config) = >>>>>>>> KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", >>>>>>>> "queueName") >>>>>>>> .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp") >>>>>>>> .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, >>>>>>>> "org.apache.kafka.common.serialization.ByteArrayDeserializer") >>>>>>>> .setOffsetCommitPeriodMs(100) >>>>>>>> .setRetry(new KafkaSpoutRetryExponentialBackoff( >>>>>>>> KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100), >>>>>>>> KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100), >>>>>>>> 10, >>>>>>>> KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000) >>>>>>>> )) >>>>>>>> >>>>>>>> .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", >>>>>>>> "UNCOMMITTED_EARLIEST"))) >>>>>>>> >>>>>>>> .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", >>>>>>>> 10000)) >>>>>>>> .build() >>>>>>>> >>>>>>>> Other config: >>>>>>>> >>>>>>>> messageTimeoutInSeconds: 300 >>>>>>>> >>>>>>>> [image: Screenshot 2018-12-05 at 12.03.08 PM.png] >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Best Regards >>>>>>>> >>>>>>>> Saurabh Kumar Mimani >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Mon, Dec 3, 2018 at 9:18 PM Stig Rohde Døssing < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>>> Hi Saurabh, >>>>>>>>> >>>>>>>>> The tuple emitted by the spout will only be acked once all >>>>>>>>> branches of the tuple tree have been acked, i.e. all 100 tuples are >>>>>>>>> acked. >>>>>>>>> >>>>>>>>> The error you're seeing was added as part of >>>>>>>>> https://issues.apache.org/jira/browse/STORM-2666 to try to avoid >>>>>>>>> having that bug pop up again.
Could you try posting your spout >>>>>>>>> configuration? Also, if possible, it would be helpful if you could >>>>>>>>> enable >>>>>>>>> debug logging for org.apache.storm.kafka.spout.KafkaSpout and >>>>>>>>> maybe also org.apache.storm.kafka.spout.internal.OffsetManager. >>>>>>>>> They log when offsets are committed (e.g. >>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546), >>>>>>>>> and also when the consumer position is changed (e.g. >>>>>>>>> https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561 >>>>>>>>> ). It should be possible to track down when/why the consumer position >>>>>>>>> wound >>>>>>>>> up behind the committed offset. >>>>>>>>> >>>>>>>>> Just so you're aware, the check that crashes the spout has been >>>>>>>>> removed as of https://issues.apache.org/jira/browse/STORM-3102. >>>>>>>>> I'd still like to know if there's a bug in the spout causing it to >>>>>>>>> emit >>>>>>>>> tuples that were already committed, though. >>>>>>>>> >>>>>>>>> On Mon, 3 Dec 2018 at 11:29, saurabh mimani < >>>>>>>>> [email protected]> wrote: >>>>>>>>> >>>>>>>>>> >>>>>>>>>> Version Info: >>>>>>>>>> "org.apache.storm" % "storm-core" % "1.2.1" >>>>>>>>>> "org.apache.storm" % "storm-kafka-client" % "1.2.1" >>>>>>>>>> >>>>>>>>>> I have a Storm topology which looks like the following: >>>>>>>>>> >>>>>>>>>> boltA -> boltB -> boltC -> boltD >>>>>>>>>> >>>>>>>>>> boltA just does some formatting of requests and emits another >>>>>>>>>> tuple. boltB does some processing and emits around 100 tuples >>>>>>>>>> for each tuple being received. boltC and boltD process these >>>>>>>>>> tuples. All the bolts implement BaseBasicBolt.
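[Editor's note: the tuple-tree completion rule Stig describes (the spout tuple is acked only after every downstream tuple in the tree has been acked) can be illustrated with the XOR bookkeeping Storm's acker is known to use. This is a simplified plain-Java illustration, not Storm's actual acker code.]

```java
import java.util.Random;

// Each tuple in the tree carries a random 64-bit id; the acker XORs ids in
// when a tuple is anchored and XORs them out when it is acked. The spout
// tuple is complete only when the running XOR returns to zero, i.e. every
// downstream tuple (all ~100 of them here) has been acked exactly once.
class AckerEntry {
    private long xorChecksum = 0;
    private boolean failed = false;

    void anchor(long tupleId) { xorChecksum ^= tupleId; } // child enters the tree
    void ack(long tupleId)    { xorChecksum ^= tupleId; } // child leaves the tree
    void fail()               { failed = true; }          // any fail dooms the tree
    boolean complete()        { return !failed && xorChecksum == 0; }
}
```

With 100 anchored children and only 99 acks, the checksum is still nonzero, so the spout tuple stays pending; a single FailedException anywhere in the tree fails the whole tree and triggers a replay from the spout.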
>>>>>>>>>> >>>>>>>>>> What I am noticing is that whenever boltD marks some tuple as failed >>>>>>>>>> and marks it for retry by throwing FailedException, after a >>>>>>>>>> few minutes (less than my topology timeout) I get the following error: >>>>>>>>>> >>>>>>>>>> 2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died! >>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message that >>>>>>>>>> has already been committed. This should never occur when using the >>>>>>>>>> at-least-once processing guarantee. >>>>>>>>>> at >>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) >>>>>>>>>> ~[stormjar.jar:?] >>>>>>>>>> at >>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) >>>>>>>>>> ~[stormjar.jar:?] >>>>>>>>>> at >>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) >>>>>>>>>> ~[stormjar.jar:?] >>>>>>>>>> at >>>>>>>>>> org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) >>>>>>>>>> ~[storm-core-1.2.1.jar:1.2.1] >>>>>>>>>> at >>>>>>>>>> org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) >>>>>>>>>> [storm-core-1.2.1.jar:1.2.1] >>>>>>>>>> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] >>>>>>>>>> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60] >>>>>>>>>> 2018-11-30T20:01:05.262+05:30 executor [ERROR] >>>>>>>>>> java.lang.IllegalStateException: Attempting to emit a message that >>>>>>>>>> has already been committed. This should never occur when using the >>>>>>>>>> at-least-once processing guarantee. >>>>>>>>>> at >>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) >>>>>>>>>> ~[stormjar.jar:?] >>>>>>>>>> at >>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) >>>>>>>>>> ~[stormjar.jar:?] >>>>>>>>>> at >>>>>>>>>> org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) >>>>>>>>>> ~[stormjar.jar:?]
>>>>>>>>>> at >>>>>>>>>> org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) >>>>>>>>>> ~[storm-core-1.2.1.jar:1.2.1] >>>>>>>>>> at >>>>>>>>>> org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) >>>>>>>>>> [storm-core-1.2.1.jar:1.2.1] >>>>>>>>>> at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] >>>>>>>>>> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60] >>>>>>>>>> >>>>>>>>>> What seems to be happening is that when boltB emits 100 tuples >>>>>>>>>> for 1 input tuple and boltD fails one of those 100 tuples, >>>>>>>>>> I get this error. I am not able to understand how to fix this; >>>>>>>>>> ideally it should ack the original tuple only when all 100 tuples are >>>>>>>>>> acked, but probably the original tuple is acked before all those 100 tuples >>>>>>>>>> are acked, which causes this error. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Best Regards >>>>>>>>>> >>>>>>>>>> Saurabh Kumar Mimani >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>> >>>> -- >>>> >>>> *Peter Chamberlain* | Senior Software Engineer | HTK >>>
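[Editor's note: the invariant behind the IllegalStateException in the thread can be sketched roughly as follows. This is a hypothetical simplification of the check in KafkaSpout.emitOrRetryTuple in 1.2.1, not the actual implementation: under at-least-once, no tuple at or below the last committed offset should still be awaiting emission or retry.]

```java
// Simplified per-partition guard: the spout must never (re-)emit an offset
// that is at or below the last offset it committed to Kafka. If a failed
// tuple is still scheduled for retry after its offset was committed, the
// at-least-once guarantee has already been violated, hence the crash.
class OffsetGuard {
    private long committed = -1; // highest offset committed so far

    void commit(long offset) { committed = Math.max(committed, offset); }

    // Roughly mirrors the 1.2.1 check that kills the spout in this thread.
    void emitOrRetry(long offset) {
        if (offset <= committed) {
            throw new IllegalStateException(
                "Attempting to emit a message that has already been committed.");
        }
        // ...otherwise emit the tuple or schedule its retry...
    }
}
```

This is why the symptom appears only after failures plus retries: a retry delayed past the commit of its own offset lands on the wrong side of this check.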
