junrao commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1678493157


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -427,7 +427,7 @@ class Partition(val topicPartition: TopicPartition,
     * @param highWatermarkCheckpoints Checkpoint to load initial high 
watermark from
     * @return true iff the future replica is created
     */
-  def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: 
OffsetCheckpoints): Boolean = {
+  def maybeCreateFutureReplica(logDir: String, highWatermarkCheckpoints: 
OffsetCheckpoints, topicId: Option[Uuid] = topicId): Boolean = {

Review Comment:
   Could we add the new param to the javadoc?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -2114,25 +2114,24 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start 
ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
+          // Add future replica log to partition's map if it's not existed
+          if (partition.maybeCreateFutureReplica(futureLog.parentDir, 
offsetCheckpoints, topicIds(partition.topic))) {
+            // pause cleaning for partitions that are being moved and start 
ReplicaAlterDirThread to move
+            // replica from source dir to destination dir
+            logManager.abortAndPauseCleaning(topicPartition)
+          }
 
           futureReplicasAndInitialOffset.put(topicPartition, 
InitialFetchState(topicIds(topicPartition.topic), leader,
             partition.getLeaderEpoch, futureLog.highWatermark))
         }
       }
     }
 
-    if (futureReplicasAndInitialOffset.nonEmpty)
+    if (futureReplicasAndInitialOffset.nonEmpty) {
+      // Even though it's possible that there is another thread adding fetcher 
for this future log partition,
+      // but it's fine because `BrokerIdAndFetcherId` will be identical and 
the operation will be no-op.

Review Comment:
   When the leader changes, we need to propagate the new leader epoch to 
ReplicaAlterLogDirsThread (see https://github.com/apache/kafka/pull/8223). So, 
the operation is not a no-op?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -2114,25 +2114,24 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start 
ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
+          // Add future replica log to partition's map if it's not existed
+          if (partition.maybeCreateFutureReplica(futureLog.parentDir, 
offsetCheckpoints, topicIds(partition.topic))) {
+            // pause cleaning for partitions that are being moved and start 
ReplicaAlterDirThread to move
+            // replica from source dir to destination dir
+            logManager.abortAndPauseCleaning(topicPartition)
+          }
 
           futureReplicasAndInitialOffset.put(topicPartition, 
InitialFetchState(topicIds(topicPartition.topic), leader,
             partition.getLeaderEpoch, futureLog.highWatermark))
         }
       }
     }
 
-    if (futureReplicasAndInitialOffset.nonEmpty)
+    if (futureReplicasAndInitialOffset.nonEmpty) {
+      // Even though it's possible that there is another thread adding fetcher 
for this future log partition,

Review Comment:
   Hmm, `becomeLeaderOrFollower()`, `alterReplicaLogDirs()`, and `applyDelta()` 
are done under the `replicaStateChangeLock`. Is it really possible for another 
thread to add fetcher for the future log?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -2114,25 +2114,24 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start 
ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
+          // Add future replica log to partition's map if it's not existed

Review Comment:
   if it's not existed => if it doesn't exist



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to