Worked it out. Removing the for loop that wrapped the emit call in the spout fixed it.
Any ideas why this makes a difference?

> On 18 Aug 2015, at 06:12, Abhishek Agarwal <[email protected]> wrote:
>
> Couple of questions -
> 1. Are you adding the tuples to the pendingTuples list before emitting them?
>    I didn't see that in the code.
> 2. Is logging correctly configured? Can you use sysout instead of log.info
>    and then try it out?
>
> On Tue, Aug 18, 2015 at 4:02 AM, Stuart Perks <[email protected]> wrote:
> Set to 23, the same number as the workers are set to.
>
> thanks
>> On 17 Aug 2015, at 23:04, Javier Gonzalez <[email protected]> wrote:
>>
>> How many ackers have you got configured when you submit your topology?
>>
>> On Aug 17, 2015 5:57 PM, "Stuart Perks" <[email protected]> wrote:
>> Hi, I am attempting to run guaranteed message processing but ACK is not being
>> called. Post on Stack Overflow if you prefer to answer there.
>> http://stackoverflow.com/questions/32060081/apache-storm-ack-not-working
>>
>> Thanks
>>
>> I am trying to implement guaranteed message processing, but the ack and
>> fail methods on the spout are not being called.
>>
>> I am passing a message ID object with the spout. I am passing the tuple
>> with each bolt and calling collector.ack(tuple) in each bolt.
>>
>> Question: the ack or fail is not being called and I cannot work out why.
>>
>> Here is a shortened code sample.
>>
>> Spout code using BaseRichSpout
>>
>> public void nextTuple() {
>>
>>     .... further code ....
>>
>>     String msgID = UUID.randomUUID().toString()
>>             + System.currentTimeMillis();
>>
>>     Values value = new Values(splitUsage[0], splitUsage[1],
>>             splitUsage[2], msgID);
>>     outputCollector.emit(value, msgID);
>>
>> }
>>
>> @Override
>> public void ack(Object msgId) {
>>     this.pendingTuples.remove(msgId);
>>     LOG.info("Ack " + msgId);
>> }
>>
>> @Override
>> public void fail(Object msgId) {
>>     // Re-emit the tuple
>>     LOG.info("Fail " + msgId);
>>     this.outputCollector.emit(this.pendingTuples.get(msgId), msgId);
>> }
>>
>> Bolt code using BaseRichBolt
>>
>> @Override
>> public void execute(Tuple inputTuple) {
>>
>>     this.outputCollector.emit(inputTuple, new Values(serverData, msgId));
>>
>>     this.outputCollector.ack(inputTuple);
>> }
>>
>> Final bolt
>>
>> @Override
>> public void execute(Tuple inputTuple) {
>>     ..... Simply reports, does not emit .....
>>     this.outputCollector.ack(inputTuple);
>> }
>
> --
> Regards,
> Abhishek Agarwal
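
For reference, on Abhishek's first question: the fail() quoted above re-emits whatever pendingTuples.get(msgId) returns, so each tuple has to be stored under its message ID before it is emitted. Below is a minimal sketch of that bookkeeping. It is not taken from the original spout: PendingTupleTracker, emitTracked and pendingCount are hypothetical names, and a BiConsumer stands in for SpoutOutputCollector.emit(values, msgId) so the snippet runs without Storm on the classpath. It emits exactly one tuple per call and makes no claim about why the for loop changed the behaviour.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.BiConsumer;

// Hypothetical helper, not part of Storm: remembers emitted values until acked.
class PendingTupleTracker {
    // msgId -> values emitted under that id. A plain HashMap is assumed to be
    // enough because Storm calls nextTuple/ack/fail on a single spout thread.
    private final Map<Object, List<Object>> pendingTuples = new HashMap<>();

    // Assumption: stand-in for SpoutOutputCollector.emit(values, msgId).
    private final BiConsumer<List<Object>, Object> emitter;

    PendingTupleTracker(BiConsumer<List<Object>, Object> emitter) {
        this.emitter = emitter;
    }

    // Intended to be called from nextTuple(): record first, then emit one tuple.
    String emitTracked(List<Object> values) {
        String msgId = UUID.randomUUID().toString();
        pendingTuples.put(msgId, values);
        emitter.accept(values, msgId);
        return msgId;
    }

    // Intended to be called from ack(Object msgId): the tuple tree completed.
    void ack(Object msgId) {
        pendingTuples.remove(msgId);
    }

    // Intended to be called from fail(Object msgId): re-emit only if the id is
    // still known; returns false instead of emitting null values.
    boolean fail(Object msgId) {
        List<Object> values = pendingTuples.get(msgId);
        if (values == null) {
            return false;
        }
        emitter.accept(values, msgId);
        return true;
    }

    int pendingCount() {
        return pendingTuples.size();
    }
}
```

In a real spout the BiConsumer would be replaced by the collector passed to open(), and the three methods would be called from nextTuple(), ack() and fail() respectively.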
