jolshan commented on a change in pull request #9626: URL: https://github.com/apache/kafka/pull/9626#discussion_r544634608
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig, */ if (localLog(topicPartition).isEmpty) markPartitionOffline(topicPartition) + else { + val id = topicIds.get(topicPartition.topic()) + // Ensure we have not received a request from an older protocol + if (id != null && !id.equals(Uuid.ZERO_UUID)) { + val log = localLog(topicPartition).get + // Check if the topic ID is in memory, if not, it must be new to the broker. + // If the broker previously wrote it to file, it would be recovered on restart after failure. + // If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic. + // If the topic ID in the log does not match the one in the request, the broker's topic must be stale. + if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) { + stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" + + s" match the topic Id provided in the request: " + + s"${topicIds.get(topicPartition.topic).toString}.") + } else { + // There is not yet a topic ID stored in the log. + // Write the partition metadata file if it is empty. + if (log.partitionMetadataFile.get.isEmpty()) { + log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic)) + log.topicId = topicIds.get(topicPartition.topic) + } else { + stateChangeLogger.warn("Partition metadata file already contains content.") Review comment: So I thought through these cases some more and realized that the metadata file will fail to open if formatted incorrectly. So the only case where there could be data written to the file is if the ID is the zero UUID. So I decided to just fail on reading the file if the zero ID is provided. (We will never write zero ID to file.) The rest of this cleaned up pretty nicely. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org