jolshan commented on a change in pull request #9626: URL: https://github.com/apache/kafka/pull/9626#discussion_r543663481
########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig, */ if (localLog(topicPartition).isEmpty) markPartitionOffline(topicPartition) + else { + val id = topicIds.get(topicPartition.topic()) + // Ensure we have not received a request from an older protocol + if (id != null && !id.equals(Uuid.ZERO_UUID)) { + val log = localLog(topicPartition).get + // Check if the topic ID is in memory, if not, it must be new to the broker. + // If the broker previously wrote it to file, it would be recovered on restart after failure. + // If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic. + // If the topic ID in the log does not match the one in the request, the broker's topic must be stale. + if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) { + stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" + + s" match the topic Id provided in the request: " + + s"${topicIds.get(topicPartition.topic).toString}.") + } else { + // There is not yet a topic ID stored in the log. + // Write the partition metadata file if it is empty. + if (log.partitionMetadataFile.get.isEmpty()) { + log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic)) + log.topicId = topicIds.get(topicPartition.topic) + } else { + stateChangeLogger.warn("Partition metadata file already contains content.") Review comment: I think that if we reach here, we are in an unexpected state. The partitionMetadata file should have loaded in the topic ID if the file already contained content. I left as a warn rather than an exception. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org