Thank you — "Also you need to ack the tuple from intermediate bolts."
This worked. I was not aware we needed to ack from intermediate bolts. Earlier, I was anchoring in all bolts and doing the ack in only the final bolt.

Ash

On Tue, Oct 14, 2014 at 6:23 PM, Harsha <[email protected]> wrote:
> Ash,
> you need to anchor the tuple in the intermediate bolts as well. Please
> take a look at the docs here:
>
> https://storm.incubator.apache.org/documentation/Guaranteeing-message-processing.html
>
> "_collector.emit(new Values(word));
>
> Emitting the word tuple this way causes it to be *unanchored*. If the
> tuple fails to be processed downstream, the root tuple will not be
> replayed. Depending on the fault-tolerance guarantees you need in your
> topology, sometimes it's appropriate to emit an unanchored tuple."
>
> Also you need to ack the tuple from intermediate bolts.
>
> -Harsha
>
> On Tue, Oct 14, 2014, at 03:20 PM, Ash G wrote:
> > Storm keeps re-reading the same data from the Kafka topic again and
> > again.
> >
> > I have a Kafka spout and a series of bolts, and I am anchoring the
> > tuple. In the last bolt, I do _collector.ack(tuple).
> >
> > I am using the Kafka Storm spout bundled with the Storm 0.9.2
> > distribution. All bolts extend BaseRichBolt.
> >
> > I checked the messageIds, and as the tuple moves through the bolts, the
> > following messageIds are displayed, i.e.:
> >
> > {-210221166871835114=-8820821848891311855}
> >
> > {-210221166871835114=5058142667727028101}
> >
> > {-210221166871835114=-3068891797148595604}
> >
> > Also I have
> >
> > zkRoot = "/brokers"; // zkRoot will be used as the root to store your
> > consumer's offsets
> >
> > It looks like offsets are not being stored properly by the Kafka spout
> > on ack. This problem does not happen when numAckers = 0, i.e. when
> > acking is disabled.
> >
> > Any suggestions or clues on where I can look?
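For anyone hitting the same issue: the fix described above (anchor the emit to the input tuple *and* ack the input in every intermediate bolt, not just the last one) can be sketched roughly like this. This is a minimal illustration, not code from the thread; the class name, field name, and the `toUpperCase` transformation are made up, and it assumes the `backtype.storm` packages from the Storm 0.9.2 distribution mentioned above.

```java
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

// Hypothetical intermediate bolt: anchors every emit to the input tuple
// and acks the input tuple once it has been processed.
public class IntermediateBolt extends BaseRichBolt {
    private OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);
        // Anchored emit: passing `input` as the first argument ties the new
        // tuple to the input tuple, so a downstream failure replays the
        // root (spout) tuple.
        _collector.emit(input, new Values(word.toUpperCase()));
        // Ack here as well, not only in the final bolt. Without this ack
        // the tuple tree never completes, the tuple eventually times out,
        // and the Kafka spout replays it -- the re-reading behaviour
        // described in the original question.
        _collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```

With acking enabled (numAckers > 0), every bolt in the chain must call `ack` (or `fail`) on each input tuple; anchoring alone only builds the tuple tree, while the acks are what allow the spout to mark the root tuple as fully processed and commit its offset.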
