hachikuji commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r587850882



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -171,25 +200,38 @@ class MetadataPartitionsBuilder(val brokerId: Int,
     val prevPartition = newPartitionMap.put(partition.partitionIndex, partition)
     if (partition.isReplicaFor(brokerId)) {
       _localChanged.add(partition)
-    } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {
-      _localRemoved.add(prevPartition)
+    } else if (prevPartition != null) {
+      maybeAddToLocalRemoved(prevPartition)
     }
     newNameMap.put(partition.topicName, newPartitionMap)
   }
 
+  private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit = {
+    if (partition.isReplicaFor(brokerId)) {
+      val currentTopicId = newReverseIdMap.get(partition.topicName)

Review comment:
       The intent is to record a partition in `_localRemoved` only if its topic 
existed in the previous image. If we checked only the topic name, a topic that 
is deleted and then recreated could leave partitions in `_localRemoved` that 
were never part of the previous image.
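
    For illustration, here is a minimal, self-contained sketch of that guard. The names are hypothetical: `Partition`, `RemovalTracker`, and `prevTopicIds` stand in for `MetadataPartition` and the builder's actual state, and are not the real API.

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical stand-in for MetadataPartition.
case class Partition(topicName: String, topicId: UUID, replicas: Set[Int]) {
  def isReplicaFor(brokerId: Int): Boolean = replicas.contains(brokerId)
}

// Hypothetical stand-in for the builder: prevTopicIds maps topic name to the
// topic ID it had in the previous image.
class RemovalTracker(brokerId: Int, prevTopicIds: Map[String, UUID]) {
  val localRemoved = mutable.Buffer.empty[Partition]

  def maybeAddToLocalRemoved(partition: Partition): Unit = {
    if (partition.isReplicaFor(brokerId)) {
      // The name alone is ambiguous: the topic may have been deleted and
      // recreated under a new ID. Only record the removal when the
      // partition's topic ID matches the ID the topic had in the previous
      // image.
      if (prevTopicIds.get(partition.topicName).contains(partition.topicId)) {
        localRemoved += partition
      }
    }
  }
}
```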
   
    It's worth noting that this is strictly more defensive than the current 
replay logic requires. A new image is built for each batch of records from the 
controller, and we would never see a topic deleted and recreated (or vice 
versa) within the same batch. That is an implicit contract, though, not one 
enforced by the builder API, so I thought we might as well make the logic more 
resilient.
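
    Reusing the definitions from the sketch above, a hypothetical delete-and-recreate within one batch shows why the ID comparison matters: the recreated topic keeps its name but gets a new ID, so a name-only check would wrongly report its partition as locally removed.

```scala
object DeleteRecreateExample extends App {
  val idA = UUID.randomUUID() // "foo" as it existed in the previous image
  val idB = UUID.randomUUID() // "foo" recreated in the same batch

  val tracker = new RemovalTracker(brokerId = 1, prevTopicIds = Map("foo" -> idA))

  // Partition of the recreated topic: same name, different topic ID.
  tracker.maybeAddToLocalRemoved(Partition("foo", idB, replicas = Set(1)))
  println(tracker.localRemoved.isEmpty) // true: idB was not in the previous image

  // A partition of the original topic instance would be recorded.
  tracker.maybeAddToLocalRemoved(Partition("foo", idA, replicas = Set(1)))
  println(tracker.localRemoved.size) // 1
}
```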




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

