hachikuji commented on a change in pull request #10252: URL: https://github.com/apache/kafka/pull/10252#discussion_r587850882
##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -171,25 +200,38 @@ class MetadataPartitionsBuilder(val brokerId: Int,
     val prevPartition = newPartitionMap.put(partition.partitionIndex, partition)
     if (partition.isReplicaFor(brokerId)) {
       _localChanged.add(partition)
-    } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {
-      _localRemoved.add(prevPartition)
+    } else if (prevPartition != null) {
+      maybeAddToLocalRemoved(prevPartition)
     }
     newNameMap.put(partition.topicName, newPartitionMap)
   }
 
+  private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit = {
+    if (partition.isReplicaFor(brokerId)) {
+      val currentTopicId = newReverseIdMap.get(partition.topicName)

Review comment:
The intent is to only return the change in `_localRemoved` if the topic existed in the previous image. If we only check topic name, then successive deletions and recreations might leave some partitions in `_localRemoved` that were not in the previous image.

It's worth noting that this is strictly more defensive than the current replay logic requires. A new image is built for each batch of records from the controller, and we would never see a topic deleted and recreated (or vice versa) in the same batch. This is an implicit contract, though, and not protected by the builder API, so I thought we might as well try to make the logic more resilient.
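To make the delete-and-recreate case concrete, here is a minimal sketch of the idea. It is not the code in this PR: `Partition`, `RemovalTracker`, `RemovalDemo`, and the `checkTopicId` flag are hypothetical names, and for brevity the sketch carries the topic ID on the partition itself, whereas the diff resolves it from the topic name through `newReverseIdMap`.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for MetadataPartition; the real class differs.
final case class Partition(topicName: String, topicId: String, index: Int, replicas: Set[Int]) {
  def isReplicaFor(brokerId: Int): Boolean = replicas.contains(brokerId)
}

// Hypothetical tracker, not the real MetadataPartitionsBuilder.
// `prevTopicIds` holds the IDs of the topics present in the previous image.
final class RemovalTracker(brokerId: Int, prevTopicIds: Set[String], checkTopicId: Boolean) {
  private val removed = mutable.ArrayBuffer.empty[Partition]

  // Record a removed partition only if this broker hosted it and, when
  // checkTopicId is set, its topic ID was present in the previous image.
  def remove(p: Partition): Unit = {
    val inPrevImage = !checkTopicId || prevTopicIds.contains(p.topicId)
    if (p.isReplicaFor(brokerId) && inPrevImage) removed += p
  }

  def localRemoved: List[Partition] = removed.toList
}

object RemovalDemo {
  // Same topic name and partition index, but different topic IDs.
  val oldFoo: Partition = Partition("foo", "id-A", 0, Set(1, 2))
  val newFoo: Partition = Partition("foo", "id-B", 0, Set(1, 3))

  // Replays one batch: delete foo (id-A), recreate it as id-B, delete it again.
  def replay(checkTopicId: Boolean): List[Partition] = {
    val tracker = new RemovalTracker(
      brokerId = 1, prevTopicIds = Set("id-A"), checkTopicId = checkTopicId)
    tracker.remove(oldFoo)
    tracker.remove(newFoo)
    tracker.localRemoved
  }
}
```

With `checkTopicId = false` the sketch reports both partitions as removed, including the one from the recreated topic that the previous image never contained. With `checkTopicId = true` only the partition of the original topic is reported.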