Thank You,

"Also you need to  ack the tuple from intermediate bolts.  "

This worked. I was not aware we needed to ack from intermediate bolts.

Earlier, I was anchoring in all bolts but acking only in the final bolt.
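For anyone else hitting this: the pattern in every intermediate bolt is to pass the input tuple as the anchor when emitting, and then ack that same input tuple. In Storm 0.9.x that is `_collector.emit(tuple, new Values(word))` (the anchored form) followed by `_collector.ack(tuple)`. Below is a minimal, self-contained sketch of that pattern; the `StubCollector` and `IntermediateBolt` names are illustrative stand-ins, not Storm API:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for Storm's OutputCollector, reduced to emit/ack semantics.
class StubCollector {
    final List<String> emitted = new ArrayList<>();
    final List<String> acked = new ArrayList<>();

    // Anchored emit: the outgoing value is tied to its input tuple's tree.
    void emit(String anchor, String value) { emitted.add(anchor + "->" + value); }

    // Ack: marks the input tuple as fully processed by this bolt.
    void ack(String tuple) { acked.add(tuple); }
}

// The fix discussed above: an intermediate bolt must both anchor AND ack.
class IntermediateBolt {
    void execute(String tuple, StubCollector collector) {
        String result = tuple.toUpperCase();  // whatever processing this bolt does
        collector.emit(tuple, result);        // anchor the emit to the input tuple
        collector.ack(tuple);                 // ...and ack the input tuple as well
    }
}

public class Demo {
    public static void main(String[] args) {
        StubCollector c = new StubCollector();
        new IntermediateBolt().execute("word", c);
        System.out.println(c.emitted);  // [word->WORD]
        System.out.println(c.acked);    // [word]
    }
}
```

If the emit is anchored but the ack is missing, the tuple tree never completes, the spout's message times out, and Kafka data is replayed, which matches the symptom in the original message.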

Ash


On Tue, Oct 14, 2014 at 6:23 PM, Harsha <[email protected]> wrote:

>  Ash,
>     you need to anchor the tuple in the intermediate bolts as well. Please
> take a look at the docs here
>
> https://storm.incubator.apache.org/documentation/Guaranteeing-message-processing.html
>
> "java _collector.emit(new Values(word));
>
> Emitting the word tuple this way causes it to be *unanchored*. If the
> tuple fails be processed downstream, the root tuple will not be replayed.
> Depending on the fault-tolerance guarantees you need in your topology,
> sometimes it’s appropriate to emit an unanchored tuple."
> Also you need to ack the tuple from intermediate bolts.
> -Harsha
>
>
> On Tue, Oct 14, 2014, at 03:20 PM, Ash G wrote:
>
> Storm keeps re-reading the same data from Kafka topic again and again.
>
> I have a Kafka spout and a series of bolts, and I am anchoring the tuple.
> In the last bolt, I do _collector.ack(tuple).
>
> I am using the Kafka spout bundled with the Storm 0.9.2 distribution. All
> bolts extend BaseRichBolt.
>
> I checked the messageIds, and as the tuple moves through the bolts, the
> following messageIds are displayed, i.e.
>
> {-210221166871835114=-8820821848891311855}
>
>
> {-210221166871835114=5058142667727028101}
>
>
> {-210221166871835114=-3068891797148595604}
>
> Also I have
>
> zkRoot = "/brokers"; // zkRoot will be used as the root path to store your
> consumer's offsets
>
> It looks like offsets are not being stored properly by the Kafka spout on
> ack. This problem does not happen when numAckers = 0, i.e. acking is disabled.
>
> Any suggestions or clues on where I can look?
>
>
>
