We need at-least-once delivery. We have a thread that pulls messages off the consumer and puts them in a queue, where they go through a few async steps. After the final step, we want to commit the offsets of the messages we have completed. Other messages may still be in flight at that point, so consumerConnector.commitOffsets() isn't an option for us.
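The bookkeeping this implies can be sketched as follows. This is only an illustration, not the code from our system: the class name `ContiguousOffsetTracker` and its methods are hypothetical, it tracks a single partition, and it assumes offsets within that partition are handed out consecutively starting at `firstOffset`. It reports the highest offset for which every earlier message has also finished, which is the only offset that is safe to base a commit on when messages complete out of order.

```java
import java.util.TreeSet;

/**
 * Hypothetical helper: tracks out-of-order completions for ONE partition.
 * Assumes offsets are consecutive (no gaps) starting at firstOffset.
 */
class ContiguousOffsetTracker {
    // Completed offsets that are still waiting for an earlier offset to finish.
    private final TreeSet<Long> completed = new TreeSet<>();
    // Lowest offset that is not yet known to be complete.
    private long nextExpected;

    ContiguousOffsetTracker(long firstOffset) {
        this.nextExpected = firstOffset;
    }

    /**
     * Marks an offset as fully processed and returns the highest offset such
     * that it and every earlier offset are complete (firstOffset - 1 if none).
     */
    synchronized long complete(long offset) {
        if (offset >= nextExpected) {
            completed.add(offset); // duplicates and stale offsets are ignored
        }
        // Advance past every completed offset that is now contiguous.
        while (!completed.isEmpty() && completed.first() == nextExpected) {
            completed.pollFirst();
            nextExpected++;
        }
        return nextExpected - 1;
    }
}
```

One tracker per partition would be needed. Whether the value passed to the commit request should be the returned offset itself or the offset after it depends on the convention of the commit API, so that is worth verifying against the Kafka 0.8.2.1 documentation rather than taking from this sketch.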
We are manually committing offsets to Kafka (0.8.2.1); auto commit is off. We have a simple test case that is supposed to verify that we don't lose any messages if the Kafka server is shut down:

    // there are 25 messages, we send a few now and a few after the server comes back up
    for (TestMessageClass mess : messages.subList(0, mid)) {
        producer.send(mess);
    }

    stopKafka(); // in memory KafkaServer
    startKafka();

    for (TestMessageClass mess : messages.subList(mid, total)) {
        producer.send(mess);
    }

    int tries = 0;
    while (testConsumer.received.size() < total && tries++ < 10) {
        Thread.sleep(200);
    }
    assertEquals(keys(testConsumer.received), keys(ImmutableSet.copyOf(messages)));

The test consumer is very simple:

    ConsumerIterator iterator;
    while (iterator.hasNext()) {
        process(iterator.next());
    }

    // end of process:
    commit(messageAndMetadata.offset());

commit is basically the commit code from this page: https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka, except that it runs the commit in a separate thread so it won't interfere with the consumer.

Here is the strange thing: if we do not commit, the test passes every time. Kafka comes back up and the high level consumer picks up right where it left off. But if we do commit, the consumer does not recover, or we lose messages. With 1 partition, we only get some prefix of the messages produced before stopKafka(). With 2 partitions, one partition never gets any of the messages sent in the second half, while the other gets a prefix of its messages, but not all of them.

The most likely explanation seems to be that we are committing the wrong offsets, but I cannot figure out how that is happening. Does the offset in MessageAndMetadata not correspond to the offset in OffsetAndMetadata? Or do we have to abandon the high level consumer entirely if we want to manually commit in this way?
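For reference, the "commit in a separate thread" arrangement mentioned above is roughly of the following shape. This is a minimal sketch and not our actual code: `AsyncOffsetCommitter` and the injected `commitFn` are hypothetical stand-ins for the real commit call from the wiki page, and the sketch covers a single partition. A single worker thread applies commits in submission order and skips any offset that would move the committed position backwards.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

/** Hypothetical wrapper that runs offset commits off the consumer thread. */
class AsyncOffsetCommitter {
    // One worker thread, so commits run in the order they were submitted.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    // Stand-in for the real commit call (an assumption of this sketch).
    private final LongConsumer commitFn;
    // Only read and written on the worker thread.
    private long lastCommitted = -1L;

    AsyncOffsetCommitter(LongConsumer commitFn) {
        this.commitFn = commitFn;
    }

    /** Queues a commit; offsets at or below the last committed one are skipped. */
    void commitAsync(long offset) {
        worker.execute(() -> {
            if (offset > lastCommitted) {
                commitFn.accept(offset);
                lastCommitted = offset;
            }
        });
    }

    /** Stops accepting work and waits for queued commits; false on timeout or interrupt. */
    boolean shutdownAndWait(long timeoutMillis) {
        worker.shutdown();
        try {
            return worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

In a test, `commitFn` can simply record the offsets it receives, which makes it possible to check exactly which values would have been sent to Kafka.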