cmccabe commented on code in PR #15293:
URL: https://github.com/apache/kafka/pull/15293#discussion_r1474870992
##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -65,48 +65,84 @@ case class MetadataSnapshot(partitionStates:
mutable.AnyRefMap[String, mutable.L
}
object ZkMetadataCache {
- /**
- * Create topic deletions (leader=-2) for topics that are missing in a FULL
UpdateMetadataRequest coming from a
- * KRaft controller during a ZK migration. This will modify the
UpdateMetadataRequest object passed into this method.
- */
- def maybeInjectDeletedPartitionsFromFullMetadataRequest(
+ def transformKRaftControllerFullMetadataRequest(
currentMetadata: MetadataSnapshot,
requestControllerEpoch: Int,
requestTopicStates: util.List[UpdateMetadataTopicState],
- ): Seq[Uuid] = {
- val prevTopicIds = currentMetadata.topicIds.values.toSet
- val requestTopics = requestTopicStates.asScala.map { topicState =>
- topicState.topicName() -> topicState.topicId()
- }.toMap
-
- val deleteTopics = prevTopicIds -- requestTopics.values.toSet
- if (deleteTopics.isEmpty) {
- return Seq.empty
+ ): (util.List[UpdateMetadataTopicState], util.List[String]) = {
+ val topicIdToNewState = new util.HashMap[Uuid, UpdateMetadataTopicState]()
+ requestTopicStates.forEach(state => topicIdToNewState.put(state.topicId(),
state))
+ val logMessages = new util.ArrayList[String]
+ val newRequestTopicStates = new util.ArrayList[UpdateMetadataTopicState]()
+ currentMetadata.topicNames.forKeyValue((id, name) => {
+ Option(topicIdToNewState.get(id)) match {
+ case None =>
+ currentMetadata.partitionStates.get(name) match {
+ case None => logMessages.add(s"Error: topic ${name} appeared in
currentMetadata.topicNames, " +
+ "but not in currentMetadata.partitionStates.")
+ case Some(oldPartitionStates) =>
+ logMessages.add(s"Removing topic ${name} with ID ${id} from the
metadata cache since " +
+ "the full UMR did not include it.")
+ newRequestTopicStates.add(createDeletionEntries(name,
+ id,
+ oldPartitionStates.values,
+ requestControllerEpoch))
+ }
+ case Some(newTopicState) =>
+ val indexToState = new util.HashMap[Integer,
UpdateMetadataPartitionState]
+ newTopicState.partitionStates().forEach(part =>
indexToState.put(part.partitionIndex, part))
+ currentMetadata.partitionStates.get(name) match {
+ case None => logMessages.add(s"Error: topic ${name} appeared in
currentMetadata.topicNames, " +
+ "but not in currentMetadata.partitionStates.")
+ case Some(oldPartitionStates) =>
+ oldPartitionStates.foreach(state =>
indexToState.remove(state._1.toInt))
+ if (!indexToState.isEmpty) {
+ logMessages.add(s"Removing ${indexToState.size()} partition(s)
from topic ${name} with " +
+ s"ID ${id} from the metadata cache since the full UMR did
not include them.")
+ newRequestTopicStates.add(createDeletionEntries(name,
+ id,
+ indexToState.values().asScala,
+ requestControllerEpoch))
+ }
+ }
+ }
+ })
+ if (newRequestTopicStates.isEmpty) {
+ // If the output is the same as the input, optimize by just returning
the input.
+ (requestTopicStates, logMessages)
+ } else {
+ // If the output has some new entries, they should all appear at the
beginning. This will
+ // ensure that the old stuff is cleared out before the new stuff is
added. We will need a
+ // new list for this, of course.
+ newRequestTopicStates.addAll(requestTopicStates)
+ (newRequestTopicStates, logMessages)
}
+ }
- deleteTopics.foreach { deletedTopicId =>
- val topicName = currentMetadata.topicNames(deletedTopicId)
Review Comment:
I believe the unguarded apply was on `currentMetadata.partitionStates`. And
I think the bad state arose because we both deleted and re-created a
topic-partition, but applied those two updates out of order.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]