[ https://issues.apache.org/jira/browse/KAFKA-19274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Jacot updated KAFKA-19274:
--------------------------------
    Description:
Group Coordinator Shards are not unloaded when __consumer_offsets topic is deleted. The unloading is scheduled but it is ignored because the epoch is equal to the current epoch:
{noformat}
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling unloading of metadata for __consumer_offsets-0 with epoch OptionalInt[0] (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling unloading of metadata for __consumer_offsets-1 with epoch OptionalInt[0] (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading metadata for __consumer_offsets-0 in epoch OptionalInt[0] since current epoch is 0. (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading metadata for __consumer_offsets-1 in epoch OptionalInt[0] since current epoch is 0. (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime){noformat}
The issue seems to be in this code:
{code:java}
// Handle the case where the topic was deleted
Option(delta.topicsDelta()).foreach { topicsDelta =>
  if (topicsDelta.topicWasDeleted(topicName)) {
    topicsDelta.image.getTopic(topicName).partitions.entrySet.forEach { entry =>
      if (entry.getValue.leader == brokerId) {
        resignation(entry.getKey, Some(entry.getValue.leaderEpoch))
      }
    }
  }
}

// Handle the case where the replica was reassigned, made a leader or made a follower
getTopicDelta(topicName, image, delta).foreach { topicDelta =>
  val changes = topicDelta.localChanges(brokerId)

  changes.deletes.forEach { topicPartition =>
    resignation(topicPartition.partition, None)
  }
  changes.electedLeaders.forEach { (topicPartition, partitionInfo) =>
    election(topicPartition.partition, partitionInfo.partition.leaderEpoch)
  }
  changes.followers.forEach { (topicPartition, partitionInfo) =>
    resignation(topicPartition.partition, Some(partitionInfo.partition.leaderEpoch))
  }
}
{code}
We use `None` when a partition is deleted and the actual leader epoch when the topic is deleted. The issue is that the leader epoch is not incremented when the topic is deleted so the unloading logic does not accept the resignation. We should use `None` in both cases.

  was:
Group Coordinator Shards are not unloaded when __consumer_offsets topic is deleted.
The unloading is scheduled but it is ignored because the epoch is equal to the current epoch:
{noformat}
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling unloading of metadata for __consumer_offsets-0 with epoch OptionalInt[0] (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling unloading of metadata for __consumer_offsets-1 with epoch OptionalInt[0] (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading metadata for __consumer_offsets-0 in epoch OptionalInt[0] since current epoch is 0. (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
[2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading metadata for __consumer_offsets-1 in epoch OptionalInt[0] since current epoch is 0. (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime){noformat}


> Group Coordinator Shards are not unloaded when __consumer_offsets topic is deleted
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-19274
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19274
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 4.0.0
>            Reporter: David Jacot
>            Assignee: David Jacot
>            Priority: Major
>
> Group Coordinator Shards are not unloaded when __consumer_offsets topic is
> deleted. The unloading is scheduled but it is ignored because the epoch is
> equal to the current epoch:
> {noformat}
> [2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling unloading of metadata for __consumer_offsets-0 with epoch OptionalInt[0] (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
> [2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Scheduling unloading of metadata for __consumer_offsets-1 with epoch OptionalInt[0] (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
> [2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading metadata for __consumer_offsets-0 in epoch OptionalInt[0] since current epoch is 0. (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime)
> [2025-05-13 08:46:00,883] INFO [GroupCoordinator id=1] Ignored unloading metadata for __consumer_offsets-1 in epoch OptionalInt[0] since current epoch is 0. (org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime){noformat}
> The issue seems to be in this code:
> {code:java}
> // Handle the case where the topic was deleted
> Option(delta.topicsDelta()).foreach { topicsDelta =>
>   if (topicsDelta.topicWasDeleted(topicName)) {
>     topicsDelta.image.getTopic(topicName).partitions.entrySet.forEach { entry =>
>       if (entry.getValue.leader == brokerId) {
>         resignation(entry.getKey, Some(entry.getValue.leaderEpoch))
>       }
>     }
>   }
> }
>
> // Handle the case where the replica was reassigned, made a leader or made a follower
> getTopicDelta(topicName, image, delta).foreach { topicDelta =>
>   val changes = topicDelta.localChanges(brokerId)
>
>   changes.deletes.forEach { topicPartition =>
>     resignation(topicPartition.partition, None)
>   }
>   changes.electedLeaders.forEach { (topicPartition, partitionInfo) =>
>     election(topicPartition.partition, partitionInfo.partition.leaderEpoch)
>   }
>   changes.followers.forEach { (topicPartition, partitionInfo) =>
>     resignation(topicPartition.partition, Some(partitionInfo.partition.leaderEpoch))
>   }
> }
> {code}
> We use `None` when a partition is deleted and the actual leader epoch when
> the topic is deleted. The issue is that the leader epoch is not incremented
> when the topic is deleted so the unloading logic does not accept the
> resignation. We should use `None` in both cases.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
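
To make the epoch reasoning in the description concrete, here is a minimal sketch of the kind of guard that would produce the "Ignored unloading" log lines. It is not the actual CoordinatorRuntime code: the class name `UnloadEpochSketch`, the method `shouldUnload`, and the exact rule (accept when no epoch is supplied, or when the supplied epoch is strictly greater than the current one) are assumptions inferred from the description and the logs.

```java
import java.util.OptionalInt;

// Hypothetical model of the unloading guard; names and rule are assumptions,
// not the real CoordinatorRuntime implementation.
public class UnloadEpochSketch {

    // Assumed rule: unload if the caller supplies no epoch, or if the
    // supplied epoch is strictly greater than the coordinator's current epoch.
    public static boolean shouldUnload(int currentEpoch, OptionalInt resignationEpoch) {
        return !resignationEpoch.isPresent() || currentEpoch < resignationEpoch.getAsInt();
    }

    public static void main(String[] args) {
        int currentEpoch = 0;

        // Topic deletion today: the unchanged leader epoch is passed, so the
        // request is rejected (matches "Ignored unloading ... epoch is 0").
        System.out.println(shouldUnload(currentEpoch, OptionalInt.of(0)));    // false

        // Proposed behaviour: pass no epoch, as the partition-delete path does.
        System.out.println(shouldUnload(currentEpoch, OptionalInt.empty()));  // true

        // Follower transition: the leader epoch was bumped, so it is accepted.
        System.out.println(shouldUnload(currentEpoch, OptionalInt.of(1)));    // true
    }
}
```

Under this assumed rule, an equal epoch can never trigger unloading, which is why passing `None` (an empty `OptionalInt` on the Java side) for the topic-deletion case, as the description proposes, would let the resignation through.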