[ https://issues.apache.org/jira/browse/KAFKA-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16787593#comment-16787593 ]
ASF GitHub Bot commented on KAFKA-8069:
---------------------------------------

hzxa21 commented on pull request #6401: KAFKA-8069: Setting expireTimestamp to None if it is the default value after loading v1 offset records from __consumer_offsets
URL: https://github.com/apache/kafka/pull/6401

After the 2.1 release, if the broker hasn't been upgraded to the latest inter-broker protocol version, the committed offsets stored in the __consumer_offsets topic will get cleaned up much earlier than they should be when the offsets are loaded back from the __consumer_offsets topic in the GroupCoordinator, which happens during a leadership transition or after a broker bounce. This patch fixes the bug by setting expireTimestamp to None if it is the default value after loading v1 offset records from __consumer_offsets. Details of the bug can be found in https://issues.apache.org/jira/browse/KAFKA-8069

The bug can be reproduced by starting a broker with inter-broker protocol version = 1.0 and a 2.*/1.*/0.11.* consumer.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Committed offsets get cleaned up right after the coordinator loading them back from __consumer_offsets in broker with old inter-broker protocol version (< 2.2)
> ---------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8069
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8069
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.1.0, 2.2.0, 2.1.1, 2.1.2, 2.2.1
>            Reporter: Zhanxiang (Patrick) Huang
>            Assignee: Zhanxiang (Patrick) Huang
>            Priority: Critical
>             Fix For: 2.2.0
>
>
> After the 2.1 release, if the broker hasn't been upgraded to the latest inter-broker protocol version, the committed offsets stored in the __consumer_offsets topic will get cleaned up much earlier than they should be when the offsets are loaded back from the __consumer_offsets topic in the GroupCoordinator, which happens during a leadership transition or after a broker bounce.
>
> TL;DR
>
> The V1 on-disk format for __consumer_offsets has an *expireTimestamp* field. If the inter-broker protocol (IBP) version is prior to 2.1 (i.e. prior to [KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets]) on a Kafka 2.1 broker, the logic for finding expired offsets looks like:
> {code:java}
> def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long): Map[TopicPartition, OffsetAndMetadata] = {
>   offsets.filter {
>     case (topicPartition, commitRecordMetadataAndOffset) =>
>       ... && {
>         commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match {
>           case None =>
>             // current version with no per partition retention
>             currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs
>           case Some(expireTimestamp) =>
>             // older versions with explicit expire_timestamp field => old expiration semantics is used
>             currentTimestamp >= expireTimestamp
>         }
>       }
>   }....
> }
> {code}
> The expireTimestamp in the on-disk offset record can only be set when storing the committed offset in the __consumer_offsets topic. But the GroupCoordinator also keeps an in-memory representation of the expireTimestamp (see the code above), which can be set in the following two cases:
> # Upon the GroupCoordinator receiving an OffsetCommitRequest, the expireTimestamp is set using the following logic:
> {code:java}
> expireTimestamp = offsetCommitRequest.retentionTime match {
>   case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
>   case retentionTime => Some(currentTimestamp + retentionTime)
> }
> {code}
> In all the latest client versions, the consumer will send out OffsetCommitRequest with DEFAULT_RETENTION_TIME, so the expireTimestamp will always be None in this case. *This means any committed offset set in this case will always hit the "case None" branch in getExpiredOffsets(...) when the coordinator is doing the cleanup, which is correct.*
> # Upon the GroupCoordinator loading the committed offsets stored in the __consumer_offsets topic from disk, the expireTimestamp is set using the following logic if IBP < 2.1:
> {code:java}
> val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
> {code}
> and the logic to persist the expireTimestamp is:
> {code:java}
> // OffsetCommitRequest.DEFAULT_TIMESTAMP = -1
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
> {code}
> Since the in-memory expireTimestamp will always be None in our case, as mentioned in 1), we will always store -1 on disk. Therefore, when the offset is loaded from the __consumer_offsets topic, the in-memory expireTimestamp will always be set to -1. *This means any committed offset set in this case will always hit the "case Some(expireTimestamp)" branch in getExpiredOffsets(...) when the coordinator is doing the cleanup, which means we will always expire the committed offsets on the first expiration check (shortly after they are loaded from the __consumer_offsets topic).*
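>
> For illustration only, here is a minimal, self-contained Scala sketch of the two branches of the expiration check above (ExpiryDemo, OffsetEntry, and the literal values are made up for this example, not broker code). It shows that an offset with no explicit expiry waits out the retention window, while a loaded offset carrying an explicit expireTimestamp of -1 is treated as expired on the very first check:
> {code:java}
> // Illustrative, standalone model of the expiration check above.
> object ExpiryDemo {
>   case class OffsetEntry(commitTimestamp: Long, expireTimestamp: Option[Long])
>
>   def isExpired(entry: OffsetEntry, currentTimestamp: Long, offsetRetentionMs: Long): Boolean =
>     entry.expireTimestamp match {
>       case None =>
>         // No explicit expiry: expire only after the retention window has passed.
>         currentTimestamp - entry.commitTimestamp >= offsetRetentionMs
>       case Some(expireTimestamp) =>
>         // Explicit expiry (old semantics): expire once that timestamp is reached.
>         currentTimestamp >= expireTimestamp
>     }
>
>   def main(args: Array[String]): Unit = {
>     val now = System.currentTimeMillis()
>     val retentionMs = 7L * 24 * 60 * 60 * 1000 // e.g. a 7-day retention window
>
>     // Offset committed by a recent client and still held in memory: not expired.
>     println(isExpired(OffsetEntry(now, None), now, retentionMs))      // false
>
>     // Same offset after being re-loaded from a v1 record that persisted -1:
>     // -1 is always in the past, so it expires on the first cleanup pass.
>     println(isExpired(OffsetEntry(now, Some(-1L)), now, retentionMs)) // true
>   }
> }
> {code}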
> I am able to reproduce this bug on my local box with one broker using 2.*, 1.*, and 0.11.* consumers. The consumer will see a null committed offset after the broker is bounced.
>
> This bug was introduced by [PR-5690|https://github.com/apache/kafka/pull/5690] in the Kafka 2.1 release, and the fix is straightforward: set the expireTimestamp to None if it is -1 in the on-disk format.
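>
> A rough sketch of that idea (parseExpireTimestampV1 and the surrounding code are hypothetical, not the actual patch; only OffsetCommitRequest.DEFAULT_TIMESTAMP = -1 and OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 come from the snippets above): when reading a v1 offset record, map the persisted -1 sentinel back to None.
> {code:java}
> // Hypothetical helper for parsing the v1 expire timestamp field.
> def parseExpireTimestampV1(rawExpireTimestamp: Long): Option[Long] =
>   if (rawExpireTimestamp == -1L) None // persisted default => no explicit expiry
>   else Some(rawExpireTimestamp)
>
> // Loading would then use something like:
> // val expireTimestamp = parseExpireTimestampV1(
> //   value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long])
> {code}
> With a mapping like this, offsets loaded from v1 records hit the "case None" branch of getExpiredOffsets(...) again and are only expired after offsetRetentionMs, matching the behavior for offsets committed directly by new clients.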