A couple of questions:

1. Are you adding the tuples to the pendingTuples map before emitting them? I didn't see that in the code.
2. Is logging configured correctly? Can you use System.out.println instead of LOG.info and then try again?
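To make the first question concrete, here is a minimal sketch of the bookkeeping I would expect around the emit. It is plain Java with no Storm dependency: PendingTupleTracker and its BiConsumer "emitter" are hypothetical stand-ins for your spout and for outputCollector.emit(values, msgId), so treat it as an illustration of the ordering (record the tuple, then emit), not as your actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for the spout's bookkeeping; not part of the Storm API. */
class PendingTupleTracker {
    // msgId -> values that have been emitted but not yet acked
    private final Map<String, List<Object>> pendingTuples = new ConcurrentHashMap<>();
    // Assumed stand-in for outputCollector.emit(values, msgId)
    private final BiConsumer<List<Object>, String> emitter;

    PendingTupleTracker(BiConsumer<List<Object>, String> emitter) {
        this.emitter = emitter;
    }

    /** What nextTuple() would do: remember the tuple first, then emit it. */
    String emit(List<Object> values) {
        String msgId = UUID.randomUUID().toString();
        pendingTuples.put(msgId, values);
        emitter.accept(values, msgId);
        return msgId;
    }

    /** What ack(msgId) would do: the tuple is fully processed, so forget it. */
    void ack(String msgId) {
        pendingTuples.remove(msgId);
    }

    /** What fail(msgId) would do: re-emit only if the tuple is still tracked. */
    void fail(String msgId) {
        List<Object> values = pendingTuples.get(msgId);
        if (values != null) {
            emitter.accept(values, msgId);
        }
    }

    int pendingCount() {
        return pendingTuples.size();
    }
}

class PendingTupleDemo {
    public static void main(String[] args) {
        List<String> emittedIds = new ArrayList<>();
        PendingTupleTracker tracker =
                new PendingTupleTracker((values, msgId) -> emittedIds.add(msgId));

        String msgId = tracker.emit(Arrays.asList("server-1", "cpu", "0.75"));
        tracker.fail(msgId); // replayed, because the tuple was recorded before the emit
        tracker.ack(msgId);  // removed from the pending map

        System.out.println("emits=" + emittedIds.size()
                + " pending=" + tracker.pendingCount());
    }
}
```

Without the put before the emit, the lookup in fail() returns null and there is nothing to replay.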

On Tue, Aug 18, 2015 at 4:02 AM, Stuart Perks <[email protected]> wrote:

> Set to 23 the same number as the workers are set to.
>
> thanks
>
> On 17 Aug 2015, at 23:04, Javier Gonzalez <[email protected]> wrote:
>
> How many ackers have you got configured when you submit your topology?
> On Aug 17, 2015 5:57 PM, "Stuart Perks" <[email protected]> wrote:
>
>> Hi I am attempting to run guaranteed message processing but ACK is not
>> being called. Post on stack overflow if you prefer answer there.
>> http://stackoverflow.com/questions/32060081/apache-storm-ack-not-working
>>
>>
>> Thanks
>>
>>
>> I am trying to implement the guaranteed message processing but the ack or
>> fail methods on the Spout are not being called.
>>
>> I am passing the a message ID object with the spout. I am passing the
>> tuple with each bolt and calling collector.ack(tuple) in each bolt.
>>
>> *Question* The ack or fail is not being called and I cannot work out why?
>>
>> Here is a shortened code sample.
>>
>> *Spout Code using BaseRichSpout*
>>
>> public void nextTuple() {
>>
>>     .... further code ....
>>
>>     String msgID = UUID.randomUUID().toString()
>>             + System.currentTimeMillis();
>>
>>     Values value = new Values(splitUsage[0], splitUsage[1],
>>             splitUsage[2], msgID);
>>     outputCollector.emit(value, msgID);
>> }
>>
>> @Override
>> public void ack(Object msgId) {
>>     this.pendingTuples.remove(msgId);
>>     LOG.info("Ack " + msgId);
>> }
>>
>> @Override
>> public void fail(Object msgId) {
>>     // Re-emit the tuple
>>     LOG.info("Fail " + msgId);
>>     this.outputCollector.emit(this.pendingTuples.get(msgId), msgId);
>> }
>>
>> *Bolt Code using BaseRichBolt*
>>
>> @Override
>> public void execute(Tuple inputTuple) {
>>     this.outputCollector.emit(inputTuple, new Values(serverData, msgId));
>>     this.outputCollector.ack(inputTuple);
>> }
>>
>> *Final Bolt*
>>
>> @Override
>> public void execute(Tuple inputTuple) {
>>     ..... Simply reports does not emit .....
>>     this.outputCollector.ack(inputTuple);
>>
>> }
>>
>>
>

--
Regards,
Abhishek Agarwal
