hachikuji commented on a change in pull request #11126:
URL: https://github.com/apache/kafka/pull/11126#discussion_r677015974



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1367,13 +1367,23 @@ class ReplicaManager(val config: KafkaConfig,
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
               val requestTopicId = topicIdFromRequest(topicPartition.topic)
+              val logTopicId = partition.topicId
+
+              // We propagate the partition state down if:
+              // 1. The leader epoch is higher than the current leader epoch 
of the partition
+              // 2. The leader epoch is same as the current leader epoch but a 
new topic id is being assigned. This is
+              //    needed to handle the case where a topic id is assigned for 
the first time after upgrade.
+              def propagatePartitionState(requestLeaderEpoch: Int, 
currentLeaderEpoch: Int): Boolean = {
+                requestLeaderEpoch > currentLeaderEpoch ||
+                  (requestLeaderEpoch == currentLeaderEpoch && 
logTopicId.isEmpty && requestTopicId.isDefined)
+              }
 
-              if (!hasConsistentTopicId(requestTopicId, partition.topicId)) {
-                stateChangeLogger.error(s"Topic ID in memory: 
${partition.topicId.get} does not" +
+              if (!hasConsistentTopicId(requestTopicId, logTopicId)) {
+                stateChangeLogger.error(s"Topic ID in memory: 
${logTopicId.get} does not" +
                   s" match the topic ID for partition $topicPartition 
received: " +
                   s"${requestTopicId.get}.")
                 responseMap.put(topicPartition, Errors.INCONSISTENT_TOPIC_ID)
-              } else if (requestLeaderEpoch > currentLeaderEpoch) {
+              } else if (propagatePartitionState(requestLeaderEpoch, 
currentLeaderEpoch)) {

Review comment:
       Nevertheless, the path through `makeLeader` seems unsafe without an 
epoch bump. Another case is `updateAssignmentAndIsr` which blindly overrides 
the current ISR state. This makes sense when there is an epoch bump, but not 
otherwise, and could lead to the exposing of uncommitted data. We'd also need 
to fix the logging since it would be confusing otherwise. So all in all, I 
think it's best to take another route to updating the topicId (as the latest 
commit does).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to