junrao commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r668323818
########## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ########## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.function.Supplier; + +import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; + +/** + * PartitionMutator handles changing partition registrations. 
+ */ +public class PartitionChangeBuilder { + public static boolean changeRecordIsNoOp(PartitionChangeRecord record) { + if (record.isr() != null) return false; + if (record.leader() != NO_LEADER_CHANGE) return false; + if (record.replicas() != null) return false; + if (record.removingReplicas() != null) return false; + if (record.addingReplicas() != null) return false; + return true; + } + + private final PartitionRegistration partition; + private final Uuid topicId; + private final int partitionId; + private final Function<Integer, Boolean> isAcceptableLeader; + private final Supplier<Boolean> uncleanElectionOk; + private List<Integer> targetIsr; + private List<Integer> targetReplicas; + private List<Integer> targetRemoving; + private List<Integer> targetAdding; + private boolean alwaysElectPreferredIfPossible; + + public PartitionChangeBuilder(PartitionRegistration partition, + Uuid topicId, + int partitionId, + Function<Integer, Boolean> isAcceptableLeader, + Supplier<Boolean> uncleanElectionOk) { + this.partition = partition; + this.topicId = topicId; + this.partitionId = partitionId; + this.isAcceptableLeader = isAcceptableLeader; + this.uncleanElectionOk = uncleanElectionOk; + this.targetIsr = Replicas.toList(partition.isr); + this.targetReplicas = Replicas.toList(partition.replicas); + this.targetRemoving = Replicas.toList(partition.removingReplicas); + this.targetAdding = Replicas.toList(partition.addingReplicas); + this.alwaysElectPreferredIfPossible = false; + } + + public PartitionChangeBuilder setTargetIsr(List<Integer> targetIsr) { + this.targetIsr = targetIsr; + return this; + } + + public PartitionChangeBuilder setTargetReplicas(List<Integer> targetReplicas) { + this.targetReplicas = targetReplicas; + return this; + } + + public PartitionChangeBuilder setAlwaysElectPreferredIfPossible(boolean alwaysElectPreferredIfPossible) { Review comment: Do we need this since it's only used in tests? 
########## File path: metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java ########## @@ -105,4 +105,42 @@ public void testToLeaderAndIsrPartitionState() { setIsNew(false).toString(), b.toLeaderAndIsrPartitionState(new TopicPartition("bar", 0), false).toString()); } + + @Test + public void testMergePartitionChangeRecordWithReassignmentData() { + PartitionRegistration partition0 = new PartitionRegistration(new int[] {1, 2, 3}, + new int[] {1, 2, 3}, Replicas.NONE, Replicas.NONE, 1, 100, 200); + PartitionRegistration partition1 = partition0.merge(new PartitionChangeRecord(). + setRemovingReplicas(Collections.singletonList(3)). + setAddingReplicas(Collections.singletonList(4)). + setReplicas(Arrays.asList(1, 2, 3, 4))); + assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 4}, + new int[] {1, 2, 3}, new int[] {3}, new int[] {4}, 1, 100, 201), partition1); + PartitionRegistration partition2 = partition1.merge(new PartitionChangeRecord(). + setIsr(Arrays.asList(1, 2, 4)). + setRemovingReplicas(Collections.emptyList()). + setAddingReplicas(Collections.emptyList()). + setReplicas(Arrays.asList(1, 2, 4))); + assertEquals(new PartitionRegistration(new int[] {1, 2, 4}, + new int[] {1, 2, 4}, Replicas.NONE, Replicas.NONE, 1, 100, 202), partition2); + assertFalse(partition2.isReassigning()); + } + + @Test + public void testPartitionControlInfoIsrChangeCompletesReassignment() { + PartitionRegistration partition0 = new PartitionRegistration( + new int[]{1, 2, 3, 4}, new int[]{3}, new int[]{3}, new int[] {}, 1, 0, 0); Review comment: Could new int[] {} be Replicas.NONE? ########## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionReassignmentRevert.java ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.Replicas; + +import java.util.ArrayList; +import java.util.Set; +import java.util.List; +import java.util.Objects; + + +class PartitionReassignmentRevert { + private final List<Integer> replicas; + private final List<Integer> isr; + + PartitionReassignmentRevert(PartitionRegistration registration) { + // Figure out the replica list and ISR that we will have after reverting the + // reassignment. In general, we want to take out any replica that the reassignment + // was adding, but keep the ones the reassignment was removing. (But see the + // special case below.) + Set<Integer> adding = Replicas.toSet(registration.addingReplicas); + this.replicas = new ArrayList<>(registration.replicas.length); + this.isr = new ArrayList<>(registration.isr.length); + for (int i = 0; i < registration.isr.length; i++) { + int replica = registration.isr[i]; + if (!adding.contains(replica)) { + this.isr.add(replica); + } else if (i == registration.isr.length - 1 && isr.isEmpty()) { + // This is a special case where taking out all the "adding" replicas is + // not possible. The reason it is not possible is that doing so would + // create an empty ISR, which is not allowed. + // + // In this case, we leave in one of the adding replicas permanently. 
Review comment: This differs from what the old controller does. The old controller simply reverts the replicas to the original ones in KafkaController.maybeBuildReassignment(). We can change the behavior, but we would probably need a KIP, and should do this consistently between the old and the new controller. ########## File path: metadata/src/main/java/org/apache/kafka/image/TopicDelta.java ########## @@ -103,7 +103,8 @@ public TopicImage apply() { for (Entry<Integer, PartitionRegistration> entry : partitionChanges.entrySet()) { if (entry.getValue().leader == brokerId) { PartitionRegistration prevPartition = image.partitions().get(entry.getKey()); - if (prevPartition == null || prevPartition.leader != brokerId) { + if (prevPartition == null || + prevPartition.leaderEpoch != entry.getValue().leaderEpoch) { Review comment: Should we check for partitionEpoch change too? Ditto in newLocalFollowers(). ########## File path: metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java ########## @@ -180,6 +187,44 @@ public LeaderAndIsrPartitionState toLeaderAndIsrPartitionState(TopicPartition tp setIsNew(isNew); } + /** + * Returns true if this partition is reassigning. + */ + public boolean isReassigning() { + return removingReplicas.length > 0 | addingReplicas.length > 0; + } + + /** + * Check if an ISR change completes this partition's reassignment. + * + * @param newIsr The new ISR. + * @return True if the reassignment is complete. + */ + public boolean isrChangeCompletesReassignment(int[] newIsr) { Review comment: It seems that this is only used in tests and is duplicating the logic in PartitionChangeBuilder.completeReassignmentIfNeeded(). Do we still need it? If so, could we move this to a test util? ########## File path: metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.controller.PartitionChangeBuilder.BestLeader; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; + +import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD; +import static org.apache.kafka.controller.PartitionChangeBuilder.changeRecordIsNoOp; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +@Timeout(value = 40) +public class PartitionChangeBuilderTest { + @Test + public void testChangeRecordIsNoOp() { + assertTrue(changeRecordIsNoOp(new PartitionChangeRecord())); + assertFalse(changeRecordIsNoOp(new 
PartitionChangeRecord().setLeader(1))); + assertFalse(changeRecordIsNoOp(new PartitionChangeRecord(). + setIsr(Arrays.asList(1, 2, 3)))); + assertFalse(changeRecordIsNoOp(new PartitionChangeRecord(). + setRemovingReplicas(Arrays.asList(1)))); + assertFalse(changeRecordIsNoOp(new PartitionChangeRecord(). + setAddingReplicas(Arrays.asList(4)))); + } + + private final static PartitionRegistration FOO = new PartitionRegistration( + new int[] {2, 1, 3}, new int[] {2, 1, 3}, Replicas.NONE, Replicas.NONE, + 1, 100, 200); + + private final static Uuid FOO_ID = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); + + private static PartitionChangeBuilder createFooBuilder(boolean allowUnclean) { + return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, () -> allowUnclean); + } + + private final static PartitionRegistration BAR = new PartitionRegistration( + new int[] {1, 2, 3, 4}, new int[] {1, 2, 3}, new int[] {1}, new int[] {4}, + 1, 100, 200); + + private final static Uuid BAR_ID = Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw"); + + private static PartitionChangeBuilder createBarBuilder(boolean allowUnclean) { + return new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3, () -> allowUnclean); + } + + private static void assertBestLeaderEquals(PartitionChangeBuilder builder, + int expectedNode, + boolean expectedUnclean) { + BestLeader bestLeader = builder.new BestLeader(); + assertEquals(expectedNode, bestLeader.node); + assertEquals(expectedUnclean, bestLeader.unclean); + } + + @Test + public void testBestLeader() { + assertBestLeaderEquals(createFooBuilder(false), 2, false); + assertBestLeaderEquals(createFooBuilder(true), 2, false); + assertBestLeaderEquals(createFooBuilder(false). + setTargetIsr(Arrays.asList(1, 3)), 1, false); + assertBestLeaderEquals(createFooBuilder(true). + setTargetIsr(Arrays.asList(1, 3)), 1, false); + assertBestLeaderEquals(createFooBuilder(false). + setTargetIsr(Arrays.asList(3)), NO_LEADER, false); + assertBestLeaderEquals(createFooBuilder(true). 
+ setTargetIsr(Arrays.asList(3)), 2, true); + assertBestLeaderEquals(createFooBuilder(true). + setTargetIsr(Arrays.asList(4)).setTargetReplicas(Arrays.asList(2, 1, 3, 4)), + 4, false); + } + + @Test + public void testShouldTryElection() { + assertFalse(createFooBuilder(false).shouldTryElection()); + assertTrue(createFooBuilder(false).setAlwaysElectPreferredIfPossible(true). + shouldTryElection()); + assertTrue(createFooBuilder(false).setTargetIsr(Arrays.asList(2, 3)). + shouldTryElection()); + assertFalse(createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1)). + shouldTryElection()); + } + + private static void testTriggerLeaderEpochBumpIfNeededLeader(PartitionChangeBuilder builder, + PartitionChangeRecord record, + int expectedLeader) { + builder.triggerLeaderEpochBumpIfNeeded(record); + assertEquals(expectedLeader, record.leader()); + } + + @Test + public void testTriggerLeaderEpochBumpIfNeeded() { + testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false), + new PartitionChangeRecord(), NO_LEADER_CHANGE); + testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false). + setTargetIsr(Arrays.asList(2, 1)), new PartitionChangeRecord(), 1); + testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false). + setTargetIsr(Arrays.asList(2, 1, 3, 4)), new PartitionChangeRecord(), 1); + testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false). + setTargetReplicas(Arrays.asList(2, 1, 3, 4)), new PartitionChangeRecord(), 1); + testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false). 
+ setTargetReplicas(Arrays.asList(2, 1, 3, 4)), + new PartitionChangeRecord().setLeader(2), 2); + } + + @Test + public void testNoChange() { + assertEquals(Optional.empty(), createFooBuilder(false).build()); + assertEquals(Optional.empty(), createFooBuilder(true).build()); + assertEquals(Optional.empty(), createBarBuilder(false).build()); + assertEquals(Optional.empty(), createBarBuilder(true).build()); + } + + @Test + public void testIsrChangeAndLeaderBump() { + assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord(). + setTopicId(FOO_ID). + setPartitionId(0). + setIsr(Arrays.asList(2, 1)). + setLeader(1), PARTITION_CHANGE_RECORD.highestSupportedVersion())), + createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1)).build()); + } + + @Test + public void testIsrChangeAndLeaderChange() { + assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord(). + setTopicId(FOO_ID). + setPartitionId(0). + setIsr(Arrays.asList(2, 3)). + setLeader(2), PARTITION_CHANGE_RECORD.highestSupportedVersion())), + createFooBuilder(false).setTargetIsr(Arrays.asList(2, 3)).build()); + } + + @Test + public void testReassignmentRearrangesReplicas() { + assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord(). + setTopicId(FOO_ID). + setPartitionId(0). + setReplicas(Arrays.asList(3, 2, 1)), + PARTITION_CHANGE_RECORD.highestSupportedVersion())), + createFooBuilder(false).setTargetReplicas(Arrays.asList(3, 2, 1)).build()); + } + + @Test + public void testIsrEnlargementCompletesReassignment() { + assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord(). + setTopicId(BAR_ID). + setPartitionId(0). + setReplicas(Arrays.asList(2, 3, 4)). + setIsr(Arrays.asList(2, 3, 4)). + setLeader(2). + setRemovingReplicas(Collections.emptyList()). 
+ setAddingReplicas(Collections.emptyList()), + PARTITION_CHANGE_RECORD.highestSupportedVersion())), + createBarBuilder(false).setTargetIsr(Arrays.asList(1, 2, 3, 4)).build()); + } + + @Test + public void testRevertReassignment() { + PartitionReassignmentRevert revert = new PartitionReassignmentRevert(BAR); + assertEquals(Arrays.asList(1, 2, 3), revert.replicas()); + assertEquals(Arrays.asList(1, 2, 3), revert.isr()); + assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord(). + setTopicId(BAR_ID). + setPartitionId(0). + setReplicas(Arrays.asList(1, 2, 3)). + setLeader(1). + setRemovingReplicas(Collections.emptyList()). + setAddingReplicas(Collections.emptyList()), + PARTITION_CHANGE_RECORD.highestSupportedVersion())), + createBarBuilder(false). + setTargetReplicas(revert.replicas()). + setTargetIsr(revert.isr()). + setTargetRemoving(Collections.emptyList()). + setTargetAdding(Collections.emptyList()). + build()); + } + + @Test + public void testQuickReassignment() { + PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( + Replicas.toList(FOO.replicas), Arrays.asList(1, 2)); + assertEquals(Collections.singletonList(3), replicas.removing()); + assertEquals(Collections.emptyList(), replicas.adding()); + assertEquals(Arrays.asList(1, 2, 3), replicas.merged()); + assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord(). + setTopicId(FOO_ID). + setPartitionId(0). + setReplicas(Arrays.asList(1, 2)). + setIsr(Arrays.asList(2, 1)). + setLeader(1), + PARTITION_CHANGE_RECORD.highestSupportedVersion())), + createFooBuilder(false). + setTargetReplicas(replicas.merged()). + setTargetRemoving(replicas.removing()). + build()); + } + + @Test + public void testStartLongReassignment() { Review comment: testStartLongReassignment =>testAddingReplicaReassignment ? 
########## File path: metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java ########## @@ -180,6 +187,44 @@ public LeaderAndIsrPartitionState toLeaderAndIsrPartitionState(TopicPartition tp setIsNew(isNew); } + /** + * Returns true if this partition is reassigning. + */ + public boolean isReassigning() { + return removingReplicas.length > 0 | addingReplicas.length > 0; Review comment: Should | be ||? ########## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ########## @@ -558,51 +608,111 @@ BrokersToIsrs brokersToIsrs() { responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData(). setPartitionIndex(partitionData.partitionIndex()). setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code())); + log.info("Rejecting alterIsr request for unknown partition {}-{}.", + topic.name, partitionData.partitionIndex()); continue; } - if (request.brokerId() != partition.leader) { + if (partitionData.leaderEpoch() != partition.leaderEpoch) { responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData(). setPartitionIndex(partitionData.partitionIndex()). - setErrorCode(INVALID_REQUEST.code())); + setErrorCode(FENCED_LEADER_EPOCH.code())); + log.debug("Rejecting alterIsr request from node {} for {}-{} because " + + "the current leader epoch is {}, not {}.", request.brokerId(), + topic.name, partitionData.partitionIndex(), + partition.leaderEpoch, partitionData.leaderEpoch()); continue; } - if (partitionData.leaderEpoch() != partition.leaderEpoch) { + if (request.brokerId() != partition.leader) { responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData(). setPartitionIndex(partitionData.partitionIndex()).
- setErrorCode(Errors.FENCED_LEADER_EPOCH.code())); + setErrorCode(INVALID_REQUEST.code())); + log.info("Rejecting alterIsr request from node {} for {}-{} because " + + "the current leader is {}.", request.brokerId(), topic.name, + partitionData.partitionIndex(), partition.leader); continue; } if (partitionData.currentIsrVersion() != partition.partitionEpoch) { responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData(). setPartitionIndex(partitionData.partitionIndex()). - setErrorCode(Errors.INVALID_UPDATE_VERSION.code())); + setErrorCode(INVALID_UPDATE_VERSION.code())); + log.info("Rejecting alterIsr request from node {} for {}-{} because " + + "the current partition epoch is {}, not {}.", request.brokerId(), + topic.name, partitionData.partitionIndex(), + partition.partitionEpoch, partitionData.currentIsrVersion()); continue; } int[] newIsr = Replicas.toArray(partitionData.newIsr()); if (!Replicas.validateIsr(partition.replicas, newIsr)) { responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData(). setPartitionIndex(partitionData.partitionIndex()). setErrorCode(INVALID_REQUEST.code())); + log.error("Rejecting alterIsr request from node {} for {}-{} because " + + "it specified an invalid ISR {}.", request.brokerId(), + topic.name, partitionData.partitionIndex(), + partitionData.newIsr()); continue; } if (!Replicas.contains(newIsr, partition.leader)) { - // An alterIsr request can't remove the current leader. + // An alterIsr request can't ask for the current leader to be removed. responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData(). setPartitionIndex(partitionData.partitionIndex()). 
setErrorCode(INVALID_REQUEST.code())); + log.error("Rejecting alterIsr request from node {} for {}-{} because " + + "it specified an invalid ISR {} that doesn't include itself.", + request.brokerId(), topic.name, partitionData.partitionIndex(), + partitionData.newIsr()); continue; } - records.add(new ApiMessageAndVersion(new PartitionChangeRecord(). - setPartitionId(partitionData.partitionIndex()). - setTopicId(topic.id). - setIsr(partitionData.newIsr()), PARTITION_CHANGE_RECORD.highestSupportedVersion())); + // At this point, we have decided to perform the ISR change. We use + // PartitionChangeBuilder to find out what its effect will be. + PartitionChangeBuilder builder = new PartitionChangeBuilder(partition, + topic.id, + partitionData.partitionIndex(), + r -> clusterControl.unfenced(r), + () -> configurationControl.uncleanLeaderElectionEnabledForTopic(topicData.name())); + builder.setTargetIsr(partitionData.newIsr()); + Optional<ApiMessageAndVersion> record = builder.build(); + Errors result = Errors.NONE; + if (record.isPresent()) { + records.add(record.get()); + PartitionChangeRecord change = (PartitionChangeRecord) record.get().message(); + partition = partition.merge(change); + if (log.isDebugEnabled()) { + log.debug("Node {} has altered ISR for {}-{} to {}.", + request.brokerId(), topic.name, partitionData.partitionIndex(), + change.isr()); + } + if (change.leader() != request.brokerId() && + change.leader() != NO_LEADER_CHANGE) { + // Normally, an alterIsr request, which is made by the partition + // leader itself, is not allowed to modify the partition leader. + // However, if there is an ongoing partition reassignment and the + // ISR change completes it, then the leader may change as part of + // the changes made during reassignment cleanup. + // + // In this case, we report back FENCED_LEADER_EPOCH to the leader + // which made the alterIsr request. This lets it know that it must + // fetch new metadata before trying again. 
This return code is + // unusual because we both return an error and generate a new + // metadata record. We usually only do one or the other. + log.info("Returning FENCED_LEADER_EPOCH for alterIsr request " + + "from node {} for {}-{} because the new ISR completed " + + "the partition reassignment, and triggered a leadership " + + "change.", request.brokerId(), topic.name, + partitionData.partitionIndex()); + responseTopicData.partitions().add(new AlterIsrResponseData.PartitionData(). + setPartitionIndex(partitionData.partitionIndex()). + setErrorCode(FENCED_LEADER_EPOCH.code())); + continue; + } Review comment: It would be useful to add info-level logging when an AlterIsr triggers the completion of the partition reassignment. ########## File path: metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java ########## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.function.Supplier; + +import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; + +/** + * PartitionMutator handles changing partition registrations. Review comment: The class is no longer called PartitionMutator. ########## File path: metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.controller.PartitionChangeBuilder.BestLeader; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; + +import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD; +import static org.apache.kafka.controller.PartitionChangeBuilder.changeRecordIsNoOp; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +@Timeout(value = 40) +public class PartitionChangeBuilderTest { + @Test + public void testChangeRecordIsNoOp() { + assertTrue(changeRecordIsNoOp(new PartitionChangeRecord())); + assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().setLeader(1))); + assertFalse(changeRecordIsNoOp(new PartitionChangeRecord(). + setIsr(Arrays.asList(1, 2, 3)))); + assertFalse(changeRecordIsNoOp(new PartitionChangeRecord(). + setRemovingReplicas(Arrays.asList(1)))); + assertFalse(changeRecordIsNoOp(new PartitionChangeRecord(). 
+ setAddingReplicas(Arrays.asList(4)))); + } + + private final static PartitionRegistration FOO = new PartitionRegistration( + new int[] {2, 1, 3}, new int[] {2, 1, 3}, Replicas.NONE, Replicas.NONE, + 1, 100, 200); + + private final static Uuid FOO_ID = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); + + private static PartitionChangeBuilder createFooBuilder(boolean allowUnclean) { + return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, () -> allowUnclean); + } + + private final static PartitionRegistration BAR = new PartitionRegistration( + new int[] {1, 2, 3, 4}, new int[] {1, 2, 3}, new int[] {1}, new int[] {4}, + 1, 100, 200); + + private final static Uuid BAR_ID = Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw"); + + private static PartitionChangeBuilder createBarBuilder(boolean allowUnclean) { + return new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3, () -> allowUnclean); + } + + private static void assertBestLeaderEquals(PartitionChangeBuilder builder, + int expectedNode, + boolean expectedUnclean) { + BestLeader bestLeader = builder.new BestLeader(); + assertEquals(expectedNode, bestLeader.node); + assertEquals(expectedUnclean, bestLeader.unclean); + } + + @Test + public void testBestLeader() { + assertBestLeaderEquals(createFooBuilder(false), 2, false); + assertBestLeaderEquals(createFooBuilder(true), 2, false); + assertBestLeaderEquals(createFooBuilder(false). + setTargetIsr(Arrays.asList(1, 3)), 1, false); + assertBestLeaderEquals(createFooBuilder(true). + setTargetIsr(Arrays.asList(1, 3)), 1, false); + assertBestLeaderEquals(createFooBuilder(false). + setTargetIsr(Arrays.asList(3)), NO_LEADER, false); + assertBestLeaderEquals(createFooBuilder(true). + setTargetIsr(Arrays.asList(3)), 2, true); + assertBestLeaderEquals(createFooBuilder(true). 
+ setTargetIsr(Arrays.asList(4)).setTargetReplicas(Arrays.asList(2, 1, 3, 4)), + 4, false); + } + + @Test + public void testShouldTryElection() { + assertFalse(createFooBuilder(false).shouldTryElection()); + assertTrue(createFooBuilder(false).setAlwaysElectPreferredIfPossible(true). + shouldTryElection()); + assertTrue(createFooBuilder(false).setTargetIsr(Arrays.asList(2, 3)). + shouldTryElection()); + assertFalse(createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1)). + shouldTryElection()); + } + + private static void testTriggerLeaderEpochBumpIfNeededLeader(PartitionChangeBuilder builder, + PartitionChangeRecord record, + int expectedLeader) { + builder.triggerLeaderEpochBumpIfNeeded(record); + assertEquals(expectedLeader, record.leader()); + } + + @Test + public void testTriggerLeaderEpochBumpIfNeeded() { + testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false), + new PartitionChangeRecord(), NO_LEADER_CHANGE); + testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false). + setTargetIsr(Arrays.asList(2, 1)), new PartitionChangeRecord(), 1); + testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false). + setTargetIsr(Arrays.asList(2, 1, 3, 4)), new PartitionChangeRecord(), 1); + testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false). + setTargetReplicas(Arrays.asList(2, 1, 3, 4)), new PartitionChangeRecord(), 1); + testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false). + setTargetReplicas(Arrays.asList(2, 1, 3, 4)), + new PartitionChangeRecord().setLeader(2), 2); + } + + @Test + public void testNoChange() { + assertEquals(Optional.empty(), createFooBuilder(false).build()); + assertEquals(Optional.empty(), createFooBuilder(true).build()); + assertEquals(Optional.empty(), createBarBuilder(false).build()); + assertEquals(Optional.empty(), createBarBuilder(true).build()); + } + + @Test + public void testIsrChangeAndLeaderBump() { + assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord(). 
+ setTopicId(FOO_ID). + setPartitionId(0). + setIsr(Arrays.asList(2, 1)). + setLeader(1), PARTITION_CHANGE_RECORD.highestSupportedVersion())), + createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1)).build()); + } + + @Test + public void testIsrChangeAndLeaderChange() { + assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord(). + setTopicId(FOO_ID). + setPartitionId(0). + setIsr(Arrays.asList(2, 3)). + setLeader(2), PARTITION_CHANGE_RECORD.highestSupportedVersion())), + createFooBuilder(false).setTargetIsr(Arrays.asList(2, 3)).build()); + } + + @Test + public void testReassignmentRearrangesReplicas() { + assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord(). + setTopicId(FOO_ID). + setPartitionId(0). + setReplicas(Arrays.asList(3, 2, 1)), + PARTITION_CHANGE_RECORD.highestSupportedVersion())), + createFooBuilder(false).setTargetReplicas(Arrays.asList(3, 2, 1)).build()); + } + + @Test + public void testIsrEnlargementCompletesReassignment() { + assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord(). + setTopicId(BAR_ID). + setPartitionId(0). + setReplicas(Arrays.asList(2, 3, 4)). + setIsr(Arrays.asList(2, 3, 4)). + setLeader(2). + setRemovingReplicas(Collections.emptyList()). + setAddingReplicas(Collections.emptyList()), + PARTITION_CHANGE_RECORD.highestSupportedVersion())), + createBarBuilder(false).setTargetIsr(Arrays.asList(1, 2, 3, 4)).build()); + } + + @Test + public void testRevertReassignment() { + PartitionReassignmentRevert revert = new PartitionReassignmentRevert(BAR); + assertEquals(Arrays.asList(1, 2, 3), revert.replicas()); + assertEquals(Arrays.asList(1, 2, 3), revert.isr()); + assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord(). + setTopicId(BAR_ID). + setPartitionId(0). + setReplicas(Arrays.asList(1, 2, 3)). + setLeader(1). + setRemovingReplicas(Collections.emptyList()). 
+ setAddingReplicas(Collections.emptyList()), + PARTITION_CHANGE_RECORD.highestSupportedVersion())), + createBarBuilder(false). + setTargetReplicas(revert.replicas()). + setTargetIsr(revert.isr()). + setTargetRemoving(Collections.emptyList()). + setTargetAdding(Collections.emptyList()). + build()); + } + + @Test + public void testQuickReassignment() { Review comment: Should testQuickReassignment be renamed to testRemovingReplicaReassignment? ########## File path: metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java ########## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionChangeRecord; +import org.apache.kafka.controller.PartitionChangeBuilder.BestLeader; +import org.apache.kafka.metadata.PartitionRegistration; +import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; + +import static org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD; +import static org.apache.kafka.controller.PartitionChangeBuilder.changeRecordIsNoOp; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER; +import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +@Timeout(value = 40) +public class PartitionChangeBuilderTest { + @Test + public void testChangeRecordIsNoOp() { + assertTrue(changeRecordIsNoOp(new PartitionChangeRecord())); + assertFalse(changeRecordIsNoOp(new PartitionChangeRecord().setLeader(1))); + assertFalse(changeRecordIsNoOp(new PartitionChangeRecord(). + setIsr(Arrays.asList(1, 2, 3)))); + assertFalse(changeRecordIsNoOp(new PartitionChangeRecord(). + setRemovingReplicas(Arrays.asList(1)))); + assertFalse(changeRecordIsNoOp(new PartitionChangeRecord(). 
+ setAddingReplicas(Arrays.asList(4)))); + } + + private final static PartitionRegistration FOO = new PartitionRegistration( + new int[] {2, 1, 3}, new int[] {2, 1, 3}, Replicas.NONE, Replicas.NONE, + 1, 100, 200); + + private final static Uuid FOO_ID = Uuid.fromString("FbrrdcfiR-KC2CPSTHaJrg"); + + private static PartitionChangeBuilder createFooBuilder(boolean allowUnclean) { + return new PartitionChangeBuilder(FOO, FOO_ID, 0, r -> r != 3, () -> allowUnclean); + } + + private final static PartitionRegistration BAR = new PartitionRegistration( + new int[] {1, 2, 3, 4}, new int[] {1, 2, 3}, new int[] {1}, new int[] {4}, + 1, 100, 200); + + private final static Uuid BAR_ID = Uuid.fromString("LKfUsCBnQKekvL9O5dY9nw"); + + private static PartitionChangeBuilder createBarBuilder(boolean allowUnclean) { + return new PartitionChangeBuilder(BAR, BAR_ID, 0, r -> r != 3, () -> allowUnclean); + } + + private static void assertBestLeaderEquals(PartitionChangeBuilder builder, + int expectedNode, + boolean expectedUnclean) { + BestLeader bestLeader = builder.new BestLeader(); + assertEquals(expectedNode, bestLeader.node); + assertEquals(expectedUnclean, bestLeader.unclean); + } + + @Test + public void testBestLeader() { + assertBestLeaderEquals(createFooBuilder(false), 2, false); + assertBestLeaderEquals(createFooBuilder(true), 2, false); + assertBestLeaderEquals(createFooBuilder(false). + setTargetIsr(Arrays.asList(1, 3)), 1, false); + assertBestLeaderEquals(createFooBuilder(true). + setTargetIsr(Arrays.asList(1, 3)), 1, false); + assertBestLeaderEquals(createFooBuilder(false). + setTargetIsr(Arrays.asList(3)), NO_LEADER, false); + assertBestLeaderEquals(createFooBuilder(true). + setTargetIsr(Arrays.asList(3)), 2, true); + assertBestLeaderEquals(createFooBuilder(true). 
+ setTargetIsr(Arrays.asList(4)).setTargetReplicas(Arrays.asList(2, 1, 3, 4)), + 4, false); + } + + @Test + public void testShouldTryElection() { + assertFalse(createFooBuilder(false).shouldTryElection()); + assertTrue(createFooBuilder(false).setAlwaysElectPreferredIfPossible(true). + shouldTryElection()); + assertTrue(createFooBuilder(false).setTargetIsr(Arrays.asList(2, 3)). + shouldTryElection()); + assertFalse(createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1)). + shouldTryElection()); + } + + private static void testTriggerLeaderEpochBumpIfNeededLeader(PartitionChangeBuilder builder, + PartitionChangeRecord record, + int expectedLeader) { + builder.triggerLeaderEpochBumpIfNeeded(record); + assertEquals(expectedLeader, record.leader()); + } + + @Test + public void testTriggerLeaderEpochBumpIfNeeded() { + testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false), + new PartitionChangeRecord(), NO_LEADER_CHANGE); + testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false). + setTargetIsr(Arrays.asList(2, 1)), new PartitionChangeRecord(), 1); + testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false). + setTargetIsr(Arrays.asList(2, 1, 3, 4)), new PartitionChangeRecord(), 1); + testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false). + setTargetReplicas(Arrays.asList(2, 1, 3, 4)), new PartitionChangeRecord(), 1); + testTriggerLeaderEpochBumpIfNeededLeader(createFooBuilder(false). + setTargetReplicas(Arrays.asList(2, 1, 3, 4)), + new PartitionChangeRecord().setLeader(2), 2); + } + + @Test + public void testNoChange() { + assertEquals(Optional.empty(), createFooBuilder(false).build()); + assertEquals(Optional.empty(), createFooBuilder(true).build()); + assertEquals(Optional.empty(), createBarBuilder(false).build()); + assertEquals(Optional.empty(), createBarBuilder(true).build()); + } + + @Test + public void testIsrChangeAndLeaderBump() { + assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord(). 
+ setTopicId(FOO_ID). + setPartitionId(0). + setIsr(Arrays.asList(2, 1)). + setLeader(1), PARTITION_CHANGE_RECORD.highestSupportedVersion())), + createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1)).build()); + } + + @Test + public void testIsrChangeAndLeaderChange() { + assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord(). + setTopicId(FOO_ID). + setPartitionId(0). + setIsr(Arrays.asList(2, 3)). + setLeader(2), PARTITION_CHANGE_RECORD.highestSupportedVersion())), + createFooBuilder(false).setTargetIsr(Arrays.asList(2, 3)).build()); + } + + @Test + public void testReassignmentRearrangesReplicas() { + assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord(). + setTopicId(FOO_ID). + setPartitionId(0). + setReplicas(Arrays.asList(3, 2, 1)), + PARTITION_CHANGE_RECORD.highestSupportedVersion())), + createFooBuilder(false).setTargetReplicas(Arrays.asList(3, 2, 1)).build()); + } + + @Test + public void testIsrEnlargementCompletesReassignment() { + assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord(). + setTopicId(BAR_ID). + setPartitionId(0). + setReplicas(Arrays.asList(2, 3, 4)). + setIsr(Arrays.asList(2, 3, 4)). + setLeader(2). + setRemovingReplicas(Collections.emptyList()). + setAddingReplicas(Collections.emptyList()), + PARTITION_CHANGE_RECORD.highestSupportedVersion())), + createBarBuilder(false).setTargetIsr(Arrays.asList(1, 2, 3, 4)).build()); + } + + @Test + public void testRevertReassignment() { + PartitionReassignmentRevert revert = new PartitionReassignmentRevert(BAR); + assertEquals(Arrays.asList(1, 2, 3), revert.replicas()); + assertEquals(Arrays.asList(1, 2, 3), revert.isr()); + assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord(). + setTopicId(BAR_ID). + setPartitionId(0). + setReplicas(Arrays.asList(1, 2, 3)). + setLeader(1). + setRemovingReplicas(Collections.emptyList()). 
+ setAddingReplicas(Collections.emptyList()), + PARTITION_CHANGE_RECORD.highestSupportedVersion())), + createBarBuilder(false). + setTargetReplicas(revert.replicas()). + setTargetIsr(revert.isr()). + setTargetRemoving(Collections.emptyList()). + setTargetAdding(Collections.emptyList()). + build()); + } + + @Test + public void testQuickReassignment() { + PartitionReassignmentReplicas replicas = new PartitionReassignmentReplicas( + Replicas.toList(FOO.replicas), Arrays.asList(1, 2)); + assertEquals(Collections.singletonList(3), replicas.removing()); + assertEquals(Collections.emptyList(), replicas.adding()); + assertEquals(Arrays.asList(1, 2, 3), replicas.merged()); + assertEquals(Optional.of(new ApiMessageAndVersion(new PartitionChangeRecord(). + setTopicId(FOO_ID). + setPartitionId(0). + setReplicas(Arrays.asList(1, 2)). Review comment: In the removal case, the replicas should include the removed replica, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org