soarez commented on code in PR #15951:
URL: https://github.com/apache/kafka/pull/15951#discussion_r1608164096


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -2114,19 +2114,16 @@ class ReplicaManager(val config: KafkaConfig,
         partition.log.foreach { _ =>
           val leader = BrokerEndPoint(config.brokerId, "localhost", -1)
 
-          // Add future replica log to partition's map
-          partition.createLogIfNotExists(
-            isNew = false,
-            isFutureReplica = true,
-            offsetCheckpoints,
-            topicIds(partition.topic))
-
-          // pause cleaning for partitions that are being moved and start ReplicaAlterDirThread to move
-          // replica from source dir to destination dir
-          logManager.abortAndPauseCleaning(topicPartition)
-
-          futureReplicasAndInitialOffset.put(topicPartition, InitialFetchState(topicIds(topicPartition.topic), leader,
-            partition.getLeaderEpoch, futureLog.highWatermark))
+          if (partition.maybeCreateFutureReplica(futureLog.parentDir, offsetCheckpoints, topicIds(partition.topic))) {

Review Comment:
   > That's fine because there is no chances that futureLog exists in partition#futureLog but not in logManager#futureLogs map.
   
   The scenario I was thinking of is broker startup: `logManager` loads the future log, so futureLog exists in `partition#futureLog` but not in the `logManager#futureLogs` map yet. Only later, when the broker catches up with metadata (`ReplicaManager#applyDelta`) or receives a LeaderAndIsr request (`becomeLeaderOrFollower`) and this method, `maybeAddLogDirFetchers`, is called, do we need to make sure `partition#futureLog` is populated too.
   
   But you're right. My confusion was about where `maybeCreateFutureReplica` checks whether the futureLog already exists: it checks in itself (`Partition`), not in `LogManager`, so this makes sense.
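   
   To make that last point concrete, here is a toy sketch (not Kafka's actual code). `ToyLog`, `ToyLogManager`, `ToyPartition`, and their methods are hypothetical stand-ins I made up for illustration. The only thing borrowed from the PR is the idea that `maybeCreateFutureReplica` returns a Boolean and decides based on the partition's own future log reference, whatever the log manager's map holds. The setup, where the log manager already tracks a future log but the partition does not, is an assumption of the sketch.

```scala
import scala.collection.mutable

// Toy model only: ToyLog, ToyLogManager and ToyPartition are hypothetical
// stand-ins, not Kafka's real log, LogManager or Partition classes.
final case class ToyLog(parentDir: String)

final class ToyLogManager {
  // Future logs known to the log manager, keyed by topic-partition name.
  private val futureLogs = mutable.Map.empty[String, ToyLog]

  // Simulates the log manager registering a future log it found on disk.
  def loadFutureLog(tp: String, dir: String): Unit =
    futureLogs(tp) = ToyLog(dir)

  // Returns the tracked future log, creating and tracking one if absent.
  def getOrCreateFutureLog(tp: String, dir: String): ToyLog =
    futureLogs.getOrElseUpdate(tp, ToyLog(dir))

  def hasFutureLog(tp: String): Boolean = futureLogs.contains(tp)
}

final class ToyPartition(tp: String, logManager: ToyLogManager) {
  // The partition keeps its own reference, separate from the manager's map.
  private var futureLog: Option[ToyLog] = None

  def hasFutureLog: Boolean = futureLog.isDefined

  // The existence check looks only at this partition's own reference.
  // Returns true when this call populated it, false when it was already set.
  def maybeCreateFutureReplica(dir: String): Boolean =
    futureLog match {
      case Some(_) => false
      case None =>
        futureLog = Some(logManager.getOrCreateFutureLog(tp, dir))
        true
    }
}
```

   In this toy, if `loadFutureLog("t-0", "/data2")` has run on the log manager before the partition is touched, the first `maybeCreateFutureReplica("/data2")` still returns `true` and fills in the partition's reference, and a second call returns `false`. That is the behavior the `if` in the diff relies on, under the assumptions above.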


