hachikuji commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r596231610



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -308,27 +308,29 @@ class Partition(val topicPartition: TopicPartition,
                 s"different from the requested log dir $logDir")
             false
           case None =>
-            createLogIfNotExists(isNew = false, isFutureReplica = true, 
highWatermarkCheckpoints)
+            // not sure if topic ID should be none here, but not sure if we 
have access in ReplicaManager where this is called.
+            // could also use topicId method here potentially. This is only 
used in ReplicaManager (ZK code) so probably ok to set as None.
+            createLogIfNotExists(isNew = false, isFutureReplica = true, 
highWatermarkCheckpoints, None)
             true
         }
       }
     }
   }
 
-  def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, 
offsetCheckpoints: OffsetCheckpoints): Unit = {
+  def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, 
offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid] = None): Unit = {

Review comment:
       We usually try to avoid optional arguments. The problem is that they can 
be easily overlooked.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1369,7 +1377,11 @@ class ReplicaManager(val config: KafkaConfig,
               val requestLeaderEpoch = partitionState.leaderEpoch
               val requestTopicId = topicIds.get(topicPartition.topic)
 
-              if (!partition.checkOrSetTopicId(requestTopicId)) {
+              val (topicIdOpt, partitionLogOpt) = partition.topicIdAndLog
+              if (!checkTopicId(requestTopicId, topicIdOpt, partitionLogOpt)) {

Review comment:
       Maybe I am missing something, but why is it necessary to set the topicId 
here? I was expecting that we would do this in `makeLeader` and `makeFollower`. 
I'm a tad uncomfortable exposing the `Log` object through `Partition` before we 
have directly associated it.

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -308,27 +308,29 @@ class Partition(val topicPartition: TopicPartition,
                 s"different from the requested log dir $logDir")
             false
           case None =>
-            createLogIfNotExists(isNew = false, isFutureReplica = true, 
highWatermarkCheckpoints)
+            // not sure if topic ID should be none here, but not sure if we 
have access in ReplicaManager where this is called.

Review comment:
       I cannot think of a reason not to pass the topic id if we have it 
available. Otherwise, we would need logic to set it again after the log dir is 
swapped.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to