Ash,

I have tried acking with storm-0.9.2-incubating. I acknowledged the tuple in
all intermediate bolts too.

E.g. topology:

Spout1 -> Bolt1 -> Bolt2

1) Spout1 emitted an anchored tuple.
execute method :

collector.emit(new Values(emitWord), msgId);

2) Bolt1 and Bolt2 emitted a normal tuple, but acknowledged the input.
execute method :

collector.emit(new Values(emitWord)); // unanchored tuple
this.collector.ack(tuple);
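
For reference, the pattern Storm's guaranteed-processing model expects in an intermediate bolt is to anchor the emitted tuple to the input and then ack the input. A minimal, self-contained sketch of that pattern (plain Java, no Storm dependency; `MiniCollector` and `MiniTuple` are hypothetical stand-ins for Storm's `OutputCollector` and `Tuple`, used only to illustrate the call shape):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AnchoringSketch {
    // Hypothetical stand-in for Storm's Tuple: carries only a message id.
    static class MiniTuple {
        final long msgId;
        MiniTuple(long msgId) { this.msgId = msgId; }
    }

    // Hypothetical stand-in for Storm's OutputCollector; records calls.
    static class MiniCollector {
        final List<String> log = new ArrayList<>();

        // Anchored emit: the new tuple is tied to its input, so a failure
        // anywhere downstream causes a replay from the spout.
        void emit(MiniTuple anchor, List<Object> values) {
            log.add("emit anchored to " + anchor.msgId);
        }

        // Unanchored emit: downstream failures are invisible to the spout,
        // so the spout's ack/fail tracking never completes for this branch.
        void emit(List<Object> values) {
            log.add("emit unanchored");
        }

        void ack(MiniTuple input) {
            log.add("ack " + input.msgId);
        }
    }

    // Shape of an intermediate bolt's execute(): anchor, then ack the input.
    static void execute(MiniTuple input, MiniCollector collector) {
        collector.emit(input, Arrays.<Object>asList("word")); // anchored emit
        collector.ack(input);                                 // ack the input tuple
    }

    public static void main(String[] args) {
        MiniCollector c = new MiniCollector();
        execute(new MiniTuple(42L), c);
        System.out.println(c.log); // prints [emit anchored to 42, ack 42]
    }
}
```

If an intermediate bolt emits unanchored (as in the snippet above) the tuple tree is cut at that bolt, so acks from later bolts never reach the spout and the spout re-emits after the message timeout, which would match the re-reading behavior described below.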

-Binita


On Wed, Oct 15, 2014 at 3:50 AM, Ash G <[email protected]> wrote:

> Storm keeps re-reading the same data from Kafka topic again and again.
>
> I have a Kafka spout and a series of bolts, and I am anchoring the tuple. In
> the last bolt, I do _collector.ack(tuple).
>
> I am using Kafka Storm Spout bundled with Storm 0.9.2 distribution. All
> Bolts are extending BaseRichBolt.
>
> I checked the messageIds, and as the tuple moves through the bolts, the
> following messageIds are displayed, i.e.
>
> {-210221166871835114=-8820821848891311855}
>
> {-210221166871835114=5058142667727028101}
>
> {-210221166871835114=-3068891797148595604}
>
> Also I have
>
> zkRoot = "/brokers"; // zkRoot will be used as the root path to store your
> consumer's offsets
>
> Looks like the offsets are not being stored properly by the Kafka spout on
> ack. This problem does not happen when numAckers = 0, i.e. when acking is
> disabled.
>
> Any suggestion or clues on where can I look?
>
