One thing to add: by doing this you could possibly get duplicates,
but not data loss, which obeys Kafka's at-least-once semantics.
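
To make the scenario concrete, here is a minimal sketch of the commit semantics described below. This is a plain simulation, not the real KafkaConsumer API; the helper names (polledOffsets, commitAfterPoll, processUntilCrash) are illustrative only. It shows why a parameterless commitSync() in a finally block loses the unprocessed tail of the batch, while committing only the processed offsets merely risks duplicates:

```java
import java.util.ArrayList;
import java.util.List;

public class CommitSimulation {

    // Offsets 0..100 returned by a hypothetical poll() call.
    static List<Integer> polledOffsets() {
        List<Integer> batch = new ArrayList<>();
        for (int i = 0; i <= 100; i++) batch.add(i);
        return batch;
    }

    // Parameterless commitSync(): the consumer treats every polled record
    // as consumed, so it commits the offset right after the last record
    // in the batch, regardless of how far processing actually got.
    static int commitAfterPoll(List<Integer> batch) {
        return batch.get(batch.size() - 1) + 1;
    }

    // Process the batch, hitting an exception / close signal at `crashAt`;
    // returns the last offset actually processed.
    static int processUntilCrash(List<Integer> batch, int crashAt) {
        int lastProcessed = -1;
        for (int offset : batch) {
            if (offset == crashAt) break; // simulated crash while processing
            lastProcessed = offset;
        }
        return lastProcessed;
    }

    public static void main(String[] args) {
        List<Integer> batch = polledOffsets();
        int lastProcessed = processUntilCrash(batch, 50); // processed 0..49

        // Unsafe: finally-block commitSync() commits offset 101, so on
        // restart the consumer resumes past messages 50..100 -- they are lost.
        System.out.println("unsafe resume offset: " + commitAfterPoll(batch));

        // Safe: commit only what was processed. Messages 50..100 are
        // redelivered after restart: possible duplicates, no loss
        // (at-least-once).
        System.out.println("safe resume offset: " + (lastProcessed + 1));
    }
}
```

In the real API, the safe variant corresponds to calling commitSync with an explicit offset map for each partition instead of the no-argument overload.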

Guozhang

On Mon, Feb 1, 2016 at 3:17 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Han,
>
> I looked at your test code and actually the error is in this line:
> https://gist.github.com/darkjh/437ac72cdd4b1c4ca2e7#file-kafkabug2-scala-L61
>
> where you call "commitSync" in the finally block, which will commit all
> messages that were returned to you from the poll() call.
>
>
> More specifically, suppose your poll() call returned a set of messages
> with offsets 0 to 100. From the consumer's point of view, once they are
> returned to the user they are considered "consumed", and hence if you
> call commitSync after that they will ALL be committed (i.e. the consumer
> will commit offset 100). But if you hit an exception / get a close signal
> while, say, processing the message with offset 50, then calling commitSync
> in the finally block will effectively lose messages 50 to 100.
>
> Hence as a user of the consumer, one should only call "commit" if she is
> certain that all messages returned from "poll()" have been processed.
>
> Guozhang
>
>
> On Mon, Feb 1, 2016 at 9:59 AM, Han JU <ju.han.fe...@gmail.com> wrote:
>
>> Hi,
>>
>> One of our uses of Kafka is to tolerate arbitrary consumer crashes without
>> losing or duplicating messages. So in our code we manually commit offsets
>> after successfully persisting the consumer state.
>>
>> In prototyping with kafka-0.9's new consumer API, I found that in some
>> cases Kafka failed to deliver some of the messages to the consumers even
>> though the offsets were handled correctly.
>>
>> I've made sure that this time everything is at the latest on the 0.9.0
>> branch (d1ff6c7) for both broker and client code.
>>
>> Test code snippet is here:
>>   https://gist.github.com/darkjh/437ac72cdd4b1c4ca2e7
>>
>> Test setup:
>>   - 12 partitions
>>   - 2 consumer app process with 2 consumer thread each
>>   - producer produces exactly 1.2M messages in about 2 minutes (enough
>> time for us to manually kill -9 a consumer)
>>   - a consumer thread commits offsets after every 80k messages received (to
>> simulate our regular offset commits)
>>   - after all messages are consumed, each consumer thread will write a
>> number to a file indicating how many messages it has received. So all the
>> numbers should sum to exactly 1.2M if everything goes well
>>
>> Test run:
>>   - run the producer
>>   - run the 2 consumer app processes at the same time
>>   - wait for the first offset commit (first 80k messages received in each
>> consumer thread)
>>   - after the first offset commit, kill -9 one of the consumer apps
>>   - let the other consumer app run until all messages are consumed
>>   - check the files written by the remaining consumer threads
>>
>> And after that, by checking the files, we see not 1.2M messages but
>> roughly 1.04M. The lag on Kafka for this topic is 0.
>> If you check the logs of the consumer app at DEBUG level, you'll find
>> that the offsets are correctly handled. 30s (the default timeout) after
>> the kill -9 of one consumer app, the remaining consumer app correctly gets
>> assigned all the partitions and starts right from the offsets that the
>> crashed consumer had previously committed. So this makes the message loss
>> quite mysterious to us.
>> Note that the moment of the kill -9 is important. If we kill -9 one
>> consumer app *before* the first offset commit, everything goes well: all
>> messages are received, none lost. But when killed *after* the first offset
>> commit, there are messages lost.
>>
>> Hope the code makes it easy to reproduce the problem. I'm available for
>> any further details needed.
>>
>> Thanks!
>> --
>> *JU Han*
>>
>> Software Engineer @ Teads.tv
>>
>> +33 0619608888
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang
