Ash,
you need to anchor the tuple in the intermediate bolts as
well. Please take a look at the docs here
[1]https://storm.incubator.apache.org/documentation/Guaranteeing-message-processing.html
"_collector.emit(new Values(word));
Emitting the word tuple this way causes it to be unanchored. If
the tuple fails to be processed downstream, the root tuple will
not be replayed. Depending on the fault-tolerance guarantees
you need in your topology, sometimes it's appropriate to emit
an unanchored tuple."
Also, you need to ack the input tuple in each intermediate bolt.
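To illustrate both points, here is a minimal sketch of an intermediate bolt that anchors its emitted tuple and acks the input (class and field names are illustrative, not from your topology; it assumes the Storm 0.9.2 backtype.storm API):

```java
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Hypothetical intermediate bolt showing the anchor + ack pattern.
public class PassThroughBolt extends BaseRichBolt {
    private OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context,
                        OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // Pass the input tuple as the first argument to emit() so the
        // new tuple is anchored; a downstream fail() will then replay
        // the root tuple from the Kafka spout.
        _collector.emit(input, new Values(input.getString(0)));
        // Ack the input tuple so Storm can complete the tuple tree.
        _collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```

With `_collector.emit(new Values(...))` (no anchor) the link in the tuple tree is broken, so the spout never sees the downstream failure.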
-Harsha
On Tue, Oct 14, 2014, at 03:20 PM, Ash G wrote:
Storm keeps re-reading the same data from the Kafka topic again
and again.
I have a Kafka spout and a series of bolts, and I am anchoring
the tuple. In the last bolt, I do _collector.ack(tuple).
I am using the Kafka Storm spout bundled with the Storm 0.9.2
distribution. All bolts extend BaseRichBolt.
I checked the messageIds, and as the tuple moves through the
bolts, the following messageIds are displayed, i.e.
{-210221166871835114=-8820821848891311855}
{-210221166871835114=5058142667727028101}
{-210221166871835114=-3068891797148595604}
Also I have
zkRoot = "/brokers"; // zkRoot will be used as the root to store
your consumer's offset
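For context, a hedged sketch of how the storm-kafka spout wiring with an explicit zkRoot might look (hostnames, topic, and consumer id are placeholders, not values from this thread):

```java
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

// ZooKeeper connect string used by the Kafka cluster (placeholder).
BrokerHosts hosts = new ZkHosts("zkhost:2181");

// SpoutConfig(hosts, topic, zkRoot, consumerId): zkRoot is the
// ZooKeeper path under which the spout stores its consumer offsets.
// Note that "/brokers" is also the path where Kafka itself keeps
// broker/partition metadata, so a dedicated root avoids mixing the
// spout's offset nodes with Kafka's own data.
SpoutConfig spoutConfig =
        new SpoutConfig(hosts, "my-topic", "/kafka-spout", "my-spout-id");
KafkaSpout spout = new KafkaSpout(spoutConfig);
```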
It looks like the offsets are not being stored properly by the
Kafka spout on ack. This problem does not happen when
numAckers = 0, i.e. when acking is disabled.
Any suggestions or clues on where I can look?
References
1. https://storm.incubator.apache.org/documentation/Guaranteeing-message-processing.html