dajac commented on code in PR #12187:
URL: https://github.com/apache/kafka/pull/12187#discussion_r879677034
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -2141,23 +2141,22 @@ class ReplicaManager(val config: KafkaConfig,
stateChangeLogger.trace(s"Unable to start fetching $tp with topic
" +
s"ID ${info.topicId} because the replica manager is shutting
down.")
} else {
- if (isInControlledShutdown &&
!info.partition.isr.contains(config.brokerId)) {
- // If we are in controlled shutdown and the replica is not in
the ISR,
- // we stop the replica.
+ // We always update the follower state.
+ // - This ensure that a replica with no leader can step down;
+ // - This also ensures that the local replica is created even if
the leader
+ // is unavailable. This is required to ensure that we include
the partition's
+ // high watermark in the checkpoint file (see KAFKA-1647).
+ val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
+ val isNewLeaderEpoch = partition.makeFollower(state,
offsetCheckpoints, Some(info.topicId))
+
+ if (isInControlledShutdown && (info.partition.leader == NO_LEADER
||
+ !info.partition.isr.contains(config.brokerId))) {
+ // During controlled shutdown, replica with no leaders and
replica
+ // where this broker is not in the ISR are stopped.
partitionsToStop.put(tp, false)
- } else if (newImage.cluster.broker(info.partition.leader) == null)
{
Review Comment:
It should be handled a bit later, at L2192. If we have no leader, `nodeOpt`
will be `None` and we don't start the fetcher. It seems to be similar to the
previous implementation to me but I may have missed something.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]