jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r544634608



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig,
            */
             if (localLog(topicPartition).isEmpty)
               markPartitionOffline(topicPartition)
+            else {
+              val id = topicIds.get(topicPartition.topic())
+              // Ensure we have not received a request from an older protocol
+              if (id != null && !id.equals(Uuid.ZERO_UUID)) {
+                val log = localLog(topicPartition).get
+                // Check if the topic ID is in memory, if not, it must be new to the broker.
+                // If the broker previously wrote it to file, it would be recovered on restart after failure.
+                // If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic.
+                // If the topic ID in the log does not match the one in the request, the broker's topic must be stale.
+                if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) {
+                  stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                    s" match the topic Id provided in the request: " +
+                    s"${topicIds.get(topicPartition.topic).toString}.")
+                } else {
+                  // There is not yet a topic ID stored in the log.
+                  // Write the partition metadata file if it is empty.
+                  if (log.partitionMetadataFile.get.isEmpty()) {
+                    log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic))
+                    log.topicId = topicIds.get(topicPartition.topic)
+                  } else {
+                    stateChangeLogger.warn("Partition metadata file already contains content.")

Review comment:
       I thought through these cases some more and realized that the
metadata file will fail to open if it is formatted incorrectly. That means the
only case where there could be data written to the file is if the ID is the
zero UUID, so I decided to just fail on reading the file if the zero ID is
provided. (We will never write the zero ID to file.) The rest of this cleaned
up pretty nicely.
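
To make the idea concrete, here is a rough sketch of the read/write rule described above. It is not the code in this PR: `PartitionMetadataSketch`, `serialize`, `parse`, and the one-line `topic_id: <uuid>` format are all hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid` so the snippet is self-contained.

```scala
import java.util.UUID

// Hypothetical stand-in for the partition metadata file logic; not the PR's code.
object PartitionMetadataSketch {
  // Plays the role of Uuid.ZERO_UUID (assumption: java.util.UUID used for self-containment).
  val ZeroUuid: UUID = new UUID(0L, 0L)

  // Assumed one-line format "topic_id: <uuid>". The zero ID is never written.
  def serialize(topicId: UUID): String = {
    require(topicId != ZeroUuid, "the zero topic ID is never written to file")
    s"topic_id: $topicId"
  }

  // Reading fails (Left) on malformed content and on the zero ID,
  // so any ID returned as Right is a real, non-zero topic ID.
  def parse(content: String): Either[String, UUID] =
    content.trim.split(":", 2).map(_.trim) match {
      case Array("topic_id", raw) =>
        try {
          val id = UUID.fromString(raw)
          if (id == ZeroUuid) Left("zero topic ID found in partition metadata file")
          else Right(id)
        } catch {
          case _: IllegalArgumentException => Left(s"malformed topic ID: $raw")
        }
      case _ => Left("malformed partition metadata file")
    }
}
```

With both checks on the read path, the caller no longer needs a separate "file already contains content" branch: a successful read always yields a usable ID.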




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

