This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push: new 16a7713830 KAFKA-14154; Kraft controller should return NOT_CONTROLLER if request epoch is ahead (#12514) 16a7713830 is described below commit 16a77138305c429b3adc62b34c128eb242cadfb5 Author: Jason Gustafson <ja...@confluent.io> AuthorDate: Mon Aug 15 11:34:29 2022 -0700 KAFKA-14154; Kraft controller should return NOT_CONTROLLER if request epoch is ahead (#12514) Similar to https://github.com/apache/kafka/pull/12506. For the Kraft controller, we should return NOT_CONTROLLER if the leader/partition epoch in the request is ahead of the controller. Reviewers: José Armando García Sancio <jsan...@users.noreply.github.com> --- .../kafka/controller/ReplicationControlManager.java | 21 +++++++++++++++++++-- .../controller/ReplicationControlManagerTest.java | 12 +++++++++++- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 4ffb339967..df7097df83 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -125,6 +125,7 @@ import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST; import static org.apache.kafka.common.protocol.Errors.INVALID_UPDATE_VERSION; import static org.apache.kafka.common.protocol.Errors.NEW_LEADER_ELECTED; import static org.apache.kafka.common.protocol.Errors.NONE; +import static org.apache.kafka.common.protocol.Errors.NOT_CONTROLLER; import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS; import static org.apache.kafka.common.protocol.Errors.OPERATION_NOT_ATTEMPTED; import static org.apache.kafka.common.protocol.Errors.TOPIC_AUTHORIZATION_FAILED; @@ -1071,7 +1072,23 @@ public class ReplicationControlManager { return UNKNOWN_TOPIC_OR_PARTITION; } - if (partitionData.leaderEpoch() != partition.leaderEpoch) { + + // If the partition leader has a higher leader/partition epoch, then it is likely + // that this node is no longer the active controller. We return NOT_CONTROLLER in + // this case to give the leader an opportunity to find the new controller. + if (partitionData.leaderEpoch() > partition.leaderEpoch) { + log.debug("Rejecting AlterPartition request from node {} for {}-{} because " + + "the current leader epoch is {}, which is greater than the local value {}.", + brokerId, topic.name, partitionId, partition.leaderEpoch, partitionData.leaderEpoch()); + return NOT_CONTROLLER; + } + if (partitionData.partitionEpoch() > partition.partitionEpoch) { + log.debug("Rejecting AlterPartition request from node {} for {}-{} because " + + "the current partition epoch is {}, which is greater than the local value {}.", + brokerId, topic.name, partitionId, partition.partitionEpoch, partitionData.partitionEpoch()); + return NOT_CONTROLLER; + } + if (partitionData.leaderEpoch() < partition.leaderEpoch) { log.debug("Rejecting AlterPartition request from node {} for {}-{} because " + "the current leader epoch is {}, not {}.", brokerId, topic.name, partitionId, partition.leaderEpoch, partitionData.leaderEpoch()); @@ -1085,7 +1102,7 @@ public class ReplicationControlManager { return INVALID_REQUEST; } - if (partitionData.partitionEpoch() != partition.partitionEpoch) { + if (partitionData.partitionEpoch() < partition.partitionEpoch) { log.info("Rejecting AlterPartition request from node {} for {}-{} because " + "the current partition epoch is {}, not {}.", brokerId, topic.name, partitionId, partition.partitionEpoch, diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index d33776ca10..82e378a982 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -122,6 +122,7 @@ import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICA_ASSIGNMENT import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION; import static org.apache.kafka.common.protocol.Errors.NEW_LEADER_ELECTED; import static org.apache.kafka.common.protocol.Errors.NONE; +import static org.apache.kafka.common.protocol.Errors.NOT_CONTROLLER; import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS; import static org.apache.kafka.common.protocol.Errors.OPERATION_NOT_ATTEMPTED; import static org.apache.kafka.common.protocol.Errors.POLICY_VIOLATION; @@ -960,7 +961,16 @@ public class ReplicationControlManagerTest { ControllerResult<AlterPartitionResponseData> invalidLeaderEpochResult = sendAlterPartition( replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId), topicIdPartition.topicId(), invalidLeaderEpochRequest); - assertAlterPartitionResponse(invalidLeaderEpochResult, topicIdPartition, FENCED_LEADER_EPOCH); + assertAlterPartitionResponse(invalidLeaderEpochResult, topicIdPartition, NOT_CONTROLLER); + + // Invalid partition epoch + PartitionData invalidPartitionEpochRequest = newAlterPartition( + replicationControl, topicIdPartition, asList(0, 1), LeaderRecoveryState.RECOVERED); + invalidPartitionEpochRequest.setPartitionEpoch(500); + ControllerResult<AlterPartitionResponseData> invalidPartitionEpochResult = sendAlterPartition( + replicationControl, leaderId, ctx.currentBrokerEpoch(leaderId), + topicIdPartition.topicId(), invalidPartitionEpochRequest); + assertAlterPartitionResponse(invalidPartitionEpochResult, topicIdPartition, NOT_CONTROLLER); // Invalid ISR (3 is not a valid replica) PartitionData invalidIsrRequest1 = newAlterPartition(