I have the following topology structure:

Kafka Spout

Bolt A : Reads tuples from the spout and extracts some info.

_collector.emit(tuple, new Values(.......));

_collector.ack(tuple)

In case of exception / error:
_collector.fail(tuple)
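To make the flow concrete, here is a minimal sketch of what Bolt A does, assuming a BaseRichBolt and the org.apache.storm package layout (class, method, and field names are illustrative, not my actual code; the extraction logic is omitted):

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class BoltA extends BaseRichBolt {
    private OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            // extraction logic omitted; placeholder just reads the first field
            String info = extractInfo(tuple);
            // emit anchored to the input tuple, then ack it
            _collector.emit(tuple, new Values(info));
            _collector.ack(tuple);
        } catch (Exception e) {
            // in case of exception / error, fail so the spout replays the tuple
            _collector.fail(tuple);
        }
    }

    private String extractInfo(Tuple tuple) {
        return tuple.getString(0);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("info"));   // field name illustrative
    }
}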

Bolt B : Creates files based on the info extracted by Bolt A and writes the
tuple message to the corresponding file. Once a file satisfies the size or age
criteria, it is closed and its path is sent to the next bolt.

If any file operation fails, that tuple is reported with
_collector.fail(tuple)

If the tuple contents are written to the file: the tuple is stored in a list
(anchors) and acked with
_collector.ack(tuple)

When a file is closed, its path is emitted anchored to that list:
_collector.emit(anchors, new Values(.......));
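A sketch of Bolt B's write/ack/emit path, with the same caveats (names and the file-handling stubs are illustrative stand-ins, not my actual code):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class BoltB extends BaseRichBolt {
    private OutputCollector _collector;
    private List<Tuple> anchors;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
        anchors = new ArrayList<Tuple>();
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            writeToFile(tuple);        // append the message to the current file
            anchors.add(tuple);        // keep the tuple so the file-path emit can be anchored to it
            _collector.ack(tuple);     // ack as soon as the write succeeds
        } catch (IOException e) {
            _collector.fail(tuple);    // file operation failed -> replay
            return;
        }
        if (fileSatisfiesSizeOrAgeCriteria()) {
            String path = closeCurrentFile();
            _collector.emit(anchors, new Values(path));   // anchored to every tuple written to the file
            anchors = new ArrayList<Tuple>();
        }
    }

    // stubs standing in for the real file handling
    private void writeToFile(Tuple tuple) throws IOException { /* ... */ }
    private boolean fileSatisfiesSizeOrAgeCriteria() { return false; }
    private String closeCurrentFile() { return "/path/to/closed/file"; }   // placeholder path

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("filePath"));   // field name illustrative
    }
}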


Bolt C : Processes the file.

When the file path is received from Bolt B, the file is processed and the
received tuple is acked or failed.

_collector.ack(tuple) : acks all previous tuples

_collector.fail(tuple) : fails, hence replays previous tuples
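And Bolt C, again as an illustrative sketch reflecting my understanding of the ack/fail semantics (names and processing logic are placeholders):

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class BoltC extends BaseRichBolt {
    private OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String path = tuple.getString(0);   // the file path emitted by Bolt B
        try {
            processFile(path);              // processing logic omitted
            _collector.ack(tuple);          // my understanding: completes the tree for the tuples anchored to it
        } catch (Exception e) {
            _collector.fail(tuple);         // my understanding: replays all the tuples the file represents
        }
    }

    private void processFile(String path) throws Exception { /* ... */ }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt, nothing emitted downstream
    }
}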

This way, I am assuming that if a worker fails, all tuples Bolt B has received
will fail and be replayed, since Bolt C has not yet received the anchored
tuple, let alone acknowledged it.

If Bolt C holds an anchored tuple and did not ack it before the worker went
down, all the tuples it represents will be replayed.

Observation:

As a test I am pushing 2~5 GB of data from a file. If one of the workers is
killed, it is restarted by the supervisor. But there is data loss: the data
received after Bolt C is less than the actual input data, whereas otherwise
the two are exactly the same.

What am I missing here?
