I believe I have a fix; your logs were helpful. Please try out the changes in https://github.com/apache/storm/pull/2923/files.
On Sat, Dec 8, 2018 at 07:25, saurabh mimani <[email protected]> wrote:

Hey,

Thanks for looking into this. I was not able to reproduce this earlier on my local machine, but I will try once more. I was consistently able to reproduce it in cluster mode on 2 machines, with a parallelism of 5 for boltA and a parallelism of 200 for boltB.

I will try again with your code.

These <https://gist.github.com/mimani/56dd31db34e4356b25c796d78261f7b8> are the Kafka spout logs from when I was able to reproduce it in cluster mode with my topology, in case they help.

Best Regards

Saurabh Kumar Mimani

On Wed, Dec 5, 2018 at 11:33 PM, Stig Rohde Døssing <[email protected]> wrote:

I can't reproduce this.

I created a test topology similar to the code you posted, based on the 1.2.1 release tag:
https://github.com/srdo/storm/commit/f5577f7a773f3d433b90a2670de5329b396f5564

I set up a local Kafka instance and put enough messages in the test topic to run the topology for 15 minutes or so. After populating the topic, I started the topology and let it run until it reached the end of the topic. As expected, a lot of messages failed, but after a while it managed to successfully process all messages. I didn't see any worker crashes, and the logs only show some errors related to moving files (expected on Windows).

The topology seems to work fine against both Kafka 0.10.2.2 and 1.1.1, though 1.1.1 is slower due to https://issues.apache.org/jira/browse/STORM-3102.

The Kafka setup used the default configuration for both Kafka and Zookeeper, and Storm was set up with a local Nimbus, a single local Supervisor, and 4 worker slots.

Saurabh, please try to reproduce the issue using the topology I linked. If you need to make adjustments in order to provoke the issue, please update the code and link it so I can check it out and try it.

On Wed, Dec 5, 2018 at 16:42, saurabh mimani <[email protected]> wrote:

No, I have checked that; there is no other consumer group consuming from the same topic.

Thanks for looking into it. Let me know if you need any other information.

Best Regards

Saurabh Kumar Mimani

On Wed, Dec 5, 2018 at 9:02 PM, Stig Rohde Døssing <[email protected]> wrote:

Ravi, BaseBasicBolt does automatically anchor any emitted tuples to the input tuple. It's intended for bolts that just need to receive a tuple, synchronously process it and emit some new tuples anchored to the input tuple. It exists because doing manual acking is tedious and error-prone in cases where you don't need to e.g. emit new unanchored tuples or ack the input tuple asynchronously. As Peter mentioned, the BasicBoltExecutor (https://github.com/apache/storm/blob/21bb1388414d373572779289edc785c7e5aa52aa/storm-client/src/jvm/org/apache/storm/topology/BasicBoltExecutor.java#L42) handles acking for you.

Saurabh, I'll see if I can reproduce your issue. Please also check that you don't have any other consumers using the same consumer group as the spout.
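For readers following along, the wrapping Stig refers to can be paraphrased as the sketch below. This is a simplified, hypothetical rendering, not the actual BasicBoltExecutor source (see the link above for the real code):

import org.apache.storm.task.OutputCollector
import org.apache.storm.topology.{BasicOutputCollector, FailedException, IBasicBolt}
import org.apache.storm.tuple.Tuple

// Simplified sketch of the execute path in BasicBoltExecutor: the wrapper
// anchors emits to the input via setContext, delegates to the user's
// BaseBasicBolt, then acks on success or fails on FailedException.
class SimplifiedBasicBoltExecutor(bolt: IBasicBolt,
                                  basicCollector: BasicOutputCollector,
                                  delegate: OutputCollector) {
  def execute(input: Tuple): Unit = {
    basicCollector.setContext(input)      // emits are now anchored to input
    try {
      bolt.execute(input, basicCollector) // user's BaseBasicBolt.execute
      delegate.ack(input)                 // acked automatically on success
    } catch {
      case _: FailedException =>
        delegate.fail(input)              // failed, so the spout will replay
    }
  }
}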
On Wed, Dec 5, 2018 at 11:53, Peter Chamberlain <[email protected]> wrote:

Pretty sure the ack path is handled by BasicBoltExecutor (an implementation of IRichBolt), which calls collector.setContext(input), does the acking inside its execute function, and in between calls the BaseBasicBolt.execute version (which takes the collector as well as the tuple as parameters). So the intention is clearly that tuples are automatically anchored and acknowledged.

On Wed, Dec 5, 2018 at 09:57, Ravi Sharma <[email protected]> wrote:

Hi Saurabh,

I checked the BaseBasicBolt which comes with Storm; it doesn't do much. I also checked BasicOutputCollector and don't see how it will anchor automatically unless you call BasicOutputCollector.setContext(Tuple tuple). I don't see all of your code, but I don't see this call in your boltA code. It also looks like even when you make this setContext call, you then have to emit using the following emit function:

BasicOutputCollector.emit(String streamId, List<Object> tuples)

Basically, just check that whatever emit function is called passes the input tuple along.

Once I had exactly the same issue for a few days, but mine was related to config. I wanted to read from two Kafka topics in one topology and had created two different KafkaSpouts; I mistakenly copy-pasted the same config, and that caused this issue. Not sure if that applies to your scenario.

*NOTE*: I checked the latest Storm master branch for the code.

On Wed, Dec 5, 2018 at 08:11, saurabh mimani <[email protected]> wrote:

Hey Ravi,

I am using *BaseBasicBolt*, which, as described here <http://storm.apache.org/releases/1.0.6/Guaranteeing-message-processing.html>: "Tuples emitted to BasicOutputCollector are automatically anchored to the input tuple, and the input tuple is acked for you automatically when the execute method completes."

What you are saying is applicable to *BaseRichBolt*. The Kafka spout I am using is from the storm-kafka-client <https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client/1.2.1> library, so unique IDs etc. should already be taken care of.

Best Regards

Saurabh Kumar Mimani

On Wed, Dec 5, 2018 at 12:52 PM, Ravi Sharma <[email protected]> wrote:

Hi Saurabh,

I think there is an issue with the part of the code which is emitting the tuples.

If you want to use the ack mechanism, you need to use an anchor tuple when you emit from bolts:

Collector.emit(Tuple input, Values data)

Also make sure the Kafka spout emits tuples with a unique ID.

Thanks
Ravi
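To make the contrast in the messages above concrete: with a plain BaseRichBolt the anchoring and acking Ravi describes must be done by hand, roughly as in this hypothetical sketch (Storm 1.2.x API; the bolt and field names are invented), whereas BaseBasicBolt performs both steps automatically:

import java.util.{Map => JMap}
import org.apache.storm.task.{OutputCollector, TopologyContext}
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

// With BaseRichBolt, anchoring and acking are explicit: pass the input
// tuple to emit() to anchor, and ack() it when done. Skipping the anchor
// breaks the tuple tree; skipping the ack makes the tuple time out.
class ManualAckBolt extends BaseRichBolt {
  private var collector: OutputCollector = _

  override def prepare(stormConf: JMap[_, _], context: TopologyContext,
                       collector: OutputCollector): Unit = {
    this.collector = collector
  }

  override def execute(input: Tuple): Unit = {
    collector.emit(input, new Values("derived-value")) // anchored to input
    collector.ack(input)                               // acked explicitly
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
    declarer.declare(new Fields("out"))
}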
On Wed, Dec 5, 2018 at 06:35, saurabh mimani <[email protected]> wrote:

Hey,

Thanks for your reply. What you are saying is correct. However, I am able to reproduce it fairly often, and I think it happens when multiple tuples fail on the first run but all of them succeed on retry; something of that sort is happening.

This can be reproduced easily with the following two bolts and a KafkaSpout, by running in cluster mode for 3-4 minutes:

*BoltA*

import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.tuple.{Fields, Tuple, Values}
import scala.util.Random

case class Abc(index: Int, rand: Boolean)

class BoltA extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    // getObj / getJsonBytes are our own (de)serialization helpers.
    val inp = input.getBinaryByField("value").getObj[someObj]
    val randomGenerator = new Random()

    var i = 0
    val rand = randomGenerator.nextBoolean()
    (1 to 100).foreach { _ =>
      collector.emit(new Values(Abc(i, rand).getJsonBytes))
      i += 1
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("boltAout"))
  }
}

*BoltB*

import org.apache.storm.topology.FailedException
// other imports as in BoltA

class BoltB extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val abc = input.getBinaryByField("boltAout").getObj[Abc]
    println(s"Received ${abc.index}th tuple in BoltB")
    if (abc.index >= 97 && abc.rand) {
      println(s"throwing FailedException for ${abc.index}th tuple")
      throw new FailedException()
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
  }
}

*KafkaSpout:*

private def getKafkaSpoutConfig(source: Config) =
  KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
    .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    .setOffsetCommitPeriodMs(100)
    .setRetry(new KafkaSpoutRetryExponentialBackoff(
      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
      10,
      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)))
    .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", "UNCOMMITTED_EARLIEST")))
    .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", 10000))
    .build()

Other config:

messageTimeoutInSecons: 300

[image: Screenshot 2018-12-05 at 12.03.08 PM.png]

Best Regards

Saurabh Kumar Mimani
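For completeness, a sketch of how such a reproduction might be wired up. The original message does not show the topology setup, so the component names, topology name, and submit helper below are invented; the parallelism hints (5 for boltA, 200 for boltB) follow the numbers Saurabh mentions earlier in the thread:

import org.apache.storm.{Config, StormSubmitter}
import org.apache.storm.topology.{IRichSpout, TopologyBuilder}

object ReproTopology {
  // kafkaSpout would be new KafkaSpout(getKafkaSpoutConfig(...)), built
  // from the config shown in the message above.
  def submit(kafkaSpout: IRichSpout): Unit = {
    val builder = new TopologyBuilder
    builder.setSpout("kafkaSpout", kafkaSpout)
    builder.setBolt("boltA", new BoltA, 5).shuffleGrouping("kafkaSpout")
    builder.setBolt("boltB", new BoltB, 200).shuffleGrouping("boltA")

    val conf = new Config
    conf.setMessageTimeoutSecs(300) // the messageTimeoutInSecons: 300 setting above
    StormSubmitter.submitTopology("repro-topology", conf, builder.createTopology())
  }
}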
On Mon, Dec 3, 2018 at 9:18 PM, Stig Rohde Døssing <[email protected]> wrote:

Hi Saurabh,

The tuple emitted by the spout will only be acked once all branches of the tuple tree have been acked, i.e. once all 100 tuples are acked.

The error you're seeing was added as part of https://issues.apache.org/jira/browse/STORM-2666 to try to avoid having that bug pop up again. Could you try posting your spout configuration? Also, if possible, it would be helpful if you could enable debug logging for org.apache.storm.kafka.spout.KafkaSpout and maybe also org.apache.storm.kafka.spout.internal.OffsetManager. They log when offsets are committed (e.g. https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L546) and when the consumer position is changed (e.g. https://github.com/apache/storm/blob/v1.2.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L561). It should be possible to track down when/why the consumer position wound up behind the committed offset.

Just so you're aware, the check that crashes the spout has been removed as of https://issues.apache.org/jira/browse/STORM-3102. I'd still like to know if there's a bug in the spout causing it to emit tuples that were already committed, though.
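One way to enable that logging is through the worker logging configuration; a sketch assuming the stock log4j2/worker.xml shipped with Storm (file location and surrounding layout vary by installation):

<!-- log4j2/worker.xml, inside the existing <Loggers> element -->
<Logger name="org.apache.storm.kafka.spout.KafkaSpout" level="debug"/>
<Logger name="org.apache.storm.kafka.spout.internal.OffsetManager" level="debug"/>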
On Mon, Dec 3, 2018 at 11:29, saurabh mimani <[email protected]> wrote:

Version info:

"org.apache.storm" % "storm-core" % "1.2.1"
"org.apache.storm" % "storm-kafka-client" % "1.2.1"

I have a Storm topology which looks like the following:

boltA -> boltB -> boltC -> boltD

boltA just does some formatting of requests and emits another tuple. boltB does some processing and emits around 100 tuples for each tuple it receives. boltC and boltD process these tuples. All the bolts implement BaseBasicBolt.

What I am noticing is that whenever boltD marks some tuple as failed and marks it for retry by throwing a FailedException, then after a little less than my topology's message timeout I get the following error:

2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
        at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
2018-11-30T20:01:05.262+05:30 executor [ERROR]
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
        [same stack trace as above]

This seems to happen when boltB emits 100 tuples for one input tuple and boltD fails one of those 100 tuples. I am not able to understand how to fix this. Ideally the spout should ack an original tuple only when all 100 tuples are acked, but probably the original tuple is acked before all those 100 tuples are acked, which causes this error.

Best Regards

Saurabh Kumar Mimani
