junrao commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r668323818



##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * PartitionMutator handles changing partition registrations.
+ */
+public class PartitionChangeBuilder {
+    public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
+        if (record.isr() != null) return false;
+        if (record.leader() != NO_LEADER_CHANGE) return false;
+        if (record.replicas() != null) return false;
+        if (record.removingReplicas() != null) return false;
+        if (record.addingReplicas() != null) return false;
+        return true;
+    }
+
+    private final PartitionRegistration partition;
+    private final Uuid topicId;
+    private final int partitionId;
+    private final Function<Integer, Boolean> isAcceptableLeader;
+    private final Supplier<Boolean> uncleanElectionOk;
+    private List<Integer> targetIsr;
+    private List<Integer> targetReplicas;
+    private List<Integer> targetRemoving;
+    private List<Integer> targetAdding;
+    private boolean alwaysElectPreferredIfPossible;
+
+    public PartitionChangeBuilder(PartitionRegistration partition,
+                                  Uuid topicId,
+                                  int partitionId,
+                                  Function<Integer, Boolean> 
isAcceptableLeader,
+                                  Supplier<Boolean> uncleanElectionOk) {
+        this.partition = partition;
+        this.topicId = topicId;
+        this.partitionId = partitionId;
+        this.isAcceptableLeader = isAcceptableLeader;
+        this.uncleanElectionOk = uncleanElectionOk;
+        this.targetIsr = Replicas.toList(partition.isr);
+        this.targetReplicas = Replicas.toList(partition.replicas);
+        this.targetRemoving = Replicas.toList(partition.removingReplicas);
+        this.targetAdding = Replicas.toList(partition.addingReplicas);
+        this.alwaysElectPreferredIfPossible = false;
+    }
+
+    public PartitionChangeBuilder setTargetIsr(List<Integer> targetIsr) {
+        this.targetIsr = targetIsr;
+        return this;
+    }
+
+    public PartitionChangeBuilder setTargetReplicas(List<Integer> 
targetReplicas) {
+        this.targetReplicas = targetReplicas;
+        return this;
+    }
+
+    public PartitionChangeBuilder setAlwaysElectPreferredIfPossible(boolean 
alwaysElectPreferredIfPossible) {

Review comment:
       Do we need this since it's only used in tests?

##########
File path: 
metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
##########
@@ -105,4 +105,42 @@ public void testToLeaderAndIsrPartitionState() {
                 setIsNew(false).toString(),
             b.toLeaderAndIsrPartitionState(new TopicPartition("bar", 0), 
false).toString());
     }
+
+    @Test
+    public void testMergePartitionChangeRecordWithReassignmentData() {
+        PartitionRegistration partition0 = new PartitionRegistration(new int[] 
{1, 2, 3},
+            new int[] {1, 2, 3}, Replicas.NONE, Replicas.NONE, 1, 100, 200);
+        PartitionRegistration partition1 = partition0.merge(new 
PartitionChangeRecord().
+            setRemovingReplicas(Collections.singletonList(3)).
+            setAddingReplicas(Collections.singletonList(4)).
+            setReplicas(Arrays.asList(1, 2, 3, 4)));
+        assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 4},
+            new int[] {1, 2, 3}, new int[] {3}, new int[] {4}, 1, 100, 201), 
partition1);
+        PartitionRegistration partition2 = partition1.merge(new 
PartitionChangeRecord().
+            setIsr(Arrays.asList(1, 2, 4)).
+            setRemovingReplicas(Collections.emptyList()).
+            setAddingReplicas(Collections.emptyList()).
+            setReplicas(Arrays.asList(1, 2, 4)));
+        assertEquals(new PartitionRegistration(new int[] {1, 2, 4},
+            new int[] {1, 2, 4}, Replicas.NONE, Replicas.NONE, 1, 100, 202), 
partition2);
+        assertFalse(partition2.isReassigning());
+    }
+
+    @Test
+    public void testPartitionControlInfoIsrChangeCompletesReassignment() {
+        PartitionRegistration partition0 = new PartitionRegistration(
+            new int[]{1, 2, 3, 4}, new int[]{3}, new int[]{3}, new int[] {}, 
1, 0, 0);

Review comment:
       Could new int[] {} be Replicas.NONE?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.List;
+import java.util.Objects;
+
+
+class PartitionReassignmentRevert {
+    private final List<Integer> replicas;
+    private final List<Integer> isr;
+
+    PartitionReassignmentRevert(PartitionRegistration registration) {
+        // Figure out the replica list and ISR that we will have after 
reverting the
+        // reassignment. In general, we want to take out any replica that the 
reassignment
+        // was adding, but keep the ones the reassignment was removing. (But 
see the
+        // special case below.)
+        Set<Integer> adding = Replicas.toSet(registration.addingReplicas);
+        this.replicas = new ArrayList<>(registration.replicas.length);
+        this.isr = new ArrayList<>(registration.isr.length);
+        for (int i = 0; i < registration.isr.length; i++) {
+            int replica = registration.isr[i];
+            if (!adding.contains(replica)) {
+                this.isr.add(replica);
+            } else if (i == registration.isr.length - 1 && isr.isEmpty()) {
+                // This is a special case where taking out all the "adding" 
replicas is
+                // not possible. The reason it is not possible is that doing 
so would
+                // create an empty ISR, which is not allowed.
+                //
+                // In this case, we leave in one of the adding replicas 
permanently.

Review comment:
       This is different from how the old controller does. The old controller 
simply reverts the replicas to the original ones in 
KafkaController.maybeBuildReassignment(). We can change the behavior, but we 
probably need a KIP and do this consistently between the old and the new 
controller.

##########
File path: metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
##########
@@ -103,7 +103,8 @@ public TopicImage apply() {
         for (Entry<Integer, PartitionRegistration> entry : 
partitionChanges.entrySet()) {
             if (entry.getValue().leader == brokerId) {
                 PartitionRegistration prevPartition = 
image.partitions().get(entry.getKey());
-                if (prevPartition == null || prevPartition.leader != brokerId) 
{
+                if (prevPartition == null ||
+                        prevPartition.leaderEpoch != 
entry.getValue().leaderEpoch) {

Review comment:
       Should we check for partitionEpoch change too? Ditto in 
newLocalFollowers().

##########
File path: 
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
##########
@@ -180,6 +187,44 @@ public LeaderAndIsrPartitionState 
toLeaderAndIsrPartitionState(TopicPartition tp
             setIsNew(isNew);
     }
 
+    /**
+     * Returns true if this partition is reassigning.
+     */
+    public boolean isReassigning() {
+        return removingReplicas.length > 0 | addingReplicas.length > 0;
+    }
+
+    /**
+     * Check if an ISR change completes this partition's reassignment.
+     *
+     * @param newIsr    The new ISR.
+     * @return          True if the reassignment is complete.
+     */
+    public boolean isrChangeCompletesReassignment(int[] newIsr) {

Review comment:
       It seems that this is only used in tests and is duplicating the logic in 
PartitionChangeBuidler.completeReassignmentIfNeeded(). Do we still need it? If 
so, could we move this to a test util?

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.controller.PartitionChangeBuilder.BestLeader;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static 
org.apache.kafka.controller.PartitionChangeBuilder.changeRecordIsNoOp;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 40)
+public class PartitionChangeBuilderTest {
+    @Test
+    public void testChangeRecordIsNoOp() {
+        assertTrue(changeRecordIsNoOp(new PartitionChangeRecord()));
+        assertFalse(changeRecordIsNoOp(new 
PartitionChangeRecord().setLeader(1)));
+        assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+            setIsr(Arrays.asList(1, 2, 3))));
+        assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+            setRemovingReplicas(Arrays.asList(1))));
+        assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+            setAddingReplicas(Arrays.asList(4))));
+    }
+
+    private final static PartitionRegistration FOO = new PartitionRegistration(
+        new int[] {2, 1, 3}, new int[] {2, 1, 3}, Replicas.NONE, Replicas.NONE,
+        1, 100, 200);
+
+    private final static Uuid FOO_ID = 
Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
+
+    private static PartitionChangeBuilder createFooBuilder(boolean 
allowUnclean) {
+        return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, () -> 
allowUnclean);
+    }
+
+    private final static PartitionRegistration BAR = new PartitionRegistration(
+        new int[] {1, 2, 3, 4}, new int[] {1, 2, 3}, new int[] {1}, new int[] 
{4},
+        1, 100, 200);
+
+    private final static Uuid BAR_ID = 
Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw");
+
+    private static PartitionChangeBuilder createBarBuilder(boolean 
allowUnclean) {
+        return new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3, () -> 
allowUnclean);
+    }
+
+    private static void assertBestLeaderEquals(PartitionChangeBuilder builder,
+                                               int expectedNode,
+                                               boolean expectedUnclean) {
+        BestLeader bestLeader = builder.new BestLeader();
+        assertEquals(expectedNode, bestLeader.node);
+        assertEquals(expectedUnclean, bestLeader.unclean);
+    }
+
+    @Test
+    public void testBestLeader() {
+        assertBestLeaderEquals(createFooBuilder(false), 2, false);
+        assertBestLeaderEquals(createFooBuilder(true), 2, false);
+        assertBestLeaderEquals(createFooBuilder(false).
+                setTargetIsr(Arrays.asList(1, 3)), 1, false);
+        assertBestLeaderEquals(createFooBuilder(true).
+            setTargetIsr(Arrays.asList(1, 3)), 1, false);
+        assertBestLeaderEquals(createFooBuilder(false).
+            setTargetIsr(Arrays.asList(3)), NO_LEADER, false);
+        assertBestLeaderEquals(createFooBuilder(true).
+            setTargetIsr(Arrays.asList(3)), 2, true);
+        assertBestLeaderEquals(createFooBuilder(true).
+                
setTargetIsr(Arrays.asList(4)).setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
+            4, false);
+    }
+
+    @Test
+    public void testShouldTryElection() {
+        assertFalse(createFooBuilder(false).shouldTryElection());
+        
assertTrue(createFooBuilder(false).setAlwaysElectPreferredIfPossible(true).
+            shouldTryElection());
+        assertTrue(createFooBuilder(false).setTargetIsr(Arrays.asList(2, 3)).
+            shouldTryElection());
+        assertFalse(createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1)).
+            shouldTryElection());
+    }
+
+    private static void 
testTriggerLeaderEpochBumpIfNeededLeader(PartitionChangeBuilder builder,
+                                                                 
PartitionChangeRecord record,
+                                                                 int 
expectedLeader) {
+        builder.triggerLeaderEpochBumpIfNeeded(record);
+        assertEquals(expectedLeader, record.leader());
+    }
+
+    @Test
+    public void testTriggerLeaderEpochBumpIfNeeded() {
+        testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false),
+            new PartitionChangeRecord(), NO_LEADER_CHANGE);
+        testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false).
+            setTargetIsr(Arrays.asList(2, 1)), new PartitionChangeRecord(), 1);
+        testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false).
+            setTargetIsr(Arrays.asList(2, 1, 3, 4)), new 
PartitionChangeRecord(), 1);
+        testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false).
+            setTargetReplicas(Arrays.asList(2, 1, 3, 4)), new 
PartitionChangeRecord(), 1);
+        testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false).
+            setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
+            new PartitionChangeRecord().setLeader(2), 2);
+    }
+
+    @Test
+    public void testNoChange() {
+        assertEquals(Optional.empty(), createFooBuilder(false).build());
+        assertEquals(Optional.empty(), createFooBuilder(true).build());
+        assertEquals(Optional.empty(), createBarBuilder(false).build());
+        assertEquals(Optional.empty(), createBarBuilder(true).build());
+    }
+
+    @Test
+    public void testIsrChangeAndLeaderBump() {
+        assertEquals(Optional.of(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+            setTopicId(FOO_ID).
+            setPartitionId(0).
+            setIsr(Arrays.asList(2, 1)).
+            setLeader(1), PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+            createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1)).build());
+    }
+
+    @Test
+    public void testIsrChangeAndLeaderChange() {
+        assertEquals(Optional.of(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+                setTopicId(FOO_ID).
+                setPartitionId(0).
+                setIsr(Arrays.asList(2, 3)).
+                setLeader(2), 
PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+            createFooBuilder(false).setTargetIsr(Arrays.asList(2, 3)).build());
+    }
+
+    @Test
+    public void testReassignmentRearrangesReplicas() {
+        assertEquals(Optional.of(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+                setTopicId(FOO_ID).
+                setPartitionId(0).
+                setReplicas(Arrays.asList(3, 2, 1)),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+            createFooBuilder(false).setTargetReplicas(Arrays.asList(3, 2, 
1)).build());
+    }
+
+    @Test
+    public void testIsrEnlargementCompletesReassignment() {
+        assertEquals(Optional.of(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+                setTopicId(BAR_ID).
+                setPartitionId(0).
+                setReplicas(Arrays.asList(2, 3, 4)).
+                setIsr(Arrays.asList(2, 3, 4)).
+                setLeader(2).
+                setRemovingReplicas(Collections.emptyList()).
+                setAddingReplicas(Collections.emptyList()),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+            createBarBuilder(false).setTargetIsr(Arrays.asList(1, 2, 3, 
4)).build());
+    }
+
+    @Test
+    public void testRevertReassignment() {
+        PartitionReassignmentRevert revert = new 
PartitionReassignmentRevert(BAR);
+        assertEquals(Arrays.asList(1, 2, 3), revert.replicas());
+        assertEquals(Arrays.asList(1, 2, 3), revert.isr());
+        assertEquals(Optional.of(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+                setTopicId(BAR_ID).
+                setPartitionId(0).
+                setReplicas(Arrays.asList(1, 2, 3)).
+                setLeader(1).
+                setRemovingReplicas(Collections.emptyList()).
+                setAddingReplicas(Collections.emptyList()),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+            createBarBuilder(false).
+                setTargetReplicas(revert.replicas()).
+                setTargetIsr(revert.isr()).
+                setTargetRemoving(Collections.emptyList()).
+                setTargetAdding(Collections.emptyList()).
+                build());
+    }
+
+    @Test
+    public void testQuickReassignment() {
+        PartitionReassignmentReplicas replicas = new 
PartitionReassignmentReplicas(
+            Replicas.toList(FOO.replicas), Arrays.asList(1, 2));
+        assertEquals(Collections.singletonList(3), replicas.removing());
+        assertEquals(Collections.emptyList(), replicas.adding());
+        assertEquals(Arrays.asList(1, 2, 3), replicas.merged());
+        assertEquals(Optional.of(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+                setTopicId(FOO_ID).
+                setPartitionId(0).
+                setReplicas(Arrays.asList(1, 2)).
+                setIsr(Arrays.asList(2, 1)).
+                setLeader(1),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+            createFooBuilder(false).
+                setTargetReplicas(replicas.merged()).
+                setTargetRemoving(replicas.removing()).
+                build());
+    }
+
+    @Test
+    public void testStartLongReassignment() {

Review comment:
       testStartLongReassignment =>testAddingReplicaReassignment ?

##########
File path: 
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
##########
@@ -180,6 +187,44 @@ public LeaderAndIsrPartitionState 
toLeaderAndIsrPartitionState(TopicPartition tp
             setIsNew(isNew);
     }
 
+    /**
+     * Returns true if this partition is reassigning.
+     */
+    public boolean isReassigning() {
+        return removingReplicas.length > 0 | addingReplicas.length > 0;

Review comment:
       Shoud | be || ?

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##########
@@ -558,51 +608,111 @@ BrokersToIsrs brokersToIsrs() {
                     responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
                         setPartitionIndex(partitionData.partitionIndex()).
                         setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()));
+                    log.info("Rejecting alterIsr request for unknown partition 
{}-{}.",
+                        topic.name, partitionData.partitionIndex());
                     continue;
                 }
-                if (request.brokerId() != partition.leader) {
+                if (partitionData.leaderEpoch() != partition.leaderEpoch) {
                     responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
                         setPartitionIndex(partitionData.partitionIndex()).
-                        setErrorCode(INVALID_REQUEST.code()));
+                        setErrorCode(FENCED_LEADER_EPOCH.code()));
+                    log.debug("Rejecting alterIsr request from node {} for 
{}-{} because " +
+                            "the current leader epoch is {}, not {}.", 
request.brokerId(),
+                        topic.name, partitionData.partitionIndex(),
+                        partition.leaderEpoch, partitionData.leaderEpoch());
                     continue;
                 }
-                if (partitionData.leaderEpoch() != partition.leaderEpoch) {
+                if (request.brokerId() != partition.leader) {
                     responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
                         setPartitionIndex(partitionData.partitionIndex()).
-                        setErrorCode(Errors.FENCED_LEADER_EPOCH.code()));
+                        setErrorCode(INVALID_REQUEST.code()));
+                    log.info("Rejecting alterIsr request from node {} for 
{}-{} because " +
+                            "the current leader is {}.", request.brokerId(), 
topic.name,
+                            partitionData.partitionIndex(), partition.leader);
                     continue;
                 }
                 if (partitionData.currentIsrVersion() != 
partition.partitionEpoch) {
                     responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
                         setPartitionIndex(partitionData.partitionIndex()).
-                        setErrorCode(Errors.INVALID_UPDATE_VERSION.code()));
+                        setErrorCode(INVALID_UPDATE_VERSION.code()));
+                    log.info("Rejecting alterIsr request from node {} for 
{}-{} because " +
+                            "the current partition epoch is {}, not {}.", 
request.brokerId(),
+                            topic.name, partitionData.partitionIndex(),
+                            partition.partitionEpoch, 
partitionData.currentIsrVersion());
                     continue;
                 }
                 int[] newIsr = Replicas.toArray(partitionData.newIsr());
                 if (!Replicas.validateIsr(partition.replicas, newIsr)) {
                     responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
                         setPartitionIndex(partitionData.partitionIndex()).
                         setErrorCode(INVALID_REQUEST.code()));
+                    log.error("Rejecting alterIsr request from node {} for 
{}-{} because " +
+                            "it specified an invalid ISR {}.", 
request.brokerId(),
+                            topic.name, partitionData.partitionIndex(),
+                            partitionData.newIsr());
                     continue;
                 }
                 if (!Replicas.contains(newIsr, partition.leader)) {
-                    // An alterIsr request can't remove the current leader.
+                    // An alterIsr request can't ask for the current leader to 
be removed.
                     responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
                         setPartitionIndex(partitionData.partitionIndex()).
                         setErrorCode(INVALID_REQUEST.code()));
+                    log.error("Rejecting alterIsr request from node {} for 
{}-{} because " +
+                            "it specified an invalid ISR {} that doesn't 
include itself.",
+                            request.brokerId(), topic.name, 
partitionData.partitionIndex(),
+                            partitionData.newIsr());
                     continue;
                 }
-                records.add(new ApiMessageAndVersion(new 
PartitionChangeRecord().
-                    setPartitionId(partitionData.partitionIndex()).
-                    setTopicId(topic.id).
-                    setIsr(partitionData.newIsr()), 
PARTITION_CHANGE_RECORD.highestSupportedVersion()));
+                // At this point, we have decided to perform the ISR change. 
We use
+                // PartitionChangeBuilder to find out what its effect will be.
+                PartitionChangeBuilder builder = new 
PartitionChangeBuilder(partition,
+                    topic.id,
+                    partitionData.partitionIndex(),
+                    r -> clusterControl.unfenced(r),
+                    () -> 
configurationControl.uncleanLeaderElectionEnabledForTopic(topicData.name()));
+                builder.setTargetIsr(partitionData.newIsr());
+                Optional<ApiMessageAndVersion> record = builder.build();
+                Errors result = Errors.NONE;
+                if (record.isPresent()) {
+                    records.add(record.get());
+                    PartitionChangeRecord change = (PartitionChangeRecord) 
record.get().message();
+                    partition = partition.merge(change);
+                    if (log.isDebugEnabled()) {
+                        log.debug("Node {} has altered ISR for {}-{} to {}.",
+                            request.brokerId(), topic.name, 
partitionData.partitionIndex(),
+                            change.isr());
+                    }
+                    if (change.leader() != request.brokerId() &&
+                            change.leader() != NO_LEADER_CHANGE) {
+                        // Normally, an alterIsr request, which is made by the 
partition
+                        // leader itself, is not allowed to modify the 
partition leader.
+                        // However, if there is an ongoing partition 
reassignment and the
+                        // ISR change completes it, then the leader may change 
as part of
+                        // the changes made during reassignment cleanup.
+                        //
+                        // In this case, we report back FENCED_LEADER_EPOCH to 
the leader
+                        // which made the alterIsr request. This lets it know 
that it must
+                        // fetch new metadata before trying again. This return 
code is
+                        // unusual because we both return an error and 
generate a new
+                        // metadata record. We usually only do one or the 
other.
+                        log.info("Returning FENCED_LEADER_EPOCH for alterIsr 
request " +
+                                "from node {} for {}-{} because the new ISR 
completed " +
+                                "the partition reassignment, and triggered a 
leadership " +
+                                "change.", request.brokerId(), topic.name,
+                            partitionData.partitionIndex());
+                        responseTopicData.partitions().add(new 
AlterIsrResponseData.PartitionData().
+                            setPartitionIndex(partitionData.partitionIndex()).
+                            setErrorCode(FENCED_LEADER_EPOCH.code()));
+                        continue;
+                    }

Review comment:
       It would be useful to add an info level logging when an AlterIsr 
triggers the completion of the partition reassignment.

##########
File path: 
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * PartitionMutator handles changing partition registrations.

Review comment:
       The class is no longer called PartitionMutator.

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.controller.PartitionChangeBuilder.BestLeader;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static 
org.apache.kafka.controller.PartitionChangeBuilder.changeRecordIsNoOp;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 40)
+public class PartitionChangeBuilderTest {
+    @Test
+    public void testChangeRecordIsNoOp() {
+        assertTrue(changeRecordIsNoOp(new PartitionChangeRecord()));
+        assertFalse(changeRecordIsNoOp(new 
PartitionChangeRecord().setLeader(1)));
+        assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+            setIsr(Arrays.asList(1, 2, 3))));
+        assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+            setRemovingReplicas(Arrays.asList(1))));
+        assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+            setAddingReplicas(Arrays.asList(4))));
+    }
+
+    private final static PartitionRegistration FOO = new PartitionRegistration(
+        new int[] {2, 1, 3}, new int[] {2, 1, 3}, Replicas.NONE, Replicas.NONE,
+        1, 100, 200);
+
+    private final static Uuid FOO_ID = 
Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
+
+    private static PartitionChangeBuilder createFooBuilder(boolean 
allowUnclean) {
+        return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, () -> 
allowUnclean);
+    }
+
+    private final static PartitionRegistration BAR = new PartitionRegistration(
+        new int[] {1, 2, 3, 4}, new int[] {1, 2, 3}, new int[] {1}, new int[] 
{4},
+        1, 100, 200);
+
+    private final static Uuid BAR_ID = 
Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw");
+
+    private static PartitionChangeBuilder createBarBuilder(boolean 
allowUnclean) {
+        return new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3, () -> 
allowUnclean);
+    }
+
+    private static void assertBestLeaderEquals(PartitionChangeBuilder builder,
+                                               int expectedNode,
+                                               boolean expectedUnclean) {
+        BestLeader bestLeader = builder.new BestLeader();
+        assertEquals(expectedNode, bestLeader.node);
+        assertEquals(expectedUnclean, bestLeader.unclean);
+    }
+
+    @Test
+    public void testBestLeader() {
+        assertBestLeaderEquals(createFooBuilder(false), 2, false);
+        assertBestLeaderEquals(createFooBuilder(true), 2, false);
+        assertBestLeaderEquals(createFooBuilder(false).
+                setTargetIsr(Arrays.asList(1, 3)), 1, false);
+        assertBestLeaderEquals(createFooBuilder(true).
+            setTargetIsr(Arrays.asList(1, 3)), 1, false);
+        assertBestLeaderEquals(createFooBuilder(false).
+            setTargetIsr(Arrays.asList(3)), NO_LEADER, false);
+        assertBestLeaderEquals(createFooBuilder(true).
+            setTargetIsr(Arrays.asList(3)), 2, true);
+        assertBestLeaderEquals(createFooBuilder(true).
+                
setTargetIsr(Arrays.asList(4)).setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
+            4, false);
+    }
+
+    @Test
+    public void testShouldTryElection() {
+        assertFalse(createFooBuilder(false).shouldTryElection());
+        
assertTrue(createFooBuilder(false).setAlwaysElectPreferredIfPossible(true).
+            shouldTryElection());
+        assertTrue(createFooBuilder(false).setTargetIsr(Arrays.asList(2, 3)).
+            shouldTryElection());
+        assertFalse(createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1)).
+            shouldTryElection());
+    }
+
+    private static void 
testTriggerLeaderEpochBumpIfNeededLeader(PartitionChangeBuilder builder,
+                                                                 
PartitionChangeRecord record,
+                                                                 int 
expectedLeader) {
+        builder.triggerLeaderEpochBumpIfNeeded(record);
+        assertEquals(expectedLeader, record.leader());
+    }
+
+    @Test
+    public void testTriggerLeaderEpochBumpIfNeeded() {
+        testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false),
+            new PartitionChangeRecord(), NO_LEADER_CHANGE);
+        testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false).
+            setTargetIsr(Arrays.asList(2, 1)), new PartitionChangeRecord(), 1);
+        testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false).
+            setTargetIsr(Arrays.asList(2, 1, 3, 4)), new 
PartitionChangeRecord(), 1);
+        testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false).
+            setTargetReplicas(Arrays.asList(2, 1, 3, 4)), new 
PartitionChangeRecord(), 1);
+        testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false).
+            setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
+            new PartitionChangeRecord().setLeader(2), 2);
+    }
+
+    @Test
+    public void testNoChange() {
+        assertEquals(Optional.empty(), createFooBuilder(false).build());
+        assertEquals(Optional.empty(), createFooBuilder(true).build());
+        assertEquals(Optional.empty(), createBarBuilder(false).build());
+        assertEquals(Optional.empty(), createBarBuilder(true).build());
+    }
+
+    @Test
+    public void testIsrChangeAndLeaderBump() {
+        assertEquals(Optional.of(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+            setTopicId(FOO_ID).
+            setPartitionId(0).
+            setIsr(Arrays.asList(2, 1)).
+            setLeader(1), PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+            createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1)).build());
+    }
+
+    @Test
+    public void testIsrChangeAndLeaderChange() {
+        assertEquals(Optional.of(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+                setTopicId(FOO_ID).
+                setPartitionId(0).
+                setIsr(Arrays.asList(2, 3)).
+                setLeader(2), 
PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+            createFooBuilder(false).setTargetIsr(Arrays.asList(2, 3)).build());
+    }
+
+    @Test
+    public void testReassignmentRearrangesReplicas() {
+        assertEquals(Optional.of(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+                setTopicId(FOO_ID).
+                setPartitionId(0).
+                setReplicas(Arrays.asList(3, 2, 1)),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+            createFooBuilder(false).setTargetReplicas(Arrays.asList(3, 2, 
1)).build());
+    }
+
+    @Test
+    public void testIsrEnlargementCompletesReassignment() {
+        assertEquals(Optional.of(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+                setTopicId(BAR_ID).
+                setPartitionId(0).
+                setReplicas(Arrays.asList(2, 3, 4)).
+                setIsr(Arrays.asList(2, 3, 4)).
+                setLeader(2).
+                setRemovingReplicas(Collections.emptyList()).
+                setAddingReplicas(Collections.emptyList()),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+            createBarBuilder(false).setTargetIsr(Arrays.asList(1, 2, 3, 
4)).build());
+    }
+
+    @Test
+    public void testRevertReassignment() {
+        PartitionReassignmentRevert revert = new 
PartitionReassignmentRevert(BAR);
+        assertEquals(Arrays.asList(1, 2, 3), revert.replicas());
+        assertEquals(Arrays.asList(1, 2, 3), revert.isr());
+        assertEquals(Optional.of(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+                setTopicId(BAR_ID).
+                setPartitionId(0).
+                setReplicas(Arrays.asList(1, 2, 3)).
+                setLeader(1).
+                setRemovingReplicas(Collections.emptyList()).
+                setAddingReplicas(Collections.emptyList()),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+            createBarBuilder(false).
+                setTargetReplicas(revert.replicas()).
+                setTargetIsr(revert.isr()).
+                setTargetRemoving(Collections.emptyList()).
+                setTargetAdding(Collections.emptyList()).
+                build());
+    }
+
+    @Test
+    public void testQuickReassignment() {

Review comment:
       testQuickReassignment =>testRemovingReplicaReassignment ?

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
##########
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.controller.PartitionChangeBuilder.BestLeader;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static 
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
+import static 
org.apache.kafka.controller.PartitionChangeBuilder.changeRecordIsNoOp;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 40)
+public class PartitionChangeBuilderTest {
+    @Test
+    public void testChangeRecordIsNoOp() {
+        assertTrue(changeRecordIsNoOp(new PartitionChangeRecord()));
+        assertFalse(changeRecordIsNoOp(new 
PartitionChangeRecord().setLeader(1)));
+        assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+            setIsr(Arrays.asList(1, 2, 3))));
+        assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+            setRemovingReplicas(Arrays.asList(1))));
+        assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().
+            setAddingReplicas(Arrays.asList(4))));
+    }
+
+    private final static PartitionRegistration FOO = new PartitionRegistration(
+        new int[] {2, 1, 3}, new int[] {2, 1, 3}, Replicas.NONE, Replicas.NONE,
+        1, 100, 200);
+
+    private final static Uuid FOO_ID = 
Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg");
+
+    private static PartitionChangeBuilder createFooBuilder(boolean 
allowUnclean) {
+        return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, () -> 
allowUnclean);
+    }
+
+    private final static PartitionRegistration BAR = new PartitionRegistration(
+        new int[] {1, 2, 3, 4}, new int[] {1, 2, 3}, new int[] {1}, new int[] 
{4},
+        1, 100, 200);
+
+    private final static Uuid BAR_ID = 
Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw");
+
+    private static PartitionChangeBuilder createBarBuilder(boolean 
allowUnclean) {
+        return new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3, () -> 
allowUnclean);
+    }
+
+    private static void assertBestLeaderEquals(PartitionChangeBuilder builder,
+                                               int expectedNode,
+                                               boolean expectedUnclean) {
+        BestLeader bestLeader = builder.new BestLeader();
+        assertEquals(expectedNode, bestLeader.node);
+        assertEquals(expectedUnclean, bestLeader.unclean);
+    }
+
+    @Test
+    public void testBestLeader() {
+        assertBestLeaderEquals(createFooBuilder(false), 2, false);
+        assertBestLeaderEquals(createFooBuilder(true), 2, false);
+        assertBestLeaderEquals(createFooBuilder(false).
+                setTargetIsr(Arrays.asList(1, 3)), 1, false);
+        assertBestLeaderEquals(createFooBuilder(true).
+            setTargetIsr(Arrays.asList(1, 3)), 1, false);
+        assertBestLeaderEquals(createFooBuilder(false).
+            setTargetIsr(Arrays.asList(3)), NO_LEADER, false);
+        assertBestLeaderEquals(createFooBuilder(true).
+            setTargetIsr(Arrays.asList(3)), 2, true);
+        assertBestLeaderEquals(createFooBuilder(true).
+                
setTargetIsr(Arrays.asList(4)).setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
+            4, false);
+    }
+
+    @Test
+    public void testShouldTryElection() {
+        assertFalse(createFooBuilder(false).shouldTryElection());
+        
assertTrue(createFooBuilder(false).setAlwaysElectPreferredIfPossible(true).
+            shouldTryElection());
+        assertTrue(createFooBuilder(false).setTargetIsr(Arrays.asList(2, 3)).
+            shouldTryElection());
+        assertFalse(createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1)).
+            shouldTryElection());
+    }
+
+    private static void 
testTriggerLeaderEpochBumpIfNeededLeader(PartitionChangeBuilder builder,
+                                                                 
PartitionChangeRecord record,
+                                                                 int 
expectedLeader) {
+        builder.triggerLeaderEpochBumpIfNeeded(record);
+        assertEquals(expectedLeader, record.leader());
+    }
+
+    @Test
+    public void testTriggerLeaderEpochBumpIfNeeded() {
+        testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false),
+            new PartitionChangeRecord(), NO_LEADER_CHANGE);
+        testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false).
+            setTargetIsr(Arrays.asList(2, 1)), new PartitionChangeRecord(), 1);
+        testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false).
+            setTargetIsr(Arrays.asList(2, 1, 3, 4)), new 
PartitionChangeRecord(), 1);
+        testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false).
+            setTargetReplicas(Arrays.asList(2, 1, 3, 4)), new 
PartitionChangeRecord(), 1);
+        testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false).
+            setTargetReplicas(Arrays.asList(2, 1, 3, 4)),
+            new PartitionChangeRecord().setLeader(2), 2);
+    }
+
+    @Test
+    public void testNoChange() {
+        assertEquals(Optional.empty(), createFooBuilder(false).build());
+        assertEquals(Optional.empty(), createFooBuilder(true).build());
+        assertEquals(Optional.empty(), createBarBuilder(false).build());
+        assertEquals(Optional.empty(), createBarBuilder(true).build());
+    }
+
+    @Test
+    public void testIsrChangeAndLeaderBump() {
+        assertEquals(Optional.of(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+            setTopicId(FOO_ID).
+            setPartitionId(0).
+            setIsr(Arrays.asList(2, 1)).
+            setLeader(1), PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+            createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1)).build());
+    }
+
+    @Test
+    public void testIsrChangeAndLeaderChange() {
+        assertEquals(Optional.of(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+                setTopicId(FOO_ID).
+                setPartitionId(0).
+                setIsr(Arrays.asList(2, 3)).
+                setLeader(2), 
PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+            createFooBuilder(false).setTargetIsr(Arrays.asList(2, 3)).build());
+    }
+
+    @Test
+    public void testReassignmentRearrangesReplicas() {
+        assertEquals(Optional.of(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+                setTopicId(FOO_ID).
+                setPartitionId(0).
+                setReplicas(Arrays.asList(3, 2, 1)),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+            createFooBuilder(false).setTargetReplicas(Arrays.asList(3, 2, 
1)).build());
+    }
+
+    @Test
+    public void testIsrEnlargementCompletesReassignment() {
+        assertEquals(Optional.of(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+                setTopicId(BAR_ID).
+                setPartitionId(0).
+                setReplicas(Arrays.asList(2, 3, 4)).
+                setIsr(Arrays.asList(2, 3, 4)).
+                setLeader(2).
+                setRemovingReplicas(Collections.emptyList()).
+                setAddingReplicas(Collections.emptyList()),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+            createBarBuilder(false).setTargetIsr(Arrays.asList(1, 2, 3, 
4)).build());
+    }
+
+    @Test
+    public void testRevertReassignment() {
+        PartitionReassignmentRevert revert = new 
PartitionReassignmentRevert(BAR);
+        assertEquals(Arrays.asList(1, 2, 3), revert.replicas());
+        assertEquals(Arrays.asList(1, 2, 3), revert.isr());
+        assertEquals(Optional.of(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+                setTopicId(BAR_ID).
+                setPartitionId(0).
+                setReplicas(Arrays.asList(1, 2, 3)).
+                setLeader(1).
+                setRemovingReplicas(Collections.emptyList()).
+                setAddingReplicas(Collections.emptyList()),
+                PARTITION_CHANGE_RECORD.highestSupportedVersion())),
+            createBarBuilder(false).
+                setTargetReplicas(revert.replicas()).
+                setTargetIsr(revert.isr()).
+                setTargetRemoving(Collections.emptyList()).
+                setTargetAdding(Collections.emptyList()).
+                build());
+    }
+
+    @Test
+    public void testQuickReassignment() {
+        PartitionReassignmentReplicas replicas = new 
PartitionReassignmentReplicas(
+            Replicas.toList(FOO.replicas), Arrays.asList(1, 2));
+        assertEquals(Collections.singletonList(3), replicas.removing());
+        assertEquals(Collections.emptyList(), replicas.adding());
+        assertEquals(Arrays.asList(1, 2, 3), replicas.merged());
+        assertEquals(Optional.of(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+                setTopicId(FOO_ID).
+                setPartitionId(0).
+                setReplicas(Arrays.asList(1, 2)).

Review comment:
       In the removal case, the replicas should include the removed replica, 
right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to