junrao commented on a change in pull request #10753: URL: https://github.com/apache/kafka/pull/10753#discussion_r669155613
########## File path: metadata/src/test/java/org/apache/kafka/metadata/ReplicasTest.java ########## @@ -88,9 +90,34 @@ public void testCopyWithout() { assertArrayEquals(new int[] {4, 1}, Replicas.copyWithout(new int[] {4, 2, 2, 1}, 2)); } + @Test + public void testCopyWithout2() { + assertArrayEquals(new int[] {}, Replicas.copyWithout(new int[] {}, new int[] {})); + assertArrayEquals(new int[] {}, Replicas.copyWithout(new int[] {1}, new int[] {1})); + assertArrayEquals(new int[] {1, 3}, + Replicas.copyWithout(new int[] {1, 2, 3}, new int[]{2, 4})); + assertArrayEquals(new int[] {4}, + Replicas.copyWithout(new int[] {4, 2, 2, 1}, new int[]{2, 1})); + } + @Test public void testCopyWith() { assertArrayEquals(new int[] {-1}, Replicas.copyWith(new int[] {}, -1)); assertArrayEquals(new int[] {1, 2, 3, 4}, Replicas.copyWith(new int[] {1, 2, 3}, 4)); } + + @Test + public void testToSet() { + assertEquals(Collections.emptySet(), Replicas.toSet(new int[] {})); + assertEquals(new HashSet<>(Arrays.asList(3, 1, 5)), Review comment: Could we add a case to testToSet where the input array passed to Replicas.toSet contains duplicate values?
########## File path: metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java ########## @@ -763,17 +747,227 @@ public void testValidateBadManualPartitionAssignments() throws Exception { OptionalInt.of(3))).getMessage()); } + private final static ListPartitionReassignmentsResponseData NONE_REASSIGNING = + new ListPartitionReassignmentsResponseData().setErrorMessage(null); + @Test - public void testBestLeader() { - assertEquals(2, ReplicationControlManager.bestLeader( - new int[]{1, 2, 3, 4}, new int[]{4, 2, 3}, false, __ -> true)); - assertEquals(3, ReplicationControlManager.bestLeader( - new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, __ -> true)); - assertEquals(4, ReplicationControlManager.bestLeader( - new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, r -> r == 4)); - assertEquals(-1, ReplicationControlManager.bestLeader( - new int[]{3, 4, 5}, new int[]{1, 2}, false, r -> r == 4)); - assertEquals(4, ReplicationControlManager.bestLeader( - new int[]{3, 4, 5}, new int[]{1, 2}, true, r -> r == 4)); + public void testReassignPartitions() throws Exception { + ReplicationControlTestContext ctx = new ReplicationControlTestContext(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2, 3); + ctx.unfenceBrokers(0, 1, 2, 3); + Uuid fooId = ctx.createTestTopic("foo", new int[][] { + new int[] {1, 2, 3}, new int[] {3, 2, 1}}).topicId(); + ctx.createTestTopic("bar", new int[][] { + new int[] {1, 2, 3}}).topicId(); + assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null)); + ControllerResult<AlterPartitionReassignmentsResponseData> alterResult = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList( + new ReassignableTopic().setName("foo").setPartitions(Arrays.asList( + new ReassignablePartition().setPartitionIndex(0). + setReplicas(Arrays.asList(3, 2, 1)), + new ReassignablePartition().setPartitionIndex(1). 
+ setReplicas(Arrays.asList(0, 2, 1)), + new ReassignablePartition().setPartitionIndex(2). + setReplicas(Arrays.asList(0, 2, 1)))), + new ReassignableTopic().setName("bar")))); + assertEquals(new AlterPartitionReassignmentsResponseData(). + setErrorMessage(null).setResponses(Arrays.asList( + new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList( + new ReassignablePartitionResponse().setPartitionIndex(0). + setErrorMessage(null), + new ReassignablePartitionResponse().setPartitionIndex(1). + setErrorMessage(null), + new ReassignablePartitionResponse().setPartitionIndex(2). + setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()). + setErrorMessage("Unable to find partition foo:2."))), + new ReassignableTopicResponse(). + setName("bar"))), + alterResult.response()); + ctx.replay(alterResult.records()); + ListPartitionReassignmentsResponseData currentReassigning = + new ListPartitionReassignmentsResponseData().setErrorMessage(null). + setTopics(Arrays.asList(new OngoingTopicReassignment(). + setName("foo").setPartitions(Arrays.asList( + new OngoingPartitionReassignment().setPartitionIndex(1). + setRemovingReplicas(Arrays.asList(3)). + setAddingReplicas(Arrays.asList(0)). + setReplicas(Arrays.asList(0, 2, 1, 3)))))); + assertEquals(currentReassigning, replication.listPartitionReassignments(null)); + assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(Arrays.asList( + new ListPartitionReassignmentsTopics().setName("bar"). + setPartitionIndexes(Arrays.asList(0, 1, 2))))); + assertEquals(currentReassigning, replication.listPartitionReassignments(Arrays.asList( + new ListPartitionReassignmentsTopics().setName("foo"). 
+ setPartitionIndexes(Arrays.asList(0, 1, 2))))); + ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList( + new ReassignableTopic().setName("foo").setPartitions(Arrays.asList( + new ReassignablePartition().setPartitionIndex(0). + setReplicas(null), + new ReassignablePartition().setPartitionIndex(1). + setReplicas(null), + new ReassignablePartition().setPartitionIndex(2). + setReplicas(null))), + new ReassignableTopic().setName("bar").setPartitions(Arrays.asList( + new ReassignablePartition().setPartitionIndex(0). + setReplicas(null)))))); + assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion( + new PartitionChangeRecord().setTopicId(fooId). + setPartitionId(1). + setReplicas(Arrays.asList(2, 1, 3)). + setLeader(3). + setRemovingReplicas(Collections.emptyList()). + setAddingReplicas(Collections.emptyList()), (short) 0)), + new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList( + new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList( + new ReassignablePartitionResponse().setPartitionIndex(0). + setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null), + new ReassignablePartitionResponse().setPartitionIndex(1). + setErrorCode(NONE.code()).setErrorMessage(null), + new ReassignablePartitionResponse().setPartitionIndex(2). + setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()). + setErrorMessage("Unable to find partition foo:2."))), + new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList( + new ReassignablePartitionResponse().setPartitionIndex(0). + setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()). 
+ setErrorMessage(null)))))), + cancelResult); + System.out.println("running final alterIsr..."); + ControllerResult<AlterIsrResponseData> alterIsrResult = replication.alterIsr( + new AlterIsrRequestData().setBrokerId(3).setBrokerEpoch(103). + setTopics(Arrays.asList(new TopicData().setName("foo").setPartitions(Arrays.asList( + new PartitionData().setPartitionIndex(1).setCurrentIsrVersion(1). + setLeaderEpoch(1).setNewIsr(Arrays.asList(3, 0, 2, 1))))))); + assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList( + new AlterIsrResponseData.TopicData().setName("foo").setPartitions(Arrays.asList( + new AlterIsrResponseData.PartitionData(). + setPartitionIndex(1). + setErrorCode(FENCED_LEADER_EPOCH.code()))))), + alterIsrResult.response()); + System.out.println("applying final alterIsr..."); + ctx.replay(alterIsrResult.records()); + System.out.println("performing final check..."); + assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null)); + } + + @Test + public void testCancelReassignPartitions() throws Exception { + ReplicationControlTestContext ctx = new ReplicationControlTestContext(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2, 3, 4); + ctx.unfenceBrokers(0, 1, 2, 3, 4); + Uuid fooId = ctx.createTestTopic("foo", new int[][] { + new int[] {1, 2, 3, 4}, new int[] {0, 1, 2, 3}, new int[] {4, 3, 1, 0}, + new int[] {2, 3, 4, 1}}).topicId(); + Uuid barId = ctx.createTestTopic("bar", new int[][] { + new int[] {4, 3, 2}}).topicId(); + assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null)); + List<ApiMessageAndVersion> fenceRecords = new ArrayList<>(); + replication.handleBrokerFenced(3, fenceRecords); + ctx.replay(fenceRecords); + assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 4}, new int[] {1, 2, 4}, + new int[] {}, new int[] {}, 1, 1, 1), replication.getPartition(fooId, 0)); + ControllerResult<AlterPartitionReassignmentsResponseData> alterResult = + 
replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList( + new ReassignableTopic().setName("foo").setPartitions(Arrays.asList( + new ReassignablePartition().setPartitionIndex(0). + setReplicas(Arrays.asList(1, 2, 3)), + new ReassignablePartition().setPartitionIndex(1). + setReplicas(Arrays.asList(1, 2, 3, 0)), + new ReassignablePartition().setPartitionIndex(2). + setReplicas(Arrays.asList(5, 6, 7)), + new ReassignablePartition().setPartitionIndex(3). + setReplicas(Arrays.asList()))), + new ReassignableTopic().setName("bar").setPartitions(Arrays.asList( + new ReassignablePartition().setPartitionIndex(0). + setReplicas(Arrays.asList(1, 2, 3, 4, 0))))))); + assertEquals(new AlterPartitionReassignmentsResponseData(). + setErrorMessage(null).setResponses(Arrays.asList( + new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList( + new ReassignablePartitionResponse().setPartitionIndex(0). + setErrorMessage(null), + new ReassignablePartitionResponse().setPartitionIndex(1). + setErrorMessage(null), + new ReassignablePartitionResponse().setPartitionIndex(2). + setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()). + setErrorMessage("The manual partition assignment includes broker 5, " + + "but no such broker is registered."), + new ReassignablePartitionResponse().setPartitionIndex(3). + setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()). + setErrorMessage("The manual partition assignment includes an empty " + + "replica list."))), + new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList( + new ReassignablePartitionResponse().setPartitionIndex(0). 
+ setErrorMessage(null))))), + alterResult.response()); + ctx.replay(alterResult.records()); + assertEquals(new PartitionRegistration(new int[] {1, 2, 3}, new int[] {1, 2}, + new int[] {}, new int[] {}, 1, 2, 2), replication.getPartition(fooId, 0)); + assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 0}, new int[] {0, 1, 2}, + new int[] {}, new int[] {}, 0, 1, 2), replication.getPartition(fooId, 1)); + assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 4, 0}, new int[] {4, 2}, + new int[] {}, new int[] {0, 1}, 4, 2, 2), replication.getPartition(barId, 0)); + ListPartitionReassignmentsResponseData currentReassigning = + new ListPartitionReassignmentsResponseData().setErrorMessage(null). + setTopics(Arrays.asList(new OngoingTopicReassignment(). + setName("bar").setPartitions(Arrays.asList( + new OngoingPartitionReassignment().setPartitionIndex(0). + setRemovingReplicas(Collections.emptyList()). + setAddingReplicas(Arrays.asList(0, 1)). + setReplicas(Arrays.asList(1, 2, 3, 4, 0)))))); + assertEquals(currentReassigning, replication.listPartitionReassignments(null)); + assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(Arrays.asList( + new ListPartitionReassignmentsTopics().setName("foo"). + setPartitionIndexes(Arrays.asList(0, 1, 2))))); + assertEquals(currentReassigning, replication.listPartitionReassignments(Arrays.asList( + new ListPartitionReassignmentsTopics().setName("bar"). + setPartitionIndexes(Arrays.asList(0, 1, 2))))); + ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList( + new ReassignableTopic().setName("foo").setPartitions(Arrays.asList( + new ReassignablePartition().setPartitionIndex(0). + setReplicas(null))), + new ReassignableTopic().setName("bar").setPartitions(Arrays.asList( + new ReassignablePartition().setPartitionIndex(0). 
+ setReplicas(null)))))); + assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion( + new PartitionChangeRecord().setTopicId(barId). + setPartitionId(0). + setLeader(4). + setReplicas(Arrays.asList(2, 3, 4)). + setRemovingReplicas(null). + setAddingReplicas(Collections.emptyList()), (short) 0)), + new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList( + new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList( + new ReassignablePartitionResponse().setPartitionIndex(0). + setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null))), + new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList( + new ReassignablePartitionResponse().setPartitionIndex(0). + setErrorMessage(null)))))), + cancelResult); + ControllerResult<AlterIsrResponseData> alterIsrResult = replication.alterIsr( Review comment: Is the alterIsr meant to be tested before cancellation? If so, probably move the code up to make it clear. ########## File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala ########## @@ -422,4 +351,75 @@ class RaftClusterTest { listenerName = listenerName ) + @Test + def testCreateClusterAndPerformReassignment(): Unit = { + val cluster = new KafkaClusterTestKit.Builder( + new TestKitNodes.Builder(). + setNumBrokerNodes(4). + setNumControllerNodes(3).build()).build() + try { + cluster.format() + cluster.startup() + cluster.waitForReadyBrokers() + val admin = Admin.create(cluster.clientProperties()) + try { + // Create the topic. 
+ val assignments = new util.HashMap[Integer, util.List[Integer]] + assignments.put(0, Arrays.asList(0, 1, 2)) + assignments.put(1, Arrays.asList(1, 2, 3)) + assignments.put(2, Arrays.asList(2, 3, 0)) + val createTopicResult = admin.createTopics(Collections.singletonList( + new NewTopic("foo", assignments))) + createTopicResult.all().get() + waitForTopicListing(admin, Seq("foo"), Seq()) + + // Start some reassignments. + assertEquals(Collections.emptyMap(), admin.listPartitionReassignments().reassignments().get()) + val reassignments = new util.HashMap[TopicPartition, Optional[NewPartitionReassignment]] + reassignments.put(new TopicPartition("foo", 0), + Optional.of(new NewPartitionReassignment(Arrays.asList(2, 1, 0)))) + reassignments.put(new TopicPartition("foo", 1), + Optional.of(new NewPartitionReassignment(Arrays.asList(0, 1, 2)))) Review comment: Could we do an expansion test? ########## File path: metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java ########## @@ -763,17 +747,227 @@ public void testValidateBadManualPartitionAssignments() throws Exception { OptionalInt.of(3))).getMessage()); } + private final static ListPartitionReassignmentsResponseData NONE_REASSIGNING = + new ListPartitionReassignmentsResponseData().setErrorMessage(null); + @Test - public void testBestLeader() { - assertEquals(2, ReplicationControlManager.bestLeader( - new int[]{1, 2, 3, 4}, new int[]{4, 2, 3}, false, __ -> true)); - assertEquals(3, ReplicationControlManager.bestLeader( - new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, __ -> true)); - assertEquals(4, ReplicationControlManager.bestLeader( - new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, r -> r == 4)); - assertEquals(-1, ReplicationControlManager.bestLeader( - new int[]{3, 4, 5}, new int[]{1, 2}, false, r -> r == 4)); - assertEquals(4, ReplicationControlManager.bestLeader( - new int[]{3, 4, 5}, new int[]{1, 2}, true, r -> r == 4)); + public void testReassignPartitions() throws Exception 
{ + ReplicationControlTestContext ctx = new ReplicationControlTestContext(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2, 3); + ctx.unfenceBrokers(0, 1, 2, 3); + Uuid fooId = ctx.createTestTopic("foo", new int[][] { + new int[] {1, 2, 3}, new int[] {3, 2, 1}}).topicId(); + ctx.createTestTopic("bar", new int[][] { + new int[] {1, 2, 3}}).topicId(); + assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null)); + ControllerResult<AlterPartitionReassignmentsResponseData> alterResult = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList( + new ReassignableTopic().setName("foo").setPartitions(Arrays.asList( + new ReassignablePartition().setPartitionIndex(0). + setReplicas(Arrays.asList(3, 2, 1)), + new ReassignablePartition().setPartitionIndex(1). + setReplicas(Arrays.asList(0, 2, 1)), + new ReassignablePartition().setPartitionIndex(2). + setReplicas(Arrays.asList(0, 2, 1)))), + new ReassignableTopic().setName("bar")))); + assertEquals(new AlterPartitionReassignmentsResponseData(). + setErrorMessage(null).setResponses(Arrays.asList( + new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList( + new ReassignablePartitionResponse().setPartitionIndex(0). + setErrorMessage(null), + new ReassignablePartitionResponse().setPartitionIndex(1). + setErrorMessage(null), + new ReassignablePartitionResponse().setPartitionIndex(2). + setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()). + setErrorMessage("Unable to find partition foo:2."))), + new ReassignableTopicResponse(). + setName("bar"))), + alterResult.response()); + ctx.replay(alterResult.records()); + ListPartitionReassignmentsResponseData currentReassigning = + new ListPartitionReassignmentsResponseData().setErrorMessage(null). + setTopics(Arrays.asList(new OngoingTopicReassignment(). 
+ setName("foo").setPartitions(Arrays.asList( + new OngoingPartitionReassignment().setPartitionIndex(1). + setRemovingReplicas(Arrays.asList(3)). + setAddingReplicas(Arrays.asList(0)). + setReplicas(Arrays.asList(0, 2, 1, 3)))))); + assertEquals(currentReassigning, replication.listPartitionReassignments(null)); + assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(Arrays.asList( + new ListPartitionReassignmentsTopics().setName("bar"). + setPartitionIndexes(Arrays.asList(0, 1, 2))))); + assertEquals(currentReassigning, replication.listPartitionReassignments(Arrays.asList( + new ListPartitionReassignmentsTopics().setName("foo"). + setPartitionIndexes(Arrays.asList(0, 1, 2))))); + ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList( + new ReassignableTopic().setName("foo").setPartitions(Arrays.asList( + new ReassignablePartition().setPartitionIndex(0). + setReplicas(null), + new ReassignablePartition().setPartitionIndex(1). + setReplicas(null), + new ReassignablePartition().setPartitionIndex(2). + setReplicas(null))), + new ReassignableTopic().setName("bar").setPartitions(Arrays.asList( + new ReassignablePartition().setPartitionIndex(0). + setReplicas(null)))))); + assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion( + new PartitionChangeRecord().setTopicId(fooId). + setPartitionId(1). + setReplicas(Arrays.asList(2, 1, 3)). + setLeader(3). + setRemovingReplicas(Collections.emptyList()). + setAddingReplicas(Collections.emptyList()), (short) 0)), + new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList( + new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList( + new ReassignablePartitionResponse().setPartitionIndex(0). 
+ setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null), + new ReassignablePartitionResponse().setPartitionIndex(1). + setErrorCode(NONE.code()).setErrorMessage(null), + new ReassignablePartitionResponse().setPartitionIndex(2). + setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()). + setErrorMessage("Unable to find partition foo:2."))), + new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList( + new ReassignablePartitionResponse().setPartitionIndex(0). + setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()). + setErrorMessage(null)))))), + cancelResult); + System.out.println("running final alterIsr..."); Review comment: Should we use a logger here instead of printing to System.out? The same applies to the two System.out.println calls further down in this test ("applying final alterIsr..." and "performing final check..."). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org