Thanks Jun for your answer. Do I need to turn autocommit off on the consumer, since I'll be committing manually?
Sent from my Android

On Jan 5, 2012 8:50 AM, "Jun Rao" <jun...@gmail.com> wrote:

> Do you want to stop consumption when you hit a deserialization error,
> fix the bug in the deserializer, and reconsume the last message? If so, you
> can explicitly call commitOffset periodically after successful
> deserialization of the messages. If you fail and restart after the bug is
> fixed, the last few messages will be replayed. Under the covers,
> commitOffset commits the offset of the last consumed message for each
> subscribed topic.
>
> Jun
>
>
> 2012/1/4 Patricio Echagüe <patric...@gmail.com>
>
> > Hi folks, I was hoping to get some advice on how to design the following
> > use case.
> >
> > My code (consumer) reads messages from Topic A, one stream per partition
> > (of which there is just 1 at the moment). The consumer is single-threaded
> > per topic. After reading/dequeuing a message, I get an error when trying
> > to deserialize it (the error is related to the way I serialize my objects
> > using JSON), which leaves my consumer unable to re-process the message
> > (since the message was already consumed). It is not a Kafka-related issue,
> > but it made me realize that I can lose messages.
> >
> > Ideally I would like to avoid "committing" to the broker that the message
> > has been consumed, wait until the message is processed successfully by my
> > consumer, and only once I am sure it was properly processed, send the
> > acknowledgement to the broker indicating that the message can be
> > discarded.
> >
> > In case of an error, the broker should be able to re-send the same
> > message.
> >
> > What would be the way to achieve this?
> >
> > I see that MessageStream has a method called "commitOffset", but it
> > doesn't seem to apply to a particular topic.
> >
> > Am I approaching the problem in the wrong direction?
> >
> > Thanks
> > Patricio
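For reference, here is a rough sketch of the pattern Jun describes, written
against the 0.8-era high-level consumer Java API (property and class names
differed slightly in earlier versions, e.g. zk.connect/autocommit.enable in
0.7 vs zookeeper.connect/auto.commit.enable in 0.8). The ZooKeeper address,
group id, topic name, and the deserialize/process/MyEvent helpers are
illustrative placeholders, not part of Kafka:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class AtLeastOnceConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // illustrative address
        props.put("group.id", "my-consumer-group");       // illustrative group
        // Turn autocommit off so an offset is recorded only when we commit it.
        props.put("auto.commit.enable", "false");

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // One stream for Topic A, matching its single partition.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("TopicA", 1));
        KafkaStream<byte[], byte[]> stream = streams.get("TopicA").get(0);

        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> record = it.next();
            try {
                MyEvent event = deserialize(record.message()); // your JSON decoding
                process(event);                                // your business logic
                // Commit only after the message was fully processed. If we
                // crash before this line, the message is replayed on restart.
                connector.commitOffsets();
            } catch (RuntimeException e) {
                // Do NOT commit: shut down, fix the deserializer, and the
                // uncommitted messages will be redelivered on restart.
                connector.shutdown();
                throw e;
            }
        }
    }

    // Illustrative placeholders for the application's own code.
    static MyEvent deserialize(byte[] bytes) { return new MyEvent(); }
    static void process(MyEvent e) { }
    static class MyEvent { }
}

Note that commitOffsets applies to the whole connector (every subscribed
topic/partition), which matches Jun's description of commitOffset. Committing
after every message trades throughput for at-least-once delivery; committing
every N messages instead just means up to N messages get replayed after a
failure.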