Zhanxiang (Patrick) Huang created KAFKA-8069:
------------------------------------------------

             Summary: Committed offsets get cleaned up right after the coordinator loading them back from __consumer_offsets in broker with old inter-broker protocol version (< 2.2)
                 Key: KAFKA-8069
                 URL: https://issues.apache.org/jira/browse/KAFKA-8069
             Project: Kafka
          Issue Type: Bug
            Reporter: Zhanxiang (Patrick) Huang
            Assignee: Zhanxiang (Patrick) Huang


After the 2.1 release, if the broker hasn't been upgraded to the latest inter-broker protocol version, the committed offsets stored in the __consumer_offsets topic get cleaned up much earlier than they should when the offsets are loaded back from the __consumer_offsets topic in the GroupCoordinator, which happens during a leadership transition or after a broker bounce.

TL;DR
The V1 on-disk format for __consumer_offsets has an *expireTimestamp* field. On a Kafka 2.1 broker whose inter-broker protocol (IBP) version is prior to 2.1 (i.e. prior to [KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets]), the logic for getting the expired offsets looks like this:
{code:java}
def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long): Map[TopicPartition, OffsetAndMetadata] = {
  offsets.filter {
    case (topicPartition, commitRecordMetadataAndOffset) =>
      ... && {
        commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match {
          case None =>
            // current version with no per partition retention
            currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs
          case Some(expireTimestamp) =>
            // older versions with explicit expire_timestamp field => old expiration semantics is used
            currentTimestamp >= expireTimestamp
        }
      }
  }...
}
{code}
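To make the two branches concrete, here is a minimal, self-contained sketch of the same check. It is not the GroupCoordinator code: `SimpleOffsetAndMetadata`, `ExpirationCheck`, `isExpired` and `expiredOffsets` are hypothetical names, partitions are keyed by plain strings, and the commit timestamp is assumed to play the role of `baseTimestamp`.

```scala
// Hypothetical, simplified stand-in for OffsetAndMetadata: only the fields
// used by the expiration check are modeled.
final case class SimpleOffsetAndMetadata(
    offset: Long,
    commitTimestamp: Long,
    expireTimestamp: Option[Long]
)

object ExpirationCheck {
  // Same two branches as the match in getExpiredOffsets(...); the commit
  // timestamp is assumed to serve as baseTimestamp.
  def isExpired(
      om: SimpleOffsetAndMetadata,
      currentTimestamp: Long,
      offsetRetentionMs: Long
  ): Boolean =
    om.expireTimestamp match {
      case None =>
        // no per-partition retention: expire once offsetRetentionMs has elapsed
        currentTimestamp - om.commitTimestamp >= offsetRetentionMs
      case Some(expireTimestamp) =>
        // explicit expire timestamp: old expiration semantics
        currentTimestamp >= expireTimestamp
    }

  // Keeps only the expired entries (partitions keyed by name for simplicity).
  def expiredOffsets(
      offsets: Map[String, SimpleOffsetAndMetadata],
      currentTimestamp: Long,
      offsetRetentionMs: Long
  ): Map[String, SimpleOffsetAndMetadata] =
    offsets.filter { case (_, om) => isExpired(om, currentTimestamp, offsetRetentionMs) }

  def main(args: Array[String]): Unit = {
    val retentionMs = 60000L // illustrative retention of one minute
    val offsets = Map(
      "topic-0" -> SimpleOffsetAndMetadata(42L, commitTimestamp = 1000L, expireTimestamp = None),
      "topic-1" -> SimpleOffsetAndMetadata(7L, commitTimestamp = 1000L, expireTimestamp = Some(1500L))
    )
    // At t = 2000 only topic-1 has passed its explicit expire timestamp.
    println(expiredOffsets(offsets, currentTimestamp = 2000L, offsetRetentionMs = retentionMs).keySet)
  }
}
```

The key observation is that an entry with `Some(...)` ignores `offsetRetentionMs` entirely, so whatever value ends up in that Option fully decides when the offset expires.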
The expireTimestamp in the on-disk offset record can only be set when storing the committed offset in the __consumer_offsets topic. However, the GroupCoordinator also keeps an in-memory representation of the expireTimestamp (see the code above), which can be set in the following two cases:
 # When the GroupCoordinator receives an OffsetCommitRequest, the expireTimestamp is set using the following logic:
{code:java}
expireTimestamp = offsetCommitRequest.retentionTime match {
  case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
  case retentionTime => Some(currentTimestamp + retentionTime)
}
{code}
In all recent client versions, the consumer sends OffsetCommitRequest with DEFAULT_RETENTION_TIME, so the expireTimestamp will always be None in this case. *This means any offset committed in this case will always hit the "case None" branch in "getExpiredOffsets(...)" when the coordinator is doing the cleanup, which is correct.*

 # When the GroupCoordinator loads the committed offsets stored in the __consumer_offsets topic from disk, the expireTimestamp is set using the following logic if IBP < 2.1:
{code:java}
val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
{code}
and the logic to persist the expireTimestamp is:
{code:java}
// OffsetCommitRequest.DEFAULT_TIMESTAMP = -1
value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
{code}
Since the in-memory expireTimestamp will always be None in our case, as mentioned in 1), we will always store -1 on disk. Therefore, when the offset is loaded from the __consumer_offsets topic, the in-memory expireTimestamp will always be set to Some(-1). *This means any offset committed in this case will always hit the "case Some(expireTimestamp)" branch in "getExpiredOffsets(...)" when the coordinator is doing the cleanup. Because currentTimestamp >= -1 always holds, we will always expire the committed offset on the first expiration check (which happens shortly after the offsets are loaded from the __consumer_offsets topic)*.
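The commit, persist and load steps in the two cases above can be traced end to end with a small sketch. All names here (`OffsetRoundTrip`, `expireTimestampFromRequest`, `persist`, `loadBuggy`) are hypothetical stand-ins for the coordinator logic, and `DEFAULT_RETENTION_TIME` is assumed to be -1 like `DEFAULT_TIMESTAMP`. It shows None being written as -1 and coming back as Some(-1), which the Some branch treats as already expired.

```scala
object OffsetRoundTrip {
  // Sentinel values. DEFAULT_TIMESTAMP is -1 per the snippet above;
  // DEFAULT_RETENTION_TIME is assumed to be -1 as well.
  val DefaultRetentionTime: Long = -1L
  val DefaultTimestamp: Long = -1L

  // Case 1: in-memory expireTimestamp derived from the commit request.
  def expireTimestampFromRequest(retentionTime: Long, currentTimestamp: Long): Option[Long] =
    retentionTime match {
      case DefaultRetentionTime => None
      case rt                   => Some(currentTimestamp + rt)
    }

  // Write path: None is persisted as the -1 sentinel in the V1 value.
  def persist(expireTimestamp: Option[Long]): Long =
    expireTimestamp.getOrElse(DefaultTimestamp)

  // Buggy read path (IBP < 2.1): the raw field is always wrapped in Some.
  def loadBuggy(onDisk: Long): Option[Long] = Some(onDisk)

  // Same two branches as getExpiredOffsets(...), with the commit timestamp as base.
  def isExpired(expireTimestamp: Option[Long], commitTimestamp: Long,
                currentTimestamp: Long, offsetRetentionMs: Long): Boolean =
    expireTimestamp match {
      case None     => currentTimestamp - commitTimestamp >= offsetRetentionMs
      case Some(ts) => currentTimestamp >= ts
    }

  def main(args: Array[String]): Unit = {
    val commitTime = 1000L
    val retentionMs = 60000L // illustrative value
    val inMemory = expireTimestampFromRequest(DefaultRetentionTime, commitTime)
    val reloaded = loadBuggy(persist(inMemory))
    println(s"in memory: $inMemory, on disk: ${persist(inMemory)}, reloaded: $reloaded")
    // One millisecond after the commit, the reloaded offset is already expired.
    println(isExpired(reloaded, commitTime, commitTime + 1, retentionMs))
  }
}
```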

I am able to reproduce this bug on my local box with one broker, using 2.*, 1.* and 0.11.* consumers. The consumer sees a null committed offset after the broker is bounced.

This bug was introduced by [PR-5690|https://github.com/apache/kafka/pull/5690] in the Kafka 2.1 release. The fix is very straightforward: set the expireTimestamp to None if it is -1 in the on-disk format.
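A minimal sketch of the described fix, assuming the loaded V1 field is available as a plain Long. `OffsetLoadFix`, `persist` and `loadFixed` are hypothetical names, not the actual patch: the -1 sentinel maps back to None, so reloaded offsets fall into the "case None" branch and are expired according to offsetRetentionMs.

```scala
object OffsetLoadFix {
  // -1 sentinel written when the in-memory expireTimestamp is None.
  val DefaultTimestamp: Long = -1L

  // Write path (unchanged): None is persisted as -1.
  def persist(expireTimestamp: Option[Long]): Long =
    expireTimestamp.getOrElse(DefaultTimestamp)

  // Fixed read path: the -1 sentinel maps back to None instead of Some(-1).
  def loadFixed(onDisk: Long): Option[Long] =
    if (onDisk == DefaultTimestamp) None else Some(onDisk)

  def main(args: Array[String]): Unit = {
    // None survives the round trip, so the offset stays on the "case None" path.
    println(loadFixed(persist(None)))
    // A real expire timestamp is still loaded as-is.
    println(loadFixed(persist(Some(123456L))))
  }
}
```

With this mapping, persist and loadFixed are inverses for every value except an explicit expire timestamp of exactly -1, which is already reserved as the "not set" sentinel on the write path.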



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
