mumrah commented on code in PR #15293: URL: https://github.com/apache/kafka/pull/15293#discussion_r1473671017
########## core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala: ########## @@ -65,48 +65,84 @@ case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.L } object ZkMetadataCache { - /** - * Create topic deletions (leader=-2) for topics that are missing in a FULL UpdateMetadataRequest coming from a - * KRaft controller during a ZK migration. This will modify the UpdateMetadataRequest object passed into this method. - */ - def maybeInjectDeletedPartitionsFromFullMetadataRequest( + def transformKRaftControllerFullMetadataRequest( currentMetadata: MetadataSnapshot, requestControllerEpoch: Int, requestTopicStates: util.List[UpdateMetadataTopicState], - ): Seq[Uuid] = { - val prevTopicIds = currentMetadata.topicIds.values.toSet - val requestTopics = requestTopicStates.asScala.map { topicState => - topicState.topicName() -> topicState.topicId() - }.toMap - - val deleteTopics = prevTopicIds -- requestTopics.values.toSet - if (deleteTopics.isEmpty) { - return Seq.empty + ): (util.List[UpdateMetadataTopicState], util.List[String]) = { + val topicIdToNewState = new util.HashMap[Uuid, UpdateMetadataTopicState]() + requestTopicStates.forEach(state => topicIdToNewState.put(state.topicId(), state)) + val logMessages = new util.ArrayList[String] + val newRequestTopicStates = new util.ArrayList[UpdateMetadataTopicState]() + currentMetadata.topicNames.forKeyValue((id, name) => { + Option(topicIdToNewState.get(id)) match { + case None => + currentMetadata.partitionStates.get(name) match { + case None => logMessages.add(s"Error: topic ${name} appeared in currentMetadata.topicNames, " + + "but not in currentMetadata.partitionStates.") + case Some(oldPartitionStates) => + logMessages.add(s"Removing topic ${name} with ID ${id} from the metadata cache since " + + "the full UMR did not include it.") + newRequestTopicStates.add(createDeletionEntries(name, + id, + oldPartitionStates.values, + requestControllerEpoch)) + } + case Some(newTopicState) => + val indexToState = new util.HashMap[Integer, UpdateMetadataPartitionState] + newTopicState.partitionStates().forEach(part => indexToState.put(part.partitionIndex, part)) + currentMetadata.partitionStates.get(name) match { + case None => logMessages.add(s"Error: topic ${name} appeared in currentMetadata.topicNames, " + + "but not in currentMetadata.partitionStates.") + case Some(oldPartitionStates) => + oldPartitionStates.foreach(state => indexToState.remove(state._1.toInt)) + if (!indexToState.isEmpty) { + logMessages.add(s"Removing ${indexToState.size()} partition(s) from topic ${name} with " + + s"ID ${id} from the metadata cache since the full UMR did not include them.") + newRequestTopicStates.add(createDeletionEntries(name, + id, + indexToState.values().asScala, + requestControllerEpoch)) + } + } + } + }) + if (newRequestTopicStates.isEmpty) { + // If the output is the same as the input, optimize by just returning the input. + (requestTopicStates, logMessages) + } else { + // If the output has some new entries, they should all appear at the beginning. This will + // ensure that the old stuff is cleared out before the new stuff is added. We will need a + // new list for this, of course. + newRequestTopicStates.addAll(requestTopicStates) + (newRequestTopicStates, logMessages) } + } - deleteTopics.foreach { deletedTopicId => - val topicName = currentMetadata.topicNames(deletedTopicId) Review Comment: I believe this was the source of the NoSuchElementException reported in the JIRA, is that right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org