mumrah commented on code in PR #15293:
URL: https://github.com/apache/kafka/pull/15293#discussion_r1474508267
##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -65,48 +65,84 @@ case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.L
}
object ZkMetadataCache {
- /**
- * Create topic deletions (leader=-2) for topics that are missing in a FULL UpdateMetadataRequest coming from a
- * KRaft controller during a ZK migration. This will modify the UpdateMetadataRequest object passed into this method.
- */
- def maybeInjectDeletedPartitionsFromFullMetadataRequest(
+ def transformKRaftControllerFullMetadataRequest(
currentMetadata: MetadataSnapshot,
requestControllerEpoch: Int,
requestTopicStates: util.List[UpdateMetadataTopicState],
- ): Seq[Uuid] = {
- val prevTopicIds = currentMetadata.topicIds.values.toSet
- val requestTopics = requestTopicStates.asScala.map { topicState =>
- topicState.topicName() -> topicState.topicId()
- }.toMap
-
- val deleteTopics = prevTopicIds -- requestTopics.values.toSet
- if (deleteTopics.isEmpty) {
- return Seq.empty
+ ): (util.List[UpdateMetadataTopicState], util.List[String]) = {
+ val topicIdToNewState = new util.HashMap[Uuid, UpdateMetadataTopicState]()
requestTopicStates.forEach(state => topicIdToNewState.put(state.topicId(), state))
+ val logMessages = new util.ArrayList[String]
+ val newRequestTopicStates = new util.ArrayList[UpdateMetadataTopicState]()
+ currentMetadata.topicNames.forKeyValue((id, name) => {
Review Comment:
Do we want a try/catch block inside the loop here? One of the failure cases we saw was just a regular NSE (NoSuchElementException) bug causing the whole request to fail.
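To illustrate the reviewer's suggestion, here is a minimal sketch in Java (hypothetical names throughout, not the actual Kafka code) of isolating each topic's processing in its own try/catch, so one unexpected per-topic exception is recorded as a log message instead of failing the whole request:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

public class PerTopicErrorIsolation {
    // Hypothetical sketch: process each topic independently so that one
    // unexpected exception (e.g. a NoSuchElementException from a missing
    // map entry) is logged instead of aborting the whole request.
    static List<String> processTopics(Map<String, String> topicIdToName) {
        List<String> logMessages = new ArrayList<>();
        topicIdToName.forEach((id, name) -> {
            try {
                if (name == null || name.isEmpty()) {
                    // Simulate the kind of per-topic bug the reviewer mentions.
                    throw new NoSuchElementException("no state for " + id);
                }
                logMessages.add("Processed topic " + name);
            } catch (Exception e) {
                // Record the failure for this topic and keep going.
                logMessages.add("Error: failed to process topic id " + id + ": " + e.getMessage());
            }
        });
        return logMessages;
    }

    public static void main(String[] args) {
        Map<String, String> topics = new java.util.LinkedHashMap<>();
        topics.put("t1", "foo");
        topics.put("t2", "");
        System.out.println(processTopics(topics));
    }
}
```

With this shape, a bug in one topic's state only loses that topic's update rather than the entire full metadata request.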
##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -65,48 +65,84 @@ case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.L
}
object ZkMetadataCache {
- /**
- * Create topic deletions (leader=-2) for topics that are missing in a FULL UpdateMetadataRequest coming from a
- * KRaft controller during a ZK migration. This will modify the UpdateMetadataRequest object passed into this method.
- */
- def maybeInjectDeletedPartitionsFromFullMetadataRequest(
+ def transformKRaftControllerFullMetadataRequest(
currentMetadata: MetadataSnapshot,
requestControllerEpoch: Int,
requestTopicStates: util.List[UpdateMetadataTopicState],
- ): Seq[Uuid] = {
- val prevTopicIds = currentMetadata.topicIds.values.toSet
- val requestTopics = requestTopicStates.asScala.map { topicState =>
- topicState.topicName() -> topicState.topicId()
- }.toMap
-
- val deleteTopics = prevTopicIds -- requestTopics.values.toSet
- if (deleteTopics.isEmpty) {
- return Seq.empty
+ ): (util.List[UpdateMetadataTopicState], util.List[String]) = {
+ val topicIdToNewState = new util.HashMap[Uuid, UpdateMetadataTopicState]()
+ requestTopicStates.forEach(state => topicIdToNewState.put(state.topicId(), state))
+ val logMessages = new util.ArrayList[String]
+ val newRequestTopicStates = new util.ArrayList[UpdateMetadataTopicState]()
+ currentMetadata.topicNames.forKeyValue((id, name) => {
+ Option(topicIdToNewState.get(id)) match {
+ case None =>
+ currentMetadata.partitionStates.get(name) match {
+ case None => logMessages.add(s"Error: topic ${name} appeared in currentMetadata.topicNames, " +
+ "but not in currentMetadata.partitionStates.")
+ case Some(oldPartitionStates) =>
Review Comment:
Some of the terminology in here is confusing, for example oldPartitionStates = currentMetadata.partitionStates.get.
WDYT about renaming things to "request" and "local" (or "inMemory")?
##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -516,9 +587,10 @@ class ZkMetadataCache(
if (traceEnabled)
stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +
s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
- deletedPartitions += tp
+ deletedPartitions.add(tp)
} else {
addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, state)
+ deletedPartitions.remove(tp)
if (traceEnabled)
Review Comment:
Is this needed to deal with the fact that a UMR from KRaft can include deletions and re-creations?
##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -429,26 +465,61 @@ class ZkMetadataCache(
controllerId(snapshot).orNull)
}
- // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
- def updateMetadata(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
+ // This method returns the deleted TopicPartitions received from UpdateMetadataRequest.
+ // Note: if this ZK broker is migrating to KRaft, a singular UMR may sometimes both delete a
+ // partition and re-create a new partition with that same name. In that case, it will not appear
+ // in the return value of this function.
+ def updateMetadata(
+ correlationId: Int,
+ originalUpdateMetadataRequest: UpdateMetadataRequest
+ ): Seq[TopicPartition] = {
+ var updateMetadataRequest = originalUpdateMetadataRequest
inWriteLock(partitionMetadataLock) {
if (
updateMetadataRequest.isKRaftController &&
updateMetadataRequest.updateType() == AbstractControlRequest.Type.FULL
) {
- if (!zkMigrationEnabled) {
+ if (updateMetadataRequest.version() < 8) {
+ stateChangeLogger.error(s"Received UpdateMetadataRequest with Type=FULL (2), but version of " +
+ updateMetadataRequest.version() + ", which should not be possible. Not treating this as a full " +
+ "metadata update")
+ } else if (!zkMigrationEnabled) {
stateChangeLogger.error(s"Received UpdateMetadataRequest with Type=FULL (2), but ZK migrations " +
s"are not enabled on this broker. Not treating this as a full metadata update")
} else {
- val deletedTopicIds = ZkMetadataCache.maybeInjectDeletedPartitionsFromFullMetadataRequest(
- metadataSnapshot, updateMetadataRequest.controllerEpoch(), updateMetadataRequest.topicStates())
- if (deletedTopicIds.isEmpty) {
- stateChangeLogger.trace(s"Received UpdateMetadataRequest with Type=FULL (2), " +
- s"but no deleted topics were detected.")
- } else {
- stateChangeLogger.debug(s"Received UpdateMetadataRequest with Type=FULL (2), " +
- s"found ${deletedTopicIds.size} deleted topic ID(s): $deletedTopicIds.")
- }
+ // When handling a UMR from a KRaft controller, we may have to insert some partition
+ // deletions at the beginning, to handle the different way topic deletion works in KRaft
+ // mode (and also migration mode).
+ //
+ // After we've done that, we re-create the whole UpdateMetadataRequest object using the
+ // updated list of topic info. This ensures that UpdateMetadataRequest.normalize is called
+ // on the new, updated topic data. Note that we don't mutate the old request object; it may
+ // be used elsewhere.
+ val (newTopicStates, logs) = ZkMetadataCache.transformKRaftControllerFullMetadataRequest(
+ metadataSnapshot,
+ updateMetadataRequest.controllerEpoch(),
+ updateMetadataRequest.topicStates())
+ val oldData = updateMetadataRequest.data()
+
+ // It would be nice if we could call duplicate() here, but we don't want to copy the
+ // old topicStates array. That would be quite costly, and we're not going to use it anyway.
+ // Instead, we copy each field that we need.
+ val newData = new UpdateMetadataRequestData().
+ setControllerId(oldData.controllerId()).
+ setIsKRaftController(oldData.isKRaftController).
+ setType(oldData.`type`()).
+ setControllerEpoch(oldData.controllerEpoch()).
+ setBrokerEpoch(oldData.brokerEpoch()).
+ setTopicStates(newTopicStates).
+ setLiveBrokers(oldData.liveBrokers())
+ updateMetadataRequest = new UpdateMetadataRequest(newData, updateMetadataRequest.version())
+ logs.forEach(log => {
+ if (log.startsWith("Error")) {
+ stateChangeLogger.error(log)
+ } else {
+ stateChangeLogger.info(log)
+ }
+ })
Review Comment:
Why do we want to defer the logging here? Is it just for unit testing, or something else as well?
If we want to test the log messages, maybe we can pass a string consumer into transformKRaftControllerFullMetadataRequest. That way, if we encounter a problem during that method call, we don't lose all the log messages.
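The reviewer's suggestion of passing a string consumer can be sketched as follows, in Java with hypothetical names (not the actual Kafka code): the transform method accepts a Consumer&lt;String&gt; so each log message is delivered the moment it is produced, rather than being collected in a list that would be lost if the method threw partway through.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ConsumerLogging {
    // Hypothetical sketch: the caller supplies a Consumer<String> for log
    // messages. In production this could forward to the state-change logger;
    // in unit tests it can simply collect messages for assertions. Messages
    // emitted before an exception are not lost.
    static void transform(List<String> topicNames, Consumer<String> logConsumer) {
        for (String name : topicNames) {
            if (name.isEmpty()) {
                logConsumer.accept("Error: empty topic name in request");
            } else {
                logConsumer.accept("Retained topic " + name);
            }
        }
    }

    public static void main(String[] args) {
        // Collect into a list, as a unit test would.
        List<String> collected = new ArrayList<>();
        transform(List.of("foo", ""), collected::add);
        System.out.println(collected);
    }
}
```

A caller that wants the current error/info split could still inspect each message in the consumer (e.g. branch on a leading "Error" prefix) before forwarding it to the appropriate log level.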
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]