Worked it out. 

Removing the for loop that was wrapped around the emit in the spout fixed it. 

Any ideas why this makes a difference? 
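
For anyone following along, here is a minimal sketch of the pattern being discussed: one emit per nextTuple() call (no loop around the emit), with the tuple stored in pendingTuples before it is emitted so that fail() has something to replay. This is not the actual topology code. Collector and RecordingCollector are hypothetical stand-ins for Storm's collector so the sketch runs without a cluster, and the sample values in main are placeholders.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;

public class PendingTupleSketch {

    /** Hypothetical stand-in for the spout's output collector (not the Storm API). */
    interface Collector {
        void emit(List<Object> values, Object msgId);
    }

    /** Records emitted message IDs so the sketch can run without a cluster. */
    static class RecordingCollector implements Collector {
        final List<Object> emittedIds = new ArrayList<>();

        @Override
        public void emit(List<Object> values, Object msgId) {
            emittedIds.add(msgId);
        }
    }

    /** Spout-like class: at most one emit per nextTuple() call, with pending tracking. */
    static class SketchSpout {
        private final Collector collector;
        private final Queue<String[]> source;
        private final Map<Object, List<Object>> pendingTuples = new HashMap<>();

        SketchSpout(Collector collector, Queue<String[]> source) {
            this.collector = collector;
            this.source = source;
        }

        void nextTuple() {
            String[] splitUsage = source.poll();
            if (splitUsage == null) {
                return; // nothing to emit on this call
            }
            String msgId = UUID.randomUUID().toString();
            List<Object> value = new ArrayList<>();
            value.add(splitUsage[0]);
            value.add(splitUsage[1]);
            value.add(splitUsage[2]);
            value.add(msgId);
            pendingTuples.put(msgId, value); // remember the tuple before emitting
            collector.emit(value, msgId);
        }

        void ack(Object msgId) {
            pendingTuples.remove(msgId); // fully processed, forget it
        }

        void fail(Object msgId) {
            List<Object> value = pendingTuples.get(msgId);
            if (value != null) {
                collector.emit(value, msgId); // replay under the same message ID
            }
        }

        int pendingCount() {
            return pendingTuples.size();
        }
    }

    public static void main(String[] args) {
        Queue<String[]> source = new ArrayDeque<>();
        source.add(new String[] {"server1", "cpu", "42"}); // placeholder data
        RecordingCollector collector = new RecordingCollector();
        SketchSpout spout = new SketchSpout(collector, source);

        spout.nextTuple();
        Object id = collector.emittedIds.get(0);
        spout.fail(id); // replays the stored tuple
        spout.ack(id);  // clears it from the pending map
        System.out.println("emits=" + collector.emittedIds.size()
                + " pending=" + spout.pendingCount());
    }
}
```

In the real spout the same put-before-emit step would go in nextTuple() next to the outputCollector.emit(value, msgID) call.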


> On 18 Aug 2015, at 06:12, Abhishek Agarwal <[email protected]> wrote:
> 
> Couple of questions - 
> 1. Are you adding the tuples to the pendingTuples list before emitting them? 
> I didn't see that in the code.
> 2. Is logging correctly configured? Can you use sysout instead of log.info 
> and then try again?
> 
> On Tue, Aug 18, 2015 at 4:02 AM, Stuart Perks <[email protected]> wrote:
> Set to 23, the same number as the workers. 
> 
> thanks
>> On 17 Aug 2015, at 23:04, Javier Gonzalez <[email protected]> wrote:
>> 
>> How many ackers have you got configured when you submit your topology?
>> 
>> On Aug 17, 2015 5:57 PM, "Stuart Perks" <[email protected]> wrote:
>> Hi, I am attempting to run guaranteed message processing, but ack is not 
>> being called. I have also posted this on Stack Overflow if you prefer to 
>> answer there: 
>> http://stackoverflow.com/questions/32060081/apache-storm-ack-not-working
>> 
>> 
>> Thanks
>> 
>> 
>> I am trying to implement the guaranteed message processing but the ack or 
>> fail methods on the Spout are not being called.
>> 
>> I am passing a message ID object with the spout. I am passing the tuple 
>> with each bolt and calling collector.ack(tuple) in each bolt.
>> 
>> Question: neither ack nor fail is being called, and I cannot work out why.
>> 
>> Here is a shortened code sample.
>> 
>> Spout Code using BaseRichSpout
>> 
>> public void nextTuple() {
>> 
>>     .... further code ....
>> 
>>     String msgID = UUID.randomUUID().toString()
>>                     + System.currentTimeMillis();
>> 
>>     Values value = new Values(splitUsage[0], splitUsage[1],
>>                     splitUsage[2], msgID);
>>     outputCollector.emit(value, msgID);
>> 
>> }
>> 
>> @Override
>> public void ack(Object msgId) {
>>     this.pendingTuples.remove(msgId);
>>     LOG.info("Ack " + msgId);
>> }
>> 
>> @Override
>> public void fail(Object msgId) {
>>     // Re-emit the tuple
>>     LOG.info("Fail " + msgId);
>>     this.outputCollector.emit(this.pendingTuples.get(msgId), msgId);
>> }
>> 
>> Bolt Code using BaseRichBolt
>> 
>> @Override
>> public void execute(Tuple inputTuple) {
>> 
>>     this.outputCollector.emit(inputTuple, new Values(serverData, msgId));
>> 
>>     this.outputCollector.ack(inputTuple);
>> }
>> 
>> Final Bolt
>> 
>> @Override
>> public void execute(Tuple inputTuple) {
>>   ..... Simply reports does not emit .....
>>   this.outputCollector.ack(inputTuple);
>> }
>> 
>> 
> 
> 
> 
> 
> -- 
> Regards,
> Abhishek Agarwal
> 
