One thing to add is that by doing this you could possibly get duplicates but no data loss, which follows Kafka's at-least-once semantics.
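For illustration, here is a rough sketch (not taken from the test code in the gist) of committing only the offsets that have actually been processed, using the consumer's commitSync(Map) overload; the bootstrap servers, group id, topic name and the process() helper are placeholders:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

object SafeCommitSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "example-group")
    props.put("enable.auto.commit", "false")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(List("example-topic").asJava)

    // Tracks, per partition, the offset *after* the last fully processed record.
    val processed = collection.mutable.Map.empty[TopicPartition, OffsetAndMetadata]

    try {
      while (true) {
        val records = consumer.poll(1000)
        for (record <- records.asScala) {
          process(record) // placeholder application logic; may throw
          processed(new TopicPartition(record.topic, record.partition)) =
            new OffsetAndMetadata(record.offset + 1)
        }
        // Commit only the offsets of records that were actually processed.
        consumer.commitSync(processed.asJava)
      }
    } finally {
      // Deliberately commit nothing here: unprocessed records will simply be
      // re-delivered after the rebalance (duplicates possible, no loss).
      consumer.close()
    }
  }

  def process(record: ConsumerRecord[String, String]): Unit = {
    // placeholder for persisting consumer state
  }
}

If the process crashes between process() and commitSync, those records are re-delivered to another consumer after the rebalance, so you get duplicates but no loss.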
Guozhang

On Mon, Feb 1, 2016 at 3:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Hi Han,
>
> I looked at your test code and actually the error is in this line:
> https://gist.github.com/darkjh/437ac72cdd4b1c4ca2e7#file-kafkabug2-scala-L61
>
> where you call "commitSync" in the finally block, which will commit the
> messages that were returned to you from the poll() call.
>
> More specifically, say your poll() call returned you a set of messages
> with offsets 0 to 100. From the consumer's point of view, once they are
> returned to the user they are considered "consumed", and hence if you
> call commitSync after that they will ALL be committed (i.e. the consumer
> will commit offset 100). But if you hit an exception / get a close signal
> while, say, processing the message with offset 50, and then call
> commitSync in the finally block, you will effectively lose messages 50
> to 100.
>
> Hence, as a user of the consumer, one should only call "commit" if she is
> certain that all messages returned from "poll()" have been processed.
>
> Guozhang
>
>
> On Mon, Feb 1, 2016 at 9:59 AM, Han JU <ju.han.fe...@gmail.com> wrote:
>
>> Hi,
>>
>> One of our uses of Kafka is to tolerate arbitrary consumer crashes
>> without losing or duplicating messages. So in our code we manually
>> commit the offset after having successfully persisted the consumer
>> state.
>>
>> While prototyping with kafka-0.9's new consumer API, I found that in
>> some cases Kafka fails to deliver part of the messages to the consumers
>> even though the offsets are handled correctly.
>>
>> I've made sure that this time everything is at the latest on the 0.9.0
>> branch (d1ff6c7) for both broker and client code.
>>
>> The test code snippet is here:
>> https://gist.github.com/darkjh/437ac72cdd4b1c4ca2e7
>>
>> Test setup:
>> - 12 partitions
>> - 2 consumer app processes with 2 consumer threads each
>> - the producer produces exactly 1.2M messages in about 2 minutes
>>   (enough time for us to manually kill -9 a consumer)
>> - a consumer thread commits offsets every 80k messages received
>>   (to simulate our regular offset commits)
>> - after all messages are consumed, each consumer thread writes a number
>>   to a file indicating how many messages it has received, so all numbers
>>   should sum to exactly 1.2M if everything goes well
>>
>> Test run:
>> - run the producer
>> - run the 2 consumer app processes at the same time
>> - wait for the first offset commit (first 80k messages received in each
>>   consumer thread)
>> - after the first offset commit, kill -9 one of the consumer apps
>> - let the other consumer app run until the messages are finished
>> - check the files written by the remaining consumer threads
>>
>> And after that, by checking the files, we do not see 1.2M messages but
>> roughly 1.04M. The lag on Kafka for this topic is 0.
>> If you check the logs of the consumer app at DEBUG level, you'll find
>> that the offsets are correctly handled. 30s (the default timeout) after
>> the kill -9 of one consumer app, the remaining consumer app correctly
>> gets assigned all the partitions, and it starts right from the offsets
>> that the crashed consumer had previously committed. So this makes the
>> message loss quite mysterious to us.
>> Note that the moment of the kill -9 is important. If we kill -9 one
>> consumer app *before* the first offset commit, everything goes well:
>> all messages are received, none lost. But when it is killed *after* the
>> first offset commit, messages are lost.
>>
>> Hope the code is clear enough to reproduce the problem. I'm available
>> for any further details needed.
>>
>> Thanks!
>> --
>> *JU Han*
>>
>> Software Engineer @ Teads.tv
>>
>> +33 0619608888
>>
>
>
> --
> -- Guozhang
>

--
-- Guozhang