We have a Kafka broker we use for testing that we recently upgraded from 0.9.0.1 to 0.11.0.0, and our Java consumer is built using the 0.11.0.0 client. The consumers manually commit offsets and have been consuming messages as expected since the upgrade. If we restart the consumers, they fetch the previously committed offset from the broker and resume processing new messages as expected. Kafka Manager reports the offsets we expect to see. However, if we restart the broker, the consumer receives an old offset from the broker and we can end up re-processing several days' worth of messages.
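For context, the consumers are plain kafka-clients consumers with auto-commit disabled. The relevant configuration is along these lines (a simplified sketch, not our exact config; the group.id is the one that appears in the debug logs, the other values are illustrative):

```properties
# Sketch of the consumer configuration (illustrative values)
bootstrap.servers=localhost:9092
group.id=ta-eng-cob1-tstat-events
# Offsets are committed manually by the application after processing,
# so auto-commit is disabled
enable.auto.commit=false
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
```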
We have identified the __consumer_offsets partition where the offsets are being stored, and if we use the console consumer to consume from that partition we see a new message appear each time our consumer commits its offsets. The commands we use are:

```shell
echo "exclude.internal.topics=false" > /tmp/consumer.config

/opt/kafka/bin/kafka-console-consumer.sh --consumer.config /tmp/consumer.config \
  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
  --bootstrap-server localhost:9092 --topic __consumer_offsets \
  --consumer-property group.id=test-offsets-consumer-group --partition 43
```

The output shows our consumer group and topic partition for each commit the consumer sends, and the reported offset is correct:

```
[ta-eng-cob1-tstat-events,ta-eng-cob1-ayla,0]::[OffsetMetadata[1833602,NO_METADATA],CommitTime 1507632714328,ExpirationTime 1507719114328]
```

We also used the following command to check that these commits trigger a new record to be written to the latest __consumer_offsets-43 partition log file, and we do see a new record added to the log file every time the consumer commits offsets:

```shell
/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log \
  --files /var/lib/kafka/__consumer_offsets-43/00000000000003780553.log
```

```
baseOffset: 4006387 lastOffset: 4006387 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 16 isTransactional: false position: 30359857 CreateTime: 1507632866696 isvalid: true size: 147 magic: 2 compresscodec: NONE crc: 1175188994
```

Everything appears to be working as expected until we restart the broker, which then returns an old offset to the consumer.
For example, in the consumer debug output we see that the last commit before the broker restart is 1828033:

```
2017-10-09 11:55:16,056 DEBUG [pool-7-thread-2] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Group ta-eng-cob1-tstat-events committed offset 1828033 for partition ta-eng-cob1-ayla-0
```

After we restart the broker, we see the consumer receive an old offset of 1791273 from the broker:

```
2017-10-09 11:57:22,735 DEBUG [pool-7-thread-2] org.apache.kafka.clients.consumer.internals.Fetcher: Resetting offset for partition ta-eng-cob1-ayla-0 to the committed offset 1791273
```

If we just restart the consumer, the fetch returns the correct offset information from the broker:

```
2017-10-09 11:52:25,984 DEBUG [pool-7-thread-2] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Group ta-eng-cob1-tstat-events committed offset 1828015 for partition ta-eng-cob1-ayla-0
2017-10-09 11:53:21,658 DEBUG [pool-7-thread-2] org.apache.kafka.clients.consumer.internals.Fetcher: Resetting offset for partition ta-eng-cob1-ayla-0 to the committed offset 1828015
```

There don't appear to be any errors in the broker logs to indicate a problem, so the question is: what is making the broker return the incorrect offset when it is restarted?

Thanks,
Phil Luckhurst