We need at-least-once delivery. We have a
thread that pulls messages off the consumer and puts them in a queue, where
they go through a few async steps. After the final step, we want
to commit the offsets of the messages we have completed. Other messages
may still be in flight at that point, so
consumerConnector.commitOffsets() isn't an option for us.
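
As an illustration of the bookkeeping this implies (a sketch with hypothetical names, not the actual code from this application): when messages finish out of order, only the contiguous run of completed offsets is safe to commit. OffsetTracker and complete() are invented for this example. The sketch assumes one tracker per partition, offsets that increase by one per message, and that a committed offset denotes the next message to read.

```java
import java.util.TreeSet;

/**
 * Hypothetical helper: records out-of-order completions for ONE partition
 * and reports the offset that is safe to commit.
 */
class OffsetTracker {
    // Completed offsets that are still ahead of a gap.
    private final TreeSet<Long> completed = new TreeSet<>();
    // Lowest offset not yet known to be complete; this is the value to commit
    // (assumption: a committed offset means "next message to read").
    private long nextToCommit;

    OffsetTracker(long startOffset) {
        this.nextToCommit = startOffset;
    }

    /** Marks one offset as fully processed and returns the safe commit offset. */
    synchronized long complete(long offset) {
        if (offset >= nextToCommit) {
            completed.add(offset);
        }
        // Advance past every completed offset that is contiguous with the front.
        while (!completed.isEmpty() && completed.first() == nextToCommit) {
            completed.pollFirst();
            nextToCommit++;
        }
        return nextToCommit;
    }
}
```

With a tracker started at offset 0, completing offsets 1 and 2 leaves the safe commit offset at 0, because offset 0 is still in flight. Completing offset 0 then moves it to 3.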

We commit offsets to Kafka (0.8.2.1) manually; auto commit is off.

We have a simple test case that is supposed to verify that we don't lose
any messages if the Kafka server is shut down:

// there are 25 messages; we send a few now and a few after the server
// comes back up
for (TestMessageClass mess : messages.subList(0, mid)) {
    producer.send(mess);
}

stopKafka(); // in-memory KafkaServer
startKafka();

for (TestMessageClass mess : messages.subList(mid, total)) {
    producer.send(mess);
}

// wait up to ~2 seconds for the consumer to receive everything
int tries = 0;
while (testConsumer.received.size() < total && tries++ < 10) {
    Thread.sleep(200);
}
assertEquals(keys(testConsumer.received),
        keys(ImmutableSet.copyOf(messages)));

The test consumer is very simple:

ConsumerIterator iterator; // the high level consumer's stream iterator
while (iterator.hasNext()) {
    process(iterator.next());
}

// at the end of process(), for the MessageAndMetadata it was given:
commit(messageAndMetadata.offset());

commit is basically the commit code from this page:
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
except that it runs the commit in a separate thread so it won't interfere
with the consumer.

Here is the strange thing: if we do not commit, the test passes every time.
Kafka comes back up and the high level consumer picks up right where it
left off. But if we do commit, it does not recover, or we lose messages.
With 1 partition, we only get some prefix of the messages produced before
stopKafka(). With 2 partitions, one of them never gets any of the messages
sent in the second half, while the other gets a prefix, but not all, of the
messages for that partition.

The most likely explanation seems to be that we are committing the wrong
offsets, but I cannot figure out how that is happening. Does the offset in
MessageAndMetadata not correspond to the offset in OffsetAndMetadata?

Or do we have to abandon the high level consumer entirely if we want to
manually commit in this way?
