junrao commented on a change in pull request #10753:
URL: https://github.com/apache/kafka/pull/10753#discussion_r669155613



##########
File path: metadata/src/test/java/org/apache/kafka/metadata/ReplicasTest.java
##########
@@ -88,9 +90,34 @@ public void testCopyWithout() {
         assertArrayEquals(new int[] {4, 1}, Replicas.copyWithout(new int[] {4, 
2, 2, 1}, 2));
     }
 
+    @Test
+    public void testCopyWithout2() {
+        assertArrayEquals(new int[] {}, Replicas.copyWithout(new int[] {}, new 
int[] {}));
+        assertArrayEquals(new int[] {}, Replicas.copyWithout(new int[] {1}, 
new int[] {1}));
+        assertArrayEquals(new int[] {1, 3},
+            Replicas.copyWithout(new int[] {1, 2, 3}, new int[]{2, 4}));
+        assertArrayEquals(new int[] {4},
+            Replicas.copyWithout(new int[] {4, 2, 2, 1}, new int[]{2, 1}));
+    }
+
     @Test
     public void testCopyWith() {
         assertArrayEquals(new int[] {-1}, Replicas.copyWith(new int[] {}, -1));
         assertArrayEquals(new int[] {1, 2, 3, 4}, Replicas.copyWith(new int[] 
{1, 2, 3}, 4));
     }
+
+    @Test
+    public void testToSet() {
+        assertEquals(Collections.emptySet(), Replicas.toSet(new int[] {}));
+        assertEquals(new HashSet<>(Arrays.asList(3, 1, 5)),

Review comment:
       Could we add a test where the input has duplicates?

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
##########
@@ -763,17 +747,227 @@ public void testValidateBadManualPartitionAssignments() 
throws Exception {
                         OptionalInt.of(3))).getMessage());
     }
 
+    private final static ListPartitionReassignmentsResponseData 
NONE_REASSIGNING =
+        new ListPartitionReassignmentsResponseData().setErrorMessage(null);
+
     @Test
-    public void testBestLeader() {
-        assertEquals(2, ReplicationControlManager.bestLeader(
-            new int[]{1, 2, 3, 4}, new int[]{4, 2, 3}, false, __ -> true));
-        assertEquals(3, ReplicationControlManager.bestLeader(
-            new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, __ -> true));
-        assertEquals(4, ReplicationControlManager.bestLeader(
-            new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, r -> r == 4));
-        assertEquals(-1, ReplicationControlManager.bestLeader(
-            new int[]{3, 4, 5}, new int[]{1, 2}, false, r -> r == 4));
-        assertEquals(4, ReplicationControlManager.bestLeader(
-            new int[]{3, 4, 5}, new int[]{1, 2}, true, r -> r == 4));
+    public void testReassignPartitions() throws Exception {
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3);
+        ctx.unfenceBrokers(0, 1, 2, 3);
+        Uuid fooId = ctx.createTestTopic("foo", new int[][] {
+            new int[] {1, 2, 3}, new int[] {3, 2, 1}}).topicId();
+        ctx.createTestTopic("bar", new int[][] {
+            new int[] {1, 2, 3}}).topicId();
+        assertEquals(NONE_REASSIGNING, 
replication.listPartitionReassignments(null));
+        ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
+            replication.alterPartitionReassignments(
+                new 
AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
+                    new 
ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+                        new ReassignablePartition().setPartitionIndex(0).
+                            setReplicas(Arrays.asList(3, 2, 1)),
+                        new ReassignablePartition().setPartitionIndex(1).
+                            setReplicas(Arrays.asList(0, 2, 1)),
+                        new ReassignablePartition().setPartitionIndex(2).
+                            setReplicas(Arrays.asList(0, 2, 1)))),
+                new ReassignableTopic().setName("bar"))));
+        assertEquals(new AlterPartitionReassignmentsResponseData().
+                setErrorMessage(null).setResponses(Arrays.asList(
+                    new 
ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+                        new 
ReassignablePartitionResponse().setPartitionIndex(0).
+                            setErrorMessage(null),
+                        new 
ReassignablePartitionResponse().setPartitionIndex(1).
+                            setErrorMessage(null),
+                        new 
ReassignablePartitionResponse().setPartitionIndex(2).
+                            setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
+                            setErrorMessage("Unable to find partition 
foo:2."))),
+                    new ReassignableTopicResponse().
+                        setName("bar"))),
+            alterResult.response());
+        ctx.replay(alterResult.records());
+        ListPartitionReassignmentsResponseData currentReassigning =
+            new ListPartitionReassignmentsResponseData().setErrorMessage(null).
+                setTopics(Arrays.asList(new OngoingTopicReassignment().
+                    setName("foo").setPartitions(Arrays.asList(
+                    new OngoingPartitionReassignment().setPartitionIndex(1).
+                        setRemovingReplicas(Arrays.asList(3)).
+                        setAddingReplicas(Arrays.asList(0)).
+                        setReplicas(Arrays.asList(0, 2, 1, 3))))));
+        assertEquals(currentReassigning, 
replication.listPartitionReassignments(null));
+        assertEquals(NONE_REASSIGNING, 
replication.listPartitionReassignments(Arrays.asList(
+                new ListPartitionReassignmentsTopics().setName("bar").
+                    setPartitionIndexes(Arrays.asList(0, 1, 2)))));
+        assertEquals(currentReassigning, 
replication.listPartitionReassignments(Arrays.asList(
+            new ListPartitionReassignmentsTopics().setName("foo").
+                setPartitionIndexes(Arrays.asList(0, 1, 2)))));
+        ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult 
=
+            replication.alterPartitionReassignments(
+                new 
AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
+                    new 
ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+                        new ReassignablePartition().setPartitionIndex(0).
+                            setReplicas(null),
+                        new ReassignablePartition().setPartitionIndex(1).
+                            setReplicas(null),
+                        new ReassignablePartition().setPartitionIndex(2).
+                            setReplicas(null))),
+                    new 
ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
+                        new ReassignablePartition().setPartitionIndex(0).
+                            setReplicas(null))))));
+        assertEquals(ControllerResult.atomicOf(Collections.singletonList(new 
ApiMessageAndVersion(
+            new PartitionChangeRecord().setTopicId(fooId).
+                setPartitionId(1).
+                setReplicas(Arrays.asList(2, 1, 3)).
+                setLeader(3).
+                setRemovingReplicas(Collections.emptyList()).
+                setAddingReplicas(Collections.emptyList()), (short) 0)),
+            new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(
+                new 
ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+                    new ReassignablePartitionResponse().setPartitionIndex(0).
+                        
setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null),
+                    new ReassignablePartitionResponse().setPartitionIndex(1).
+                        setErrorCode(NONE.code()).setErrorMessage(null),
+                    new ReassignablePartitionResponse().setPartitionIndex(2).
+                        setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
+                        setErrorMessage("Unable to find partition foo:2."))),
+                new 
ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
+                    new ReassignablePartitionResponse().setPartitionIndex(0).
+                        setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).
+                        setErrorMessage(null)))))),
+            cancelResult);
+        System.out.println("running final alterIsr...");
+        ControllerResult<AlterIsrResponseData> alterIsrResult = 
replication.alterIsr(
+            new AlterIsrRequestData().setBrokerId(3).setBrokerEpoch(103).
+                setTopics(Arrays.asList(new 
TopicData().setName("foo").setPartitions(Arrays.asList(
+                    new 
PartitionData().setPartitionIndex(1).setCurrentIsrVersion(1).
+                        setLeaderEpoch(1).setNewIsr(Arrays.asList(3, 0, 2, 
1)))))));
+        assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList(
+            new 
AlterIsrResponseData.TopicData().setName("foo").setPartitions(Arrays.asList(
+                new AlterIsrResponseData.PartitionData().
+                    setPartitionIndex(1).
+                    setErrorCode(FENCED_LEADER_EPOCH.code()))))),
+            alterIsrResult.response());
+        System.out.println("applying final alterIsr...");
+        ctx.replay(alterIsrResult.records());
+        System.out.println("performing final check...");
+        assertEquals(NONE_REASSIGNING, 
replication.listPartitionReassignments(null));
+    }
+
+    @Test
+    public void testCancelReassignPartitions() throws Exception {
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(0, 1, 2, 3, 4);
+        Uuid fooId = ctx.createTestTopic("foo", new int[][] {
+            new int[] {1, 2, 3, 4}, new int[] {0, 1, 2, 3}, new int[] {4, 3, 
1, 0},
+            new int[] {2, 3, 4, 1}}).topicId();
+        Uuid barId = ctx.createTestTopic("bar", new int[][] {
+            new int[] {4, 3, 2}}).topicId();
+        assertEquals(NONE_REASSIGNING, 
replication.listPartitionReassignments(null));
+        List<ApiMessageAndVersion> fenceRecords = new ArrayList<>();
+        replication.handleBrokerFenced(3, fenceRecords);
+        ctx.replay(fenceRecords);
+        assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 4}, new 
int[] {1, 2, 4},
+            new int[] {}, new int[] {}, 1, 1, 1), 
replication.getPartition(fooId, 0));
+        ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
+            replication.alterPartitionReassignments(
+                new 
AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
+                    new 
ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+                        new ReassignablePartition().setPartitionIndex(0).
+                            setReplicas(Arrays.asList(1, 2, 3)),
+                        new ReassignablePartition().setPartitionIndex(1).
+                            setReplicas(Arrays.asList(1, 2, 3, 0)),
+                        new ReassignablePartition().setPartitionIndex(2).
+                            setReplicas(Arrays.asList(5, 6, 7)),
+                        new ReassignablePartition().setPartitionIndex(3).
+                            setReplicas(Arrays.asList()))),
+                new 
ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
+                        new ReassignablePartition().setPartitionIndex(0).
+                            setReplicas(Arrays.asList(1, 2, 3, 4, 0)))))));
+        assertEquals(new AlterPartitionReassignmentsResponseData().
+                setErrorMessage(null).setResponses(Arrays.asList(
+            new 
ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+                new ReassignablePartitionResponse().setPartitionIndex(0).
+                    setErrorMessage(null),
+                new ReassignablePartitionResponse().setPartitionIndex(1).
+                    setErrorMessage(null),
+                new ReassignablePartitionResponse().setPartitionIndex(2).
+                    setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
+                    setErrorMessage("The manual partition assignment includes 
broker 5, " +
+                        "but no such broker is registered."),
+                new ReassignablePartitionResponse().setPartitionIndex(3).
+                    setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
+                    setErrorMessage("The manual partition assignment includes 
an empty " +
+                        "replica list."))),
+            new 
ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
+                new ReassignablePartitionResponse().setPartitionIndex(0).
+                    setErrorMessage(null))))),
+            alterResult.response());
+        ctx.replay(alterResult.records());
+        assertEquals(new PartitionRegistration(new int[] {1, 2, 3}, new int[] 
{1, 2},
+            new int[] {}, new int[] {}, 1, 2, 2), 
replication.getPartition(fooId, 0));
+        assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 0}, new 
int[] {0, 1, 2},
+            new int[] {}, new int[] {}, 0, 1, 2), 
replication.getPartition(fooId, 1));
+        assertEquals(new PartitionRegistration(new int[] {1, 2, 3, 4, 0}, new 
int[] {4, 2},
+            new int[] {}, new int[] {0, 1}, 4, 2, 2), 
replication.getPartition(barId, 0));
+        ListPartitionReassignmentsResponseData currentReassigning =
+            new ListPartitionReassignmentsResponseData().setErrorMessage(null).
+                setTopics(Arrays.asList(new OngoingTopicReassignment().
+                    setName("bar").setPartitions(Arrays.asList(
+                    new OngoingPartitionReassignment().setPartitionIndex(0).
+                        setRemovingReplicas(Collections.emptyList()).
+                        setAddingReplicas(Arrays.asList(0, 1)).
+                        setReplicas(Arrays.asList(1, 2, 3, 4, 0))))));
+        assertEquals(currentReassigning, 
replication.listPartitionReassignments(null));
+        assertEquals(NONE_REASSIGNING, 
replication.listPartitionReassignments(Arrays.asList(
+            new ListPartitionReassignmentsTopics().setName("foo").
+                setPartitionIndexes(Arrays.asList(0, 1, 2)))));
+        assertEquals(currentReassigning, 
replication.listPartitionReassignments(Arrays.asList(
+            new ListPartitionReassignmentsTopics().setName("bar").
+                setPartitionIndexes(Arrays.asList(0, 1, 2)))));
+        ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult 
=
+            replication.alterPartitionReassignments(
+                new 
AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
+                    new 
ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+                        new ReassignablePartition().setPartitionIndex(0).
+                            setReplicas(null))),
+                    new 
ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
+                        new ReassignablePartition().setPartitionIndex(0).
+                            setReplicas(null))))));
+        assertEquals(ControllerResult.atomicOf(Collections.singletonList(new 
ApiMessageAndVersion(
+                new PartitionChangeRecord().setTopicId(barId).
+                    setPartitionId(0).
+                    setLeader(4).
+                    setReplicas(Arrays.asList(2, 3, 4)).
+                    setRemovingReplicas(null).
+                    setAddingReplicas(Collections.emptyList()), (short) 0)),
+            new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(
+                new 
ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+                    new ReassignablePartitionResponse().setPartitionIndex(0).
+                        
setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null))),
+                new 
ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
+                    new ReassignablePartitionResponse().setPartitionIndex(0).
+                        setErrorMessage(null)))))),
+            cancelResult);
+        ControllerResult<AlterIsrResponseData> alterIsrResult = 
replication.alterIsr(

Review comment:
       Is the alterIsr call meant to be tested before the cancellation? If so,
we should probably move that code up to make this clear.

##########
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##########
@@ -422,4 +351,75 @@ class RaftClusterTest {
       listenerName = listenerName
     )
 
+  @Test
+  def testCreateClusterAndPerformReassignment(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(4).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        // Create the topic.
+        val assignments = new util.HashMap[Integer, util.List[Integer]]
+        assignments.put(0, Arrays.asList(0, 1, 2))
+        assignments.put(1, Arrays.asList(1, 2, 3))
+        assignments.put(2, Arrays.asList(2, 3, 0))
+        val createTopicResult = admin.createTopics(Collections.singletonList(
+          new NewTopic("foo", assignments)))
+        createTopicResult.all().get()
+        waitForTopicListing(admin, Seq("foo"), Seq())
+
+        // Start some reassignments.
+        assertEquals(Collections.emptyMap(), 
admin.listPartitionReassignments().reassignments().get())
+        val reassignments = new util.HashMap[TopicPartition, 
Optional[NewPartitionReassignment]]
+        reassignments.put(new TopicPartition("foo", 0),
+          Optional.of(new NewPartitionReassignment(Arrays.asList(2, 1, 0))))
+        reassignments.put(new TopicPartition("foo", 1),
+          Optional.of(new NewPartitionReassignment(Arrays.asList(0, 1, 2))))

Review comment:
       Could we do an expansion test?

##########
File path: 
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
##########
@@ -763,17 +747,227 @@ public void testValidateBadManualPartitionAssignments() 
throws Exception {
                         OptionalInt.of(3))).getMessage());
     }
 
+    private final static ListPartitionReassignmentsResponseData 
NONE_REASSIGNING =
+        new ListPartitionReassignmentsResponseData().setErrorMessage(null);
+
     @Test
-    public void testBestLeader() {
-        assertEquals(2, ReplicationControlManager.bestLeader(
-            new int[]{1, 2, 3, 4}, new int[]{4, 2, 3}, false, __ -> true));
-        assertEquals(3, ReplicationControlManager.bestLeader(
-            new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, __ -> true));
-        assertEquals(4, ReplicationControlManager.bestLeader(
-            new int[]{3, 2, 1, 4}, new int[]{4, 2, 3}, false, r -> r == 4));
-        assertEquals(-1, ReplicationControlManager.bestLeader(
-            new int[]{3, 4, 5}, new int[]{1, 2}, false, r -> r == 4));
-        assertEquals(4, ReplicationControlManager.bestLeader(
-            new int[]{3, 4, 5}, new int[]{1, 2}, true, r -> r == 4));
+    public void testReassignPartitions() throws Exception {
+        ReplicationControlTestContext ctx = new 
ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3);
+        ctx.unfenceBrokers(0, 1, 2, 3);
+        Uuid fooId = ctx.createTestTopic("foo", new int[][] {
+            new int[] {1, 2, 3}, new int[] {3, 2, 1}}).topicId();
+        ctx.createTestTopic("bar", new int[][] {
+            new int[] {1, 2, 3}}).topicId();
+        assertEquals(NONE_REASSIGNING, 
replication.listPartitionReassignments(null));
+        ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
+            replication.alterPartitionReassignments(
+                new 
AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
+                    new 
ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+                        new ReassignablePartition().setPartitionIndex(0).
+                            setReplicas(Arrays.asList(3, 2, 1)),
+                        new ReassignablePartition().setPartitionIndex(1).
+                            setReplicas(Arrays.asList(0, 2, 1)),
+                        new ReassignablePartition().setPartitionIndex(2).
+                            setReplicas(Arrays.asList(0, 2, 1)))),
+                new ReassignableTopic().setName("bar"))));
+        assertEquals(new AlterPartitionReassignmentsResponseData().
+                setErrorMessage(null).setResponses(Arrays.asList(
+                    new 
ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+                        new 
ReassignablePartitionResponse().setPartitionIndex(0).
+                            setErrorMessage(null),
+                        new 
ReassignablePartitionResponse().setPartitionIndex(1).
+                            setErrorMessage(null),
+                        new 
ReassignablePartitionResponse().setPartitionIndex(2).
+                            setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
+                            setErrorMessage("Unable to find partition 
foo:2."))),
+                    new ReassignableTopicResponse().
+                        setName("bar"))),
+            alterResult.response());
+        ctx.replay(alterResult.records());
+        ListPartitionReassignmentsResponseData currentReassigning =
+            new ListPartitionReassignmentsResponseData().setErrorMessage(null).
+                setTopics(Arrays.asList(new OngoingTopicReassignment().
+                    setName("foo").setPartitions(Arrays.asList(
+                    new OngoingPartitionReassignment().setPartitionIndex(1).
+                        setRemovingReplicas(Arrays.asList(3)).
+                        setAddingReplicas(Arrays.asList(0)).
+                        setReplicas(Arrays.asList(0, 2, 1, 3))))));
+        assertEquals(currentReassigning, 
replication.listPartitionReassignments(null));
+        assertEquals(NONE_REASSIGNING, 
replication.listPartitionReassignments(Arrays.asList(
+                new ListPartitionReassignmentsTopics().setName("bar").
+                    setPartitionIndexes(Arrays.asList(0, 1, 2)))));
+        assertEquals(currentReassigning, 
replication.listPartitionReassignments(Arrays.asList(
+            new ListPartitionReassignmentsTopics().setName("foo").
+                setPartitionIndexes(Arrays.asList(0, 1, 2)))));
+        ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult 
=
+            replication.alterPartitionReassignments(
+                new 
AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
+                    new 
ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+                        new ReassignablePartition().setPartitionIndex(0).
+                            setReplicas(null),
+                        new ReassignablePartition().setPartitionIndex(1).
+                            setReplicas(null),
+                        new ReassignablePartition().setPartitionIndex(2).
+                            setReplicas(null))),
+                    new 
ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
+                        new ReassignablePartition().setPartitionIndex(0).
+                            setReplicas(null))))));
+        assertEquals(ControllerResult.atomicOf(Collections.singletonList(new 
ApiMessageAndVersion(
+            new PartitionChangeRecord().setTopicId(fooId).
+                setPartitionId(1).
+                setReplicas(Arrays.asList(2, 1, 3)).
+                setLeader(3).
+                setRemovingReplicas(Collections.emptyList()).
+                setAddingReplicas(Collections.emptyList()), (short) 0)),
+            new 
AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(
+                new 
ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+                    new ReassignablePartitionResponse().setPartitionIndex(0).
+                        
setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null),
+                    new ReassignablePartitionResponse().setPartitionIndex(1).
+                        setErrorCode(NONE.code()).setErrorMessage(null),
+                    new ReassignablePartitionResponse().setPartitionIndex(2).
+                        setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
+                        setErrorMessage("Unable to find partition foo:2."))),
+                new 
ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
+                    new ReassignablePartitionResponse().setPartitionIndex(0).
+                        setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).
+                        setErrorMessage(null)))))),
+            cancelResult);
+        System.out.println("running final alterIsr...");

Review comment:
       Hmm, should we use a logger instead of printing to System.out? Ditto for the println calls below.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to