Hi folks, I was hoping to get some advice on how to design the following use case.
My code (consumer) reads messages from Topic A, per partition (at the moment there is just 1). The consumer is single-threaded per topic. After reading/dequeuing a message, I get an error when trying to deserialize it (the error is related to the way I serialize my objects using JSON), and my consumer is unable to re-process the message, since it has already been consumed. It is not a Kafka-related issue, but it made me realize that I can lose messages.

Ideally I would like to avoid "committing" to the broker that the message has been consumed, wait until the message is processed successfully by my consumer, and only once I am sure I have properly processed it, send the acknowledgment to the broker indicating that this message can be discarded. In case of an error, the broker should be able to re-send the same message.

What would be the way to achieve this? I see that MessageStream has a method called "commitOffset", but it doesn't seem to apply to a particular topic. Am I approaching the problem in the wrong direction?

Thanks,
Patricio
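
P.S. To make the behaviour I'm after concrete, here is a minimal sketch in Python. It does not use any Kafka API: `InMemoryPartition`, `fetch`, `commit` and `consume` are hypothetical names I made up to stand in for a single partition and its committed offset. The only point is the ordering: the offset is committed after the handler succeeds, never before.

```python
import json


class InMemoryPartition:
    """Hypothetical stand-in for one partition of a topic; not a Kafka API."""

    def __init__(self, messages):
        self.messages = list(messages)
        self.committed = 0  # offset of the next message to deliver

    def fetch(self):
        """Return (offset, raw message) at the committed offset, or None when drained."""
        if self.committed >= len(self.messages):
            return None
        return self.committed, self.messages[self.committed]

    def commit(self, offset):
        """Acknowledge everything up to and including `offset`."""
        self.committed = offset + 1


def consume(partition, handler, max_attempts=3):
    """Commit only after `handler` succeeds; retry a failing message up to max_attempts."""
    processed = []
    attempts = 0
    while True:
        item = partition.fetch()
        if item is None:
            return processed
        offset, raw = item
        try:
            processed.append(handler(raw))
        except ValueError:  # json.JSONDecodeError is a subclass of ValueError
            attempts += 1
            if attempts >= max_attempts:
                # Give up WITHOUT committing, so the message is delivered again later.
                return processed
            continue  # the same offset is fetched again on the next iteration
        partition.commit(offset)  # acknowledge only after successful processing
        attempts = 0


partition = InMemoryPartition(['{"id": 1}', 'not json', '{"id": 3}'])
first_run = consume(partition, json.loads)
```

In this sketch the bad message stays uncommitted, so a later run (for example after the deserialization bug is fixed) starts from it again instead of skipping it. Retrying a deterministic deserialization error will never succeed on its own, which is why the loop gives up after a few attempts rather than spinning forever.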