[ https://issues.apache.org/jira/browse/KAFKA-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16787609#comment-16787609 ]

Jason Gustafson commented on KAFKA-8069:
----------------------------------------

Great find and thanks for the detailed summary. I was able to confirm this on 
trunk. The only requirement is a 2.1+ broker with IBP set to an old version. I 
did the following:
 # Start up the broker with IBP set to 2.0
 # Start up a console consumer (on trunk) and verify it commits offsets
 # Stop the consumer
 # Restart the broker

We see the following in the logs:
{code:java}
[2019-03-07 22:49:25,774] DEBUG [GroupMetadataManager brokerId=0] Loaded group 
metadata GroupMetadata(groupId=blah, generation=4, protocolType=Some(consumer), 
currentState=Empty, members=Map()) with offsets Map(foo-0 -> 
CommitRecordMetadataAndOffset(Some(12),OffsetAndMetadata(offset=1, 
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1552027741320, 
expireTimestamp=Some(-1)))) and pending offsets Map() 
(kafka.coordinator.group.GroupMetadataManager)                                  
{code}
The key is *expireTimestamp=Some(-1)*. Shortly after, I see the coordinator 
expiring the offset.

> Committed offsets get cleaned up right after the coordinator loading them 
> back from __consumer_offsets in broker with old inter-broker protocol version 
> (< 2.2)
> ---------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8069
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8069
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.1.0, 2.2.0, 2.1.1, 2.1.2, 2.2.1
>            Reporter: Zhanxiang (Patrick) Huang
>            Assignee: Zhanxiang (Patrick) Huang
>            Priority: Critical
>             Fix For: 2.2.0
>
>
> After the 2.1 release, if the broker hasn't been upgraded to the latest 
> inter-broker protocol version, the committed offsets stored in the 
> __consumer_offsets topic will get cleaned up much earlier than they should 
> be when the offsets are loaded back from the __consumer_offsets topic in the 
> GroupCoordinator, which happens during leadership transition or after a 
> broker bounce.
> TL;DR
> For the V1 on-disk format of __consumer_offsets, we have the *expireTimestamp* 
> field, and if the inter-broker protocol (IBP) version is prior to 2.1 (prior to 
> [KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets]) 
> for a Kafka 2.1 broker, the logic for finding expired offsets looks like:
> {code:java}
> def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long): Map[TopicPartition, OffsetAndMetadata] = {
>   offsets.filter {
>     case (topicPartition, commitRecordMetadataAndOffset) =>
>       ... && {
>         commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match {
>           case None =>
>             // current version with no per-partition retention
>             currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs
>           case Some(expireTimestamp) =>
>             // older versions with an explicit expire_timestamp field => old expiration semantics are used
>             currentTimestamp >= expireTimestamp
>         }
>       }
>   }...
> }
> {code}
> The expireTimestamp in the on-disk offset record can only be set when storing 
> the committed offset in the __consumer_offsets topic, but the GroupCoordinator 
> also keeps an in-memory representation of the expireTimestamp (see the code 
> above), which can be set in the following two cases:
>  # Upon the GroupCoordinator receiving OffsetCommitRequest, the 
> expireTimestamp is set using the following logic:
> {code:java}
> expireTimestamp = offsetCommitRequest.retentionTime match {
>   case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
>   case retentionTime => Some(currentTimestamp + retentionTime)
> }
> {code}
> With all the latest client versions, the consumer sends OffsetCommitRequest 
> with DEFAULT_RETENTION_TIME, so the expireTimestamp will always be None in 
> this case. *This means any committed offset set in this case will always hit 
> "case None" in getExpiredOffsets(...) when the coordinator does the cleanup, 
> which is correct.*
>  # Upon the GroupCoordinator loading the committed offsets stored in the 
> __consumer_offsets topic from disk, the expireTimestamp is set using the 
> following logic if IBP < 2.1:
> {code:java}
> val expireTimestamp = 
> value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
> {code}
> and the logic to persist the expireTimestamp is:
> {code:java}
> // OffsetCommitRequest.DEFAULT_TIMESTAMP = -1
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, 
> offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
> {code}
> Since the in-memory expireTimestamp will always be None in our case, as 
> mentioned in case 1, we will always store -1 on disk. Therefore, when the 
> offset is loaded from the __consumer_offsets topic, the in-memory 
> expireTimestamp will always be set to -1. *This means any committed offset 
> set in this case will always hit "case Some(expireTimestamp)" in 
> getExpiredOffsets(...) when the coordinator does the cleanup, which means we 
> will always expire the committed offset on the first expiration check 
> (shortly after it is loaded from the __consumer_offsets topic)*.
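Putting the two code paths together, the round trip can be sketched end to end. Again a hedged sketch, not the broker source: the class and method names are made up, and `DEFAULT_TIMESTAMP` mirrors `OffsetCommitRequest.DEFAULT_TIMESTAMP` (-1).

```java
import java.util.Optional;

// End-to-end sketch of the bug: a None expireTimestamp is persisted as -1,
// read back as Some(-1), and then treated as already expired.
class RoundTripSketch {
    static final long DEFAULT_TIMESTAMP = -1L; // OffsetCommitRequest.DEFAULT_TIMESTAMP

    // Persist path (IBP < 2.1): an absent expireTimestamp is written as -1.
    static long toDisk(Optional<Long> expireTimestamp) {
        return expireTimestamp.orElse(DEFAULT_TIMESTAMP);
    }

    // Buggy load path: the raw field is wrapped unconditionally, so -1 survives.
    static Optional<Long> fromDiskBuggy(long raw) {
        return Optional.of(raw);
    }

    // Simplified expiry check for the "case Some(expireTimestamp)" branch only.
    static boolean isExpired(long currentTimestamp, Optional<Long> expireTimestamp) {
        return expireTimestamp.isPresent() && currentTimestamp >= expireTimestamp.get();
    }
}
```

Chaining `fromDiskBuggy(toDisk(Optional.empty()))` yields `Optional.of(-1L)`, and any wall-clock timestamp satisfies `currentTimestamp >= -1`, so the offset expires on the first check after loading.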
> I am able to reproduce this bug on my local box with one broker, using 2.*, 
> 1.*, and 0.11.* consumers. The consumer sees a null committed offset after 
> the broker is bounced.
> This bug was introduced by [PR-5690|https://github.com/apache/kafka/pull/5690] 
> in the Kafka 2.1 release, and the fix is straightforward: set the in-memory 
> expireTimestamp to None if the on-disk value is -1.
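The fix described above can be sketched as a corrected load path (hypothetical names; see the actual patch in the linked PR for the real change):

```java
import java.util.Optional;

// Sketch of the fix: map the on-disk sentinel -1 back to "no expire timestamp"
// on load, so these offsets fall into the retention-based "case None" branch.
class FixedLoadSketch {
    static final long DEFAULT_TIMESTAMP = -1L;

    static Optional<Long> fromDiskFixed(long raw) {
        if (raw == DEFAULT_TIMESTAMP) {
            return Optional.empty(); // sentinel => treat as no explicit expiry
        }
        return Optional.of(raw);
    }
}
```

With this mapping, offsets committed with the default retention are once again aged out by `currentTimestamp - commitTimestamp >= offsetRetentionMs` instead of being expired immediately.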



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
