A couple of questions:
1. Are you adding the tuples to pendingTuples before emitting them?
I didn't see that in the code.
2. Is logging configured correctly? Can you use System.out.println instead
of LOG.info and try again?
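
To make question 1 concrete, here is a minimal sketch of the bookkeeping I mean. It is plain Java with no Storm dependency, so treat it as a model rather than your actual spout: the Emitter interface is a hypothetical stand-in for the spout's output collector, and PendingTracker, emitTracked and pendingCount are names made up for illustration. The point is only that the values are stored under the message ID before the emit, so that fail() has something to replay.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the spout's output collector (not a Storm type).
interface Emitter {
    void emit(List<Object> values, Object msgId);
}

// Hypothetical helper modelling the pendingTuples bookkeeping of a reliable spout.
class PendingTracker {
    private final Map<Object, List<Object>> pendingTuples = new ConcurrentHashMap<>();
    private final Emitter emitter;

    PendingTracker(Emitter emitter) {
        this.emitter = emitter;
    }

    // What nextTuple() would do: record the values first, then emit with the ID.
    String emitTracked(List<Object> values) {
        String msgId = UUID.randomUUID().toString();
        pendingTuples.put(msgId, values);
        emitter.emit(values, msgId);
        return msgId;
    }

    // What ack(Object msgId) would do: the tuple tree completed, so forget it.
    void ack(Object msgId) {
        pendingTuples.remove(msgId);
    }

    // What fail(Object msgId) would do: replay only if the values are still known.
    void fail(Object msgId) {
        List<Object> values = pendingTuples.get(msgId);
        if (values != null) {
            emitter.emit(values, msgId);
        }
    }

    int pendingCount() {
        return pendingTuples.size();
    }
}
```

In the spout you posted, nothing is ever put into pendingTuples, so even once fail() is called, pendingTuples.get(msgId) would return null and there would be nothing to re-emit.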

On Tue, Aug 18, 2015 at 4:02 AM, Stuart Perks <[email protected]> wrote:

> Set to 23, the same number as the workers.
>
> thanks
>
> On 17 Aug 2015, at 23:04, Javier Gonzalez <[email protected]> wrote:
>
> How many ackers have you got configured when you submit your topology?
> On Aug 17, 2015 5:57 PM, "Stuart Perks" <[email protected]> wrote:
>
>> Hi, I am attempting to use guaranteed message processing, but ack is not
>> being called. I have also posted on Stack Overflow if you prefer to answer there.
>> http://stackoverflow.com/questions/32060081/apache-storm-ack-not-working
>>
>>
>> Thanks
>>
>>
>> I am trying to implement the guaranteed message processing but the ack or
>> fail methods on the Spout are not being called.
>>
>> I am passing a message ID object with each tuple the spout emits. Each
>> bolt anchors its output to the input tuple and calls collector.ack(tuple).
>>
>> *Question* Neither ack nor fail is being called, and I cannot work out why.
>>
>> Here is a shortened code sample.
>>
>> *Spout Code using BaseRichSpout*
>>
>> public void nextTuple() {
>>
>>     .... further code ....
>>
>>     String msgID = UUID.randomUUID().toString()
>>                     + System.currentTimeMillis();
>>
>>     Values value = new Values(splitUsage[0], splitUsage[1],
>>                     splitUsage[2], msgID);
>>     outputCollector.emit(value, msgID);
>> }
>> @Override
>> public void ack(Object msgId) {
>>     this.pendingTuples.remove(msgId);
>>     LOG.info("Ack " + msgId);
>> }
>>
>> @Override
>> public void fail(Object msgId) {
>>     // Re-emit the tuple
>>     LOG.info("Fail " + msgId);
>>     this.outputCollector.emit(this.pendingTuples.get(msgId), msgId);
>> }
>>
>> *Bolt Code using BaseRichBolt*
>>
>> @Override
>> public void execute(Tuple inputTuple) {
>>     this.outputCollector.emit(inputTuple, new Values(serverData, msgId));
>>     this.outputCollector.ack(inputTuple);
>> }
>>
>> *Final Bolt*
>>
>> @Override
>> public void execute(Tuple inputTuple) {
>>   ..... Simply reports does not emit .....
>>   this.outputCollector.ack(inputTuple);
>> }
>>
>>
>


-- 
Regards,
Abhishek Agarwal
