mumrah commented on code in PR #15293:
URL: https://github.com/apache/kafka/pull/15293#discussion_r1474508267


##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -65,48 +65,84 @@ case class MetadataSnapshot(partitionStates: 
mutable.AnyRefMap[String, mutable.L
 }
 
 object ZkMetadataCache {
-  /**
-   * Create topic deletions (leader=-2) for topics that are missing in a FULL 
UpdateMetadataRequest coming from a
-   * KRaft controller during a ZK migration. This will modify the 
UpdateMetadataRequest object passed into this method.
-   */
-  def maybeInjectDeletedPartitionsFromFullMetadataRequest(
+  def transformKRaftControllerFullMetadataRequest(
     currentMetadata: MetadataSnapshot,
     requestControllerEpoch: Int,
     requestTopicStates: util.List[UpdateMetadataTopicState],
-  ): Seq[Uuid] = {
-    val prevTopicIds = currentMetadata.topicIds.values.toSet
-    val requestTopics = requestTopicStates.asScala.map { topicState =>
-      topicState.topicName() -> topicState.topicId()
-    }.toMap
-
-    val deleteTopics = prevTopicIds -- requestTopics.values.toSet
-    if (deleteTopics.isEmpty) {
-      return Seq.empty
+  ): (util.List[UpdateMetadataTopicState], util.List[String]) = {
+    val topicIdToNewState = new util.HashMap[Uuid, UpdateMetadataTopicState]()
+    requestTopicStates.forEach(state => topicIdToNewState.put(state.topicId(), 
state))
+    val logMessages = new util.ArrayList[String]
+    val newRequestTopicStates = new util.ArrayList[UpdateMetadataTopicState]()
+    currentMetadata.topicNames.forKeyValue((id, name) => {

Review Comment:
   Do we want a try/catch block inside the loop here? One of the failure cases 
we saw was just a regular NoSuchElementException (NSE) bug causing the whole request to fail. 



##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -65,48 +65,84 @@ case class MetadataSnapshot(partitionStates: 
mutable.AnyRefMap[String, mutable.L
 }
 
 object ZkMetadataCache {
-  /**
-   * Create topic deletions (leader=-2) for topics that are missing in a FULL 
UpdateMetadataRequest coming from a
-   * KRaft controller during a ZK migration. This will modify the 
UpdateMetadataRequest object passed into this method.
-   */
-  def maybeInjectDeletedPartitionsFromFullMetadataRequest(
+  def transformKRaftControllerFullMetadataRequest(
     currentMetadata: MetadataSnapshot,
     requestControllerEpoch: Int,
     requestTopicStates: util.List[UpdateMetadataTopicState],
-  ): Seq[Uuid] = {
-    val prevTopicIds = currentMetadata.topicIds.values.toSet
-    val requestTopics = requestTopicStates.asScala.map { topicState =>
-      topicState.topicName() -> topicState.topicId()
-    }.toMap
-
-    val deleteTopics = prevTopicIds -- requestTopics.values.toSet
-    if (deleteTopics.isEmpty) {
-      return Seq.empty
+  ): (util.List[UpdateMetadataTopicState], util.List[String]) = {
+    val topicIdToNewState = new util.HashMap[Uuid, UpdateMetadataTopicState]()
+    requestTopicStates.forEach(state => topicIdToNewState.put(state.topicId(), 
state))
+    val logMessages = new util.ArrayList[String]
+    val newRequestTopicStates = new util.ArrayList[UpdateMetadataTopicState]()
+    currentMetadata.topicNames.forKeyValue((id, name) => {
+      Option(topicIdToNewState.get(id)) match {
+        case None =>
+          currentMetadata.partitionStates.get(name) match {
+            case None => logMessages.add(s"Error: topic ${name} appeared in 
currentMetadata.topicNames, " +
+              "but not in currentMetadata.partitionStates.")
+            case Some(oldPartitionStates) =>

Review Comment:
   Some of the terminology in here is confusing, for example oldPartitionStates 
= currentMetadata.partitionStates.get.
   
   WDYT about renaming things to "request" and "local" (or "inMemory")?



##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -516,9 +587,10 @@ class ZkMetadataCache(
             if (traceEnabled)
               stateChangeLogger.trace(s"Deleted partition $tp from metadata 
cache in response to UpdateMetadata " +
                 s"request sent by controller $controllerId epoch 
$controllerEpoch with correlation id $correlationId")
-            deletedPartitions += tp
+            deletedPartitions.add(tp)
           } else {
             addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, 
state)
+            deletedPartitions.remove(tp)
             if (traceEnabled)

Review Comment:
   Is this needed to deal with the fact that a UMR from KRaft can include 
deletions and re-creations?



##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -429,26 +465,61 @@ class ZkMetadataCache(
       controllerId(snapshot).orNull)
   }
 
-  // This method returns the deleted TopicPartitions received from 
UpdateMetadataRequest
-  def updateMetadata(correlationId: Int, updateMetadataRequest: 
UpdateMetadataRequest): Seq[TopicPartition] = {
+  // This method returns the deleted TopicPartitions received from 
UpdateMetadataRequest.
+  // Note: if this ZK broker is migrating to KRaft, a singular UMR may 
sometimes both delete a
+  // partition and re-create a new partition with that same name. In that 
case, it will not appear
+  // in the return value of this function.
+  def updateMetadata(
+    correlationId: Int,
+    originalUpdateMetadataRequest: UpdateMetadataRequest
+  ): Seq[TopicPartition] = {
+    var updateMetadataRequest = originalUpdateMetadataRequest
     inWriteLock(partitionMetadataLock) {
       if (
         updateMetadataRequest.isKRaftController &&
         updateMetadataRequest.updateType() == AbstractControlRequest.Type.FULL
       ) {
-        if (!zkMigrationEnabled) {
+        if (updateMetadataRequest.version() < 8) {
+          stateChangeLogger.error(s"Received UpdateMetadataRequest with 
Type=FULL (2), but version of " +
+            updateMetadataRequest.version() + ", which should not be possible. 
Not treating this as a full " +
+            "metadata update")
+        } else if (!zkMigrationEnabled) {
           stateChangeLogger.error(s"Received UpdateMetadataRequest with 
Type=FULL (2), but ZK migrations " +
             s"are not enabled on this broker. Not treating this as a full 
metadata update")
         } else {
-          val deletedTopicIds = 
ZkMetadataCache.maybeInjectDeletedPartitionsFromFullMetadataRequest(
-            metadataSnapshot, updateMetadataRequest.controllerEpoch(), 
updateMetadataRequest.topicStates())
-          if (deletedTopicIds.isEmpty) {
-            stateChangeLogger.trace(s"Received UpdateMetadataRequest with 
Type=FULL (2), " +
-              s"but no deleted topics were detected.")
-          } else {
-            stateChangeLogger.debug(s"Received UpdateMetadataRequest with 
Type=FULL (2), " +
-              s"found ${deletedTopicIds.size} deleted topic ID(s): 
$deletedTopicIds.")
-          }
+          // When handling a UMR from a KRaft controller, we may have to 
insert some partition
+          // deletions at the beginning, to handle the different way topic 
deletion works in KRaft
+          // mode (and also migration mode).
+          //
+          // After we've done that, we re-create the whole 
UpdateMetadataRequest object using the
+          // updated list of topic info. This ensures that 
UpdateMetadataRequest.normalize is called
+          // on the new, updated topic data. Note that we don't mutate the old 
request object; it may
+          // be used elsewhere.
+          val (newTopicStates, logs) = 
ZkMetadataCache.transformKRaftControllerFullMetadataRequest(
+            metadataSnapshot,
+            updateMetadataRequest.controllerEpoch(),
+            updateMetadataRequest.topicStates())
+          val oldData = updateMetadataRequest.data()
+
+          // It would be nice if we could call duplicate() here, but we don't 
want to copy the
+          // old topicStates array. That would be quite costly, and we're not 
going to use it anyway.
+          // Instead, we copy each field that we need.
+          val newData = new UpdateMetadataRequestData().
+            setControllerId(oldData.controllerId()).
+            setIsKRaftController(oldData.isKRaftController).
+            setType(oldData.`type`()).
+            setControllerEpoch(oldData.controllerEpoch()).
+            setBrokerEpoch(oldData.brokerEpoch()).
+            setTopicStates(newTopicStates).
+            setLiveBrokers(oldData.liveBrokers())
+          updateMetadataRequest = new UpdateMetadataRequest(newData, 
updateMetadataRequest.version())
+          logs.forEach(log => {
+            if (log.startsWith("Error")) {
+              stateChangeLogger.error(log)
+            } else {
+              stateChangeLogger.info(log)
+            }
+          })

Review Comment:
   Why do we want to defer the logging here? Is it just for unit testing, or 
something else as well?
   
   If we want to test the log messages, maybe we can pass a string consumer 
into transformKRaftControllerFullMetadataRequest. That way, if we encounter a 
problem during that method call, we don't lose all the log messages.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to