Hi,

With the old consumer, I just get a simple stream of messages, one by
one. If I detect that something is wrong, I destroy my consumer
immediately without committing, so once I restart the consumer I get
the same messages again, because they are delivered to me starting
from the last committed offset (if I understand that correctly).

While this works, it only gives me an at-least-once delivery
guarantee, which is not good enough in my case. I need an exactly-once
guarantee.

While looking into the new consumer, I noticed that it is possible to
"rewind" within a partition.

My new algorithm is something like this:

Partition myPartition;

consumer.subscribe(myTopic);

ConsumerRecords records = consumer.poll(0);

for (Record record : records) {

    processRecord(record);

    processedMessages++;

    if (failure) {
        long offsetOfLastProcessedRecord = record.offset();

        // this will effectively rewind me back so I get the messages
        // which are not processed yet
        consumer.seek(myPartition,
                offsetOfLastProcessedRecord - processedMessages);

        // here I commit the position of the last processed record, so
        // on the next poll I should get the messages which were polled
        // before but stayed unprocessed because of the error
        consumer.commit(map<partition, offsetOfLastProcessedRecord>,
                CommitType.SYNC);
    }
}
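
To make the offset arithmetic concrete, here is a minimal, self-contained
sketch in Java. It does not use the Kafka client at all: RewindSketch and
its poll() and processBatch() methods are hypothetical stand-ins that
model one partition as a range of offsets. It assumes the convention
documented for the released Java consumer, where the committed offset is
the offset of the next record to consume. Note that it seeks to the
failed record's own offset rather than subtracting processedMessages,
which is a swapped-in choice and not the pseudocode above. It only models
offsets; it says nothing about making the processing itself exactly-once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

// Hypothetical in-memory model of one partition's consumer state.
// This is NOT the Kafka client API; it only tracks offsets.
class RewindSketch {
    long position = 0;   // offset of the next record poll() would return
    long committed = 0;  // offset to resume from after a restart

    // Returns up to maxRecords offsets starting at the current position,
    // stopping before logEndOffset, and advances the position past them.
    List<Long> poll(long logEndOffset, int maxRecords) {
        List<Long> batch = new ArrayList<>();
        while (batch.size() < maxRecords && position < logEndOffset) {
            batch.add(position++);
        }
        return batch;
    }

    // Processes a batch in order. On the first failing offset it rewinds
    // the position to that offset and commits it, so the failed record and
    // everything after it are returned again by the next poll().
    // Returns the offsets that were processed successfully.
    List<Long> processBatch(List<Long> batch, LongPredicate fails) {
        List<Long> done = new ArrayList<>();
        for (long offset : batch) {
            if (fails.test(offset)) {
                position = offset;   // plays the role of seek(partition, offset)
                committed = offset;  // committed offset = next record to consume
                return done;
            }
            done.add(offset);
        }
        committed = position;        // whole batch succeeded
        return done;
    }
}
```

For example, with the position at 10 and a batch of offsets 10 to 14, a
failure at offset 12 leaves 10 and 11 processed, and both the position
and the committed offset end up at 12, so the next poll starts at 12.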

Does this approach make sense?

-- 
Stefan Miklosovic
