[ https://issues.apache.org/jira/browse/KAFKA-19274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Jacot updated KAFKA-19274:
--------------------------------
    Description: 
Group Coordinator Shards are not unloaded when the __consumer_offsets topic is deleted. The unloading is scheduled, but it is then ignored because the requested epoch is equal to the shard's current epoch:
{noformat}
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling unloading of metadata for __consumer_offsets-0 with epoch OptionalInt[0] (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling unloading of metadata for __consumer_offsets-1 with epoch OptionalInt[0] (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading metadata for __consumer_offsets-0 in epoch OptionalInt[0] since current epoch is 0. (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading metadata for __consumer_offsets-1 in epoch OptionalInt[0] since current epoch is 0. (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime){noformat}
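The log lines suggest that the runtime only honours an unload request whose epoch is absent or strictly greater than the epoch the shard is currently loaded with. A minimal Java sketch of that guard, assuming exactly this rule; `UnloadGuard` and `shouldUnload` are hypothetical names, not the actual `CoordinatorRuntime` API:
{code:java}
import java.util.OptionalInt;

// Hypothetical model of the unload guard implied by the log lines above.
// Assumption: an unload request is honoured only when it carries no epoch or
// an epoch strictly greater than the epoch the shard is currently loaded with.
final class UnloadGuard {

    private UnloadGuard() {}

    static boolean shouldUnload(OptionalInt requestedEpoch, int currentEpoch) {
        // An empty epoch (presumably what `None` maps to) bypasses the check.
        return !requestedEpoch.isPresent() || requestedEpoch.getAsInt() > currentEpoch;
    }

    public static void main(String[] args) {
        int currentEpoch = 0;
        // Topic deletion today: the epoch is unchanged, and 0 > 0 is false.
        System.out.println(shouldUnload(OptionalInt.of(0), currentEpoch));   // false
        // No epoch at all: always unloaded.
        System.out.println(shouldUnload(OptionalInt.empty(), currentEpoch)); // true
        // A newer epoch (e.g. after a leadership change): unloaded.
        System.out.println(shouldUnload(OptionalInt.of(1), currentEpoch));   // true
    }
}
{code}
With an equal epoch (0 vs 0) the request is dropped, which matches the "Ignored unloading" lines; an empty epoch always passes.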
The issue seems to be in this code:
{code:java}
// Handle the case where the topic was deleted
Option(delta.topicsDelta()).foreach { topicsDelta =>
  if (topicsDelta.topicWasDeleted(topicName)) {
    topicsDelta.image.getTopic(topicName).partitions.entrySet.forEach { entry =>
      if (entry.getValue.leader == brokerId) {
        resignation(entry.getKey, Some(entry.getValue.leaderEpoch))
      }
    }
  }
}

// Handle the case where the replica was reassigned, made a leader or made a follower
getTopicDelta(topicName, image, delta).foreach { topicDelta =>
  val changes = topicDelta.localChanges(brokerId)

  changes.deletes.forEach { topicPartition =>
    resignation(topicPartition.partition, None)
  }
  changes.electedLeaders.forEach { (topicPartition, partitionInfo) =>
    election(topicPartition.partition, partitionInfo.partition.leaderEpoch)
  }
  changes.followers.forEach { (topicPartition, partitionInfo) =>
    resignation(topicPartition.partition, Some(partitionInfo.partition.leaderEpoch))
  }
} {code}
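We pass `None` when a partition is deleted, but the partition's last known leader epoch when the whole topic is deleted. The leader epoch is not incremented when the topic is deleted, so the resignation carries the same epoch that the shard is already loaded with, and the unloading logic rejects it (hence the "Ignored unloading ... since current epoch is 0" lines above). We should use `None` in both cases so that the resignation is not filtered out by the epoch check.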
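To illustrate why passing `None` on topic deletion fixes the problem, here is a simplified Java model of the resignation paths before and after the proposed change. It assumes the runtime unloads only when the epoch is absent or strictly greater than the loaded one; `ResignationPaths`, `Cause`, `epochBeforeFix` and `epochAfterFix` are hypothetical names, and `OptionalInt.empty()` stands in for Scala's `None`:
{code:java}
import java.util.OptionalInt;

// Hypothetical, simplified model of the resignation paths in the snippet above.
// OptionalInt.empty() plays the role of Scala's None.
final class ResignationPaths {

    enum Cause { PARTITION_DELETED, TOPIC_DELETED, BECAME_FOLLOWER }

    // Current behaviour: only a partition deletion resigns without an epoch.
    static OptionalInt epochBeforeFix(Cause cause, int leaderEpoch) {
        return cause == Cause.PARTITION_DELETED
            ? OptionalInt.empty()
            : OptionalInt.of(leaderEpoch);
    }

    // Proposed behaviour: partition and topic deletion both resign without an epoch.
    static OptionalInt epochAfterFix(Cause cause, int leaderEpoch) {
        return cause == Cause.BECAME_FOLLOWER
            ? OptionalInt.of(leaderEpoch)
            : OptionalInt.empty();
    }

    // Assumed unload guard: no epoch, or an epoch newer than the loaded one.
    static boolean shouldUnload(OptionalInt requestedEpoch, int currentEpoch) {
        return !requestedEpoch.isPresent() || requestedEpoch.getAsInt() > currentEpoch;
    }

    public static void main(String[] args) {
        int loadedEpoch = 0;   // epoch the shard was loaded with
        int epochOnDelete = 0; // topic deletion does not bump the leader epoch
        System.out.println(shouldUnload(epochBeforeFix(Cause.TOPIC_DELETED, epochOnDelete), loadedEpoch)); // false
        System.out.println(shouldUnload(epochAfterFix(Cause.TOPIC_DELETED, epochOnDelete), loadedEpoch));  // true
    }
}
{code}
Follower transitions keep their epoch in both versions; assuming the leader epoch is bumped on a leadership change, that epoch is newer than the loaded one and still passes the guard, while stale requests remain filtered.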


  was:
Group Coordinator Shards are not unloaded when the __consumer_offsets topic is deleted. The unloading is scheduled, but it is then ignored because the requested epoch is equal to the shard's current epoch:
{noformat}
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling unloading of metadata for __consumer_offsets-0 with epoch OptionalInt[0] (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling unloading of metadata for __consumer_offsets-1 with epoch OptionalInt[0] (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading metadata for __consumer_offsets-0 in epoch OptionalInt[0] since current epoch is 0. (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading metadata for __consumer_offsets-1 in epoch OptionalInt[0] since current epoch is 0. (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime){noformat}


> Group Coordinator Shards are not unloaded when __consumer_offsets topic is deleted
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-19274
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19274
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 4.0.0
>            Reporter: David Jacot
>            Assignee: David Jacot
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
