oh gotcha. That makes sense. Thanks a lot.
On Thu, Jan 5, 2012 at 9:56 AM, Jun Rao <jun...@gmail.com> wrote:
> When you call commitOffset, we commit the offset for all subscribed topics.
> If you want topic-level control, you currently have to use multiple
> consumer connectors, one for each topic.
>
> Jun
>
> 2012/1/5 Patricio Echagüe <patric...@gmail.com>
>
> > Thanks Jun. So how do I commit per topic? I'm still missing that part.
> > Since I have one thread per topic, I have no control over the state of
> > the other topics, and I want each thread to commit as soon as it
> > successfully consumes and processes each message.
> >
> > Sent from my Android
> > On Jan 5, 2012 9:23 AM, "Jun Rao" <jun...@gmail.com> wrote:
> >
> > > Yes, if you want full control.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > 2012/1/5 Patricio Echagüe <patric...@gmail.com>
> > >
> > > > Thanks Jun for your answer.
> > > >
> > > > Do I need to turn autocommit off on the consumer, since I'll be
> > > > committing manually?
> > > >
> > > > Sent from my Android
> > > > On Jan 5, 2012 8:50 AM, "Jun Rao" <jun...@gmail.com> wrote:
> > > >
> > > > > Do you want to stop consumption when you hit a deserialization
> > > > > error, fix the bug in the deserializer, and reconsume the last
> > > > > message? If so, you can explicitly call commitOffset periodically
> > > > > after successful deserialization of the messages. If you fail and
> > > > > restart after the bug is fixed, the last few messages will be
> > > > > replayed. Under the covers, commitOffset commits the offset of the
> > > > > last consumed message for each subscribed topic.
> > > > >
> > > > > Jun
> > > > >
> > > > > 2012/1/4 Patricio Echagüe <patric...@gmail.com>
> > > > >
> > > > > > Hi folks, I was hoping to get some advice on how to design the
> > > > > > following use case.
> > > > > >
> > > > > > My code (consumer) reads messages from Topic A, per partition
> > > > > > (of which there is just 1 at the moment). The consumer is single
> > > > > > threaded per topic. After reading/dequeuing a message, I get an
> > > > > > error when trying to deserialize it (the error is related to the
> > > > > > way I serialize my objects using JSON), and my consumer cannot
> > > > > > re-process the message because it was already consumed. It is
> > > > > > not a Kafka-related issue, but it made me realize that I can
> > > > > > lose messages.
> > > > > >
> > > > > > Ideally I would like to avoid "committing" to the broker that
> > > > > > the message has been consumed, wait until the message is
> > > > > > processed successfully by my consumer, and only then send the
> > > > > > acknowledgment to the broker indicating that this message can
> > > > > > be discarded.
> > > > > >
> > > > > > In case of an error, the broker should be able to re-send the
> > > > > > same message.
> > > > > >
> > > > > > What would be the way to achieve this?
> > > > > >
> > > > > > I see that MessageStream has a method called "commitOffset", but
> > > > > > it doesn't seem to apply to a particular topic.
> > > > > >
> > > > > > Am I approaching the problem from the wrong direction?
> > > > > >
> > > > > > Thanks
> > > > > > Patricio
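
For reference, the pattern discussed in this thread (autocommit off, one consumer connector per topic, commit only after a message has been processed successfully) might be sketched roughly as follows. This is an in-memory stand-in and not the Kafka client API: TopicConnector, poll, commit, and consume are hypothetical names made up for illustration, and the "restart" is simulated by building a new connector from the last committed offset.

```python
import json


class TopicConnector:
    """Hypothetical stand-in for one consumer connector bound to ONE topic.

    Not the Kafka API. It only models the difference between the read
    position in this session and the committed offset that a restarted
    consumer would resume from.
    """

    def __init__(self, topic, messages, committed=0):
        self.topic = topic
        self._messages = list(messages)
        self.committed = committed   # offset a restart resumes from
        self._position = committed   # next offset to hand out

    def poll(self):
        """Return (offset, raw_message), or None when nothing is left."""
        if self._position >= len(self._messages):
            return None
        offset = self._position
        self._position += 1
        return offset, self._messages[offset]

    def commit(self):
        """Commit everything handed out so far, for this topic only."""
        self.committed = self._position


def consume(connector, handle):
    """Process messages in order, committing after each success.

    Returns True if the log was drained, False if a message could not be
    deserialized. On failure nothing is committed for that message, so it
    is replayed after a restart.
    """
    while True:
        item = connector.poll()
        if item is None:
            return True
        _offset, raw = item
        try:
            handle(json.loads(raw))   # deserialize, then process
        except ValueError:            # json.JSONDecodeError is a ValueError
            return False              # stop without committing
        connector.commit()            # manual commit after success


if __name__ == "__main__":
    processed = []
    conn_a = TopicConnector("A", ['{"id": 1}', "not-json", '{"id": 3}'])
    drained = consume(conn_a, processed.append)
    print(drained, conn_a.committed, processed)
```

Because each topic gets its own connector object, a commit in one thread cannot move the committed offset of another topic, which is the topic-level control Jun describes. Committing after every message is the simplest variant; committing periodically instead would trade fewer commits for a few replayed messages after a failure.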