Thanks for the question. I think Gary provided an excellent answer. Additionally, you could check out the exactly-once semantics (EOS) code example <https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java#L132>, which shows how to reset the consumer's position when aborting an ongoing transaction.
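
In case it helps, here is a minimal sketch of the rewind step Gary describes. Aborting the transaction does not move the consumer: its in-memory position has already advanced past the polled records, so the next poll continues from there unless you seek back. The sketch assumes the whole polled batch is discarded on abort, so the lowest unprocessed offset per partition is simply the lowest offset in that batch. To keep it runnable without the Kafka client on the classpath, `RewindOffsets`, `Polled` and `lowestOffsets` are hypothetical names of mine, and `Polled` is only a stand-in for the topic-partition and offset of a `ConsumerRecord`. In real code you would pass each result to `consumer.seek(TopicPartition, long)`.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RewindOffsets {

    /** Hypothetical stand-in for the topic-partition and offset of a polled record. */
    public static final class Polled {
        final String topicPartition;
        final long offset;

        public Polled(String topicPartition, long offset) {
            this.topicPartition = topicPartition;
            this.offset = offset;
        }
    }

    /**
     * Returns, for each partition in the batch, the lowest offset seen.
     * Assumes the entire batch is unprocessed (the transaction was aborted).
     */
    public static Map<String, Long> lowestOffsets(List<Polled> batch) {
        Map<String, Long> lowest = new LinkedHashMap<>();
        for (Polled record : batch) {
            // Keep the smaller of the stored offset and this record's offset.
            lowest.merge(record.topicPartition, record.offset, Long::min);
        }
        return lowest;
    }

    public static void main(String[] args) {
        List<Polled> batch = Arrays.asList(
                new Polled("orders-0", 42L),
                new Polled("orders-0", 43L),
                new Polled("orders-1", 7L),
                new Polled("orders-0", 44L));

        // With the real client, after producer.abortTransaction() you would call
        // consumer.seek(topicPartition, offset) for each entry before polling again.
        System.out.println(lowestOffsets(batch)); // prints {orders-0=42, orders-1=7}
    }
}
```

The linked example takes a slightly different route and rewinds to the last committed positions instead of the batch's own offsets. Either way the seek has to happen before the next poll.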

On Thu, Feb 18, 2021 at 11:01 AM Gary Russell <gruss...@vmware.com> wrote:

> You have to perform seeks (using the consumer) to the lowest unprocessed
> offset for each partition returned by the poll, before the next poll.
> ________________________________
> From: Peter Cipov <pci...@twilio.com.INVALID>
> Sent: Thursday, February 18, 2021 1:20 PM
> To: users@kafka.apache.org <users@kafka.apache.org>
> Subject: Abort transaction semantics
>
> Hello
> I have a question regarding aborting transactions in Kafka client 2.4.1.
>
> Let's take the following code:
>
> ... proper transactional producer and consumer creation, consumer
> autocommit = false
>
> producer.initTransactions();
>
> while (true) {
>     records = consumer.poll();
>     logRecordOffsets(records);
>     producer.beginTransaction();
>     try {
>         doMagic();
>     } catch (Exception e) {
>         producer.abortTransaction();
>         continue;
>     }
>     producer.sendOffsetsToTransaction(..);
>     producer.commitTransaction();
> }
>
> When doMagic crashes for some reason, abort is called and the code starts
> again from the beginning with a poll.
>
> Our assumption was that the next poll would start from the same offsets,
> but as we saw from the logs this is not the case. What we observed is that
> the offsets are shifted and messages are lost; they are not retried.
>
> What are the semantics of abort? We could not figure it out from the
> documentation.
> What is the recommended approach for retrying?
>
> Thank you
>