Ash, I have tried ack with storm-0.9.2 incubating. I acknowledged the tuple in all intermediate Bolts too.
E.g topology: Spout1 -> Bolt1 -> Bolt2 1)Spout1 emitted a anchored tuple. execute method : *collector.emit(new Values(emitWord), msgId);* 2)Bolt2 and Bolt3 emitted a normal tuple, but acknowledged it. execute method : *collector.emit(new Values(emitWord));//Unanchored tuple;this.collector.ack(tuple);* -Binita On Wed, Oct 15, 2014 at 3:50 AM, Ash G <[email protected]> wrote: > Storm keeps re-reading the same data from Kafka topic again and again. > > I have kafka spout and series of bolts and I am anchoring the tuple. In > last bolt. I do _collector.ack(tuple). > > I am using Kafka Storm Spout bundled with Storm 0.9.2 distribution. All > Bolts are extending BaseRichBolt. > > I checked the messageIds and I see as tuple moves through the bolts, > following messageIds are displayed. Ie. > > {-210221166871835114=-8820821848891311855} > > {-210221166871835114=5058142667727028101} > > {-210221166871835114=-3068891797148595604} > > Also i have > > zkRoot = "/brokers"; //Zkroot will be used as root to store your > consumer's offset > > Looks like offsets are not being stored properly by kafka spout on Ack. > This problem does not happen when numAckers = 0 ie. Ack is disabled. > > Any suggestion or clues on where can I look? >
