hachikuji commented on code in PR #12187:
URL: https://github.com/apache/kafka/pull/12187#discussion_r881165574


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -2118,36 +2124,39 @@ class ReplicaManager(val config: KafkaConfig,
     newImage: MetadataImage,
     delta: TopicsDelta,
     offsetCheckpoints: OffsetCheckpoints,
-    newLocalFollowers: mutable.Map[TopicPartition, 
LocalReplicaChanges.PartitionInfo]
+    localFollowers: mutable.Map[TopicPartition, 
LocalReplicaChanges.PartitionInfo]
   ): Unit = {
-    stateChangeLogger.info(s"Transitioning ${newLocalFollowers.size} 
partition(s) to " +
+    stateChangeLogger.info(s"Transitioning ${localFollowers.size} partition(s) 
to " +
       "local followers.")
     val shuttingDown = isShuttingDown.get()
-    val partitionsToMakeFollower = new mutable.HashMap[TopicPartition, 
Partition]
-    val newFollowerTopicSet = new mutable.HashSet[String]
-    newLocalFollowers.forKeyValue { (tp, info) =>
+    val partitionsToStartFetching = new mutable.HashMap[TopicPartition, 
Partition]
+    val partitionsToStopFetching = new mutable.HashMap[TopicPartition, Boolean]
+    val followerTopicSet = new mutable.HashSet[String]
+    localFollowers.forKeyValue { (tp, info) =>
       getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, 
isNew) =>
         try {
-          newFollowerTopicSet.add(tp.topic)
+          followerTopicSet.add(tp.topic)
 
           if (shuttingDown) {
             stateChangeLogger.trace(s"Unable to start fetching $tp with topic 
" +
               s"ID ${info.topicId} because the replica manager is shutting 
down.")
           } else {
-            val leader = info.partition.leader
-            if (newImage.cluster.broker(leader) == null) {
-              stateChangeLogger.trace(s"Unable to start fetching $tp with 
topic ID ${info.topicId} " +
-                s"from leader $leader because it is not alive.")
-
-              // Create the local replica even if the leader is unavailable. 
This is required
-              // to ensure that we include the partition's high watermark in 
the checkpoint
-              // file (see KAFKA-1647).
-              partition.createLogIfNotExists(isNew, false, offsetCheckpoints, 
Some(info.topicId))
-            } else {
-              val state = info.partition.toLeaderAndIsrPartitionState(tp, 
isNew)
-              if (partition.makeFollower(state, offsetCheckpoints, 
Some(info.topicId))) {
-                partitionsToMakeFollower.put(tp, partition)
-              }
+            // We always update the follower state.
+            // - This ensure that a replica with no leader can step down;
+            // - This also ensures that the local replica is created even if 
the leader
+            //   is unavailable. This is required to ensure that we include 
the partition's
+            //   high watermark in the checkpoint file (see KAFKA-1647).
+            val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
+            val isNewLeaderEpoch = partition.makeFollower(state, 
offsetCheckpoints, Some(info.topicId))
+
+            if (isInControlledShutdown && (info.partition.leader == NO_LEADER 
||
+                !info.partition.isr.contains(config.brokerId))) {
+              // During controlled shutdown, replica with no leaders and 
replica
+              // where this broker is not in the ISR are stopped.
+              partitionsToStopFetching.put(tp, false)

Review Comment:
   One of the things we do in `ReplicaManager.stopPartitions` is force-complete 
purgatory operations. Since we are no longer the leader, they can be 
completed immediately with an error. I might have missed it, but it seems like 
we don't have similar logic down this path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to