Thanks Jun. So how to I commit per topic ? I'm still missing that part. Since I have one thread per topic I have no control of the state of the other topics and I want each thread to commit as soon as they successfully consume and process each message.
Sent from my Android On Jan 5, 2012 9:23 AM, "Jun Rao" <jun...@gmail.com> wrote: > Yes, if want the full control. > > Thanks, > > Jun > > 2012/1/5 Patricio Echagüe <patric...@gmail.com> > > > Thanks Jun for your answer. > > > > Do I need to turn autocommit off on the consumer? Since I'll be doing it > > manually. > > > > Sent from my Android > > On Jan 5, 2012 8:50 AM, "Jun Rao" <jun...@gmail.com> wrote: > > > > > Do you want to stop consumption when you hit a deserialization error > and > > > fix the bug in the deserializer and reconsume the last message? If so, > > you > > > can explicitly call commitOffset periodically after successful > > > deserialization of the messages. If you fail and restart after the bug > is > > > fixed, the last few messages will be replayed. Under the cover, > > > commitOffset commit offset of the last consumed message for each > > subscribed > > > topic. > > > > > > Jun > > > > > > > > > 2012/1/4 Patricio Echagüe <patric...@gmail.com> > > > > > > > Hi folks, I was hoping to get some advice on how to design the > > following > > > > use case. > > > > > > > > My code (consumer) reads messages from Topic A and per partition > (that > > at > > > > the moment is just 1). The consumer is single threaded per topic. > > > > After reading/dequeuing the message, I get an error when trying to > > > > deserialize it (this error is related to the way I serialize my > objects > > > > using json) making my consumer unable to re-process the message > (since > > > the > > > > message was already consumed). It is not a Kafka-related issue but > made > > > me > > > > realize the fact that I can lose messages. > > > > > > > > Ideally I would live to avoid "commiting" to the broker that the > > message > > > > has been consumed, wait until the message is processed successfully > by > > my > > > > consumer and once I make sure I properly processed the message, then > > send > > > > the acknowledge to the broker indicating that this message can be > > > > discarded. > > > > > > > > In case of an error, the broker should be able to re-send the same > > > message. > > > > > > > > What would be the way to achieve this? > > > > > > > > I see that MessageStream has a method called "commitOffset" but It > > > doesn't > > > > seem to apply to a particular topic. > > > > > > > > Am I approaching the problem in the wrong direction ? > > > > > > > > Thanks > > > > Patricio > > > > > > > > > >