C0urante commented on a change in pull request #10367:
URL: https://github.com/apache/kafka/pull/10367#discussion_r803045306



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -532,23 +531,23 @@ private void resetDelay() {
      * each existing worker. The revoked tasks, once assigned to the new workers will maintain
      * a balanced load among the group.
      *
-     * @param activeAssignments
-     * @param completeWorkerAssignment
-     * @return
+     * @param allConnectorsAndTasks                          all the connectors and tasks we need to distribute
+     * @param completeWorkerAssignmentWithoutDuplication     current workers assignment without duplication

Review comment:
       I think the existing variable name is fine; we can definitely update the Javadoc to clarify that this assignment should exclude duplicated and to-be-deleted C/T, but a name this long is a little hard to read.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
##########
@@ -180,6 +208,155 @@ public void testTaskAssignmentWhenWorkerJoins() {
         verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
+    @Test
+    public void testTaskAssignmentWhenWorkerJoinAfterRevocation() {
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        //
+        // note: the assigned/revoked Connectors/tasks might be different, but the amount should be the same
+        // assignment after this phase:
+        // W1: assignedConnectors:[C0, C1], assignedTasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3],
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0, C1], tasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3]
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(2, 8, 0, 0, "worker1");
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        //
+        // assignment after this phase:
+        // W1: assignedConnectors:[], assignedTasks:[],
+        //     revokedConnectors:[C1], revokedTasks:[T1-0, T1-1, T1-2, T1-3]
+        // W2: assignedConnectors:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3]
+        // W2: connectors:[], tasks:[]
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 1, 4, "worker1", "worker2");
+
+        // Third assignment after revocations, and a third worker joining
+        //
+        // assignment after this phase:
+        // W1: assignedTasks:[], assignedTasks:[],
+        //     revokedConnectors:[], revokedTasks:[T0-3]
+        // W2: assignedTasks:[C1], assignedTasks:[T1-0, T1-1]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W3: assignedTasks:[], assignedTasks:[T1-2, T1-3]
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2]
+        // W2: connectors:[C1], tasks:[T1-0, T1-1]
+        // W3: connectors:[], tasks:[T1-2, T1-3]
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(1, 4, 0, 1, "worker1", "worker2", "worker3");
+
+        // Forth assignment after revocations, and a forth worker joining
+        //
+        // assignment after this phase:
+        // W1: assignedTasks:[], assignedTasks:[],
+        //     revokedConnectors:[], revokedTasks:[T0-2]
+        // W2: assignedTasks:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W3: assignedTasks:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W4: assignedTasks:[], assignedTasks:[T0-3]
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0], tasks:[T0-0, T0-1]
+        // W2: connectors:[C1], tasks:[T1-0, T1-1]
+        // W3: connectors:[], tasks:[T1-2, T1-3]
+        // W4: connectors:[], tasks:[T0-3]
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        memberConfigs.put("worker4", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 1, 0, 1, "worker1", "worker2", "worker3", "worker4");
+
+        // Fifth assignment after revocations
+        //
+        // assignment after this phase:
+        // W1: assignedTasks:[], assignedTasks:[],
+        //     revokedConnectors:[], revokedTasks:[]
+        // W2: assignedTasks:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W3: assignedTasks:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W4: assignedTasks:[], assignedTasks:[T0-2]
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0], tasks:[T0-0, T0-1]
+        // W2: connectors:[C1], tasks:[T1-0, T1-1]
+        // W3: connectors:[], tasks:[T1-2, T1-3]
+        // W4: connectors:[], tasks:[T0-3, T0-2]
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 1, 0, 0, "worker1", "worker2", "worker3", "worker4");

Review comment:
       By this point, we should be completely balanced, but there's no explicit testing logic to verify that. What do you think about adding a utility method like `assertBalancedAssignments` and then invoking it here (and possibly in other places in this test suite)?
   
   ```java
       private void assertBalancedAssignments(Map<String, ExtendedWorkerState> existingAssignments) {
           List<Integer> connectorCounts = existingAssignments.values().stream()
                   .map(e -> e.assignment().connectors().size())
                   .sorted()
                   .collect(Collectors.toList());
           List<Integer> taskCounts = existingAssignments.values().stream()
                   .map(e -> e.assignment().tasks().size())
                   .sorted()
                   .collect(Collectors.toList());
   
           int minConnectors = connectorCounts.get(0);
           int maxConnectors = connectorCounts.get(connectorCounts.size() - 1);
   
           int minTasks = taskCounts.get(0);
           int maxTasks = taskCounts.get(taskCounts.size() - 1);
   
           assertTrue(
                   "Assignments are imbalanced. The spread of connectors across each worker is: " + connectorCounts,
                   maxConnectors - minConnectors <= 1
           );
           assertTrue(
                   "Assignments are imbalanced. The spread of tasks across each worker is: " + taskCounts,
                   maxTasks - minTasks <= 1
           );
       }
   ```
   
   It might also make these tests easier to write and modify (if we need to tweak rebalancing logic again in the future) if we used this type of method instead of the existing `assertAssignment` one, since in many cases all that really matters is that we achieve a balanced allocation after a specific series of rebalances, not exactly how many C/T were assigned/revoked in the interim.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
##########
@@ -180,6 +208,155 @@ public void testTaskAssignmentWhenWorkerJoins() {
         verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
+    @Test
+    public void testTaskAssignmentWhenWorkerJoinAfterRevocation() {
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        //
+        // note: the assigned/revoked Connectors/tasks might be different, but the amount should be the same
+        // assignment after this phase:
+        // W1: assignedConnectors:[C0, C1], assignedTasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3],
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0, C1], tasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3]
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(2, 8, 0, 0, "worker1");
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        //
+        // assignment after this phase:
+        // W1: assignedConnectors:[], assignedTasks:[],
+        //     revokedConnectors:[C1], revokedTasks:[T1-0, T1-1, T1-2, T1-3]
+        // W2: assignedConnectors:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3]
+        // W2: connectors:[], tasks:[]
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 1, 4, "worker1", "worker2");
+
+        // Third assignment after revocations, and a third worker joining
+        //
+        // assignment after this phase:
+        // W1: assignedTasks:[], assignedTasks:[],
+        //     revokedConnectors:[], revokedTasks:[T0-3]
+        // W2: assignedTasks:[C1], assignedTasks:[T1-0, T1-1]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W3: assignedTasks:[], assignedTasks:[T1-2, T1-3]
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2]
+        // W2: connectors:[C1], tasks:[T1-0, T1-1]
+        // W3: connectors:[], tasks:[T1-2, T1-3]
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(1, 4, 0, 1, "worker1", "worker2", "worker3");
+
+        // Forth assignment after revocations, and a forth worker joining
+        //
+        // assignment after this phase:
+        // W1: assignedTasks:[], assignedTasks:[],
+        //     revokedConnectors:[], revokedTasks:[T0-2]
+        // W2: assignedTasks:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W3: assignedTasks:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W4: assignedTasks:[], assignedTasks:[T0-3]
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0], tasks:[T0-0, T0-1]
+        // W2: connectors:[C1], tasks:[T1-0, T1-1]
+        // W3: connectors:[], tasks:[T1-2, T1-3]
+        // W4: connectors:[], tasks:[T0-3]
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        memberConfigs.put("worker4", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 1, 0, 1, "worker1", "worker2", "worker3", "worker4");
+
+        // Fifth assignment after revocations
+        //
+        // assignment after this phase:
+        // W1: assignedTasks:[], assignedTasks:[],
+        //     revokedConnectors:[], revokedTasks:[]
+        // W2: assignedTasks:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W3: assignedTasks:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W4: assignedTasks:[], assignedTasks:[T0-2]
+        //     revokedConnectors:[] revokedTasks:[]

Review comment:
       ```suggestion
           // W1: assignedConnectors:[], assignedTasks:[],
           //     revokedConnectors:[], revokedTasks:[]
           // W2: assignedConnectors:[], assignedTasks:[]
           //     revokedConnectors:[] revokedTasks:[]
           // W3: assignedConnectors:[], assignedTasks:[]
           //     revokedConnectors:[] revokedTasks:[]
           // W4: assignedConnectors:[], assignedTasks:[T0-2]
           //     revokedConnectors:[] revokedTasks:[]
   ```

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
##########
@@ -180,6 +208,155 @@ public void testTaskAssignmentWhenWorkerJoins() {
         verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
+    @Test
+    public void testTaskAssignmentWhenWorkerJoinAfterRevocation() {
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        //
+        // note: the assigned/revoked Connectors/tasks might be different, but the amount should be the same
+        // assignment after this phase:
+        // W1: assignedConnectors:[C0, C1], assignedTasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3],
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0, C1], tasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3]
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(2, 8, 0, 0, "worker1");
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        //
+        // assignment after this phase:
+        // W1: assignedConnectors:[], assignedTasks:[],
+        //     revokedConnectors:[C1], revokedTasks:[T1-0, T1-1, T1-2, T1-3]
+        // W2: assignedConnectors:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3]
+        // W2: connectors:[], tasks:[]
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 1, 4, "worker1", "worker2");
+
+        // Third assignment after revocations, and a third worker joining
+        //
+        // assignment after this phase:
+        // W1: assignedTasks:[], assignedTasks:[],
+        //     revokedConnectors:[], revokedTasks:[T0-3]
+        // W2: assignedTasks:[C1], assignedTasks:[T1-0, T1-1]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W3: assignedTasks:[], assignedTasks:[T1-2, T1-3]
+        //     revokedConnectors:[] revokedTasks:[]

Review comment:
       ```suggestion
           // W1: assignedConnectors:[], assignedTasks:[],
           //     revokedConnectors:[], revokedTasks:[T0-3]
           // W2: assignedConnectors:[C1], assignedTasks:[T1-0, T1-1]
           //     revokedConnectors:[] revokedTasks:[]
           // W3: assignedConnectors:[], assignedTasks:[T1-2, T1-3]
           //     revokedConnectors:[] revokedTasks:[]
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -258,17 +241,12 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
         handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment, memberConfigs);
 
         // Do not revoke resources for re-assignment while a delayed rebalance is active
-        // Also we do not revoke in two consecutive rebalances by the same leader
-        canRevoke = delay == 0 && canRevoke;
-
-        // Compute the connectors-and-tasks to be revoked for load balancing without taking into
-        // account the deleted ones.
-        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
-        if (canRevoke) {
+        log.debug("Can leader revoke tasks in this assignment? (delay: {})", delay);
+        if (delay == 0) {
+            // Compute the connectors-and-tasks to be revoked for load balancing without taking into
+            // account the deleted ones.
             Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
-                    performTaskRevocation(activeAssignments, currentWorkerAssignment);
-
-            log.debug("Connector and task to revoke assignments: {}", toRevoke);
+                    performTaskRevocation(configured, currentWorkerAssignmentWithoutDuplication);

Review comment:
       Can you clarify why this change is necessary? I ran the new `testTaskAssignmentWhenWorkerJoinAfterRevocation` test case with and without it, and although it fails without this change, that seems to be due more to fragile testing logic in the `assertAssignment` method than to an actual bug in the rebalancing logic here. If I remove the `assertAssignment` calls and manually check the distribution of C/T across the cluster after the fifth rebalance, everything is balanced.
   
   I've also produced a test case that fails with this change but succeeds without it:
   ```java
       @Test
       public void testNewWorkerAndNewTasksInSameRound() {
           doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
   
           // Start with 40 tasks
           configState = clusterConfigState(offset, 1, 40);
           when(coordinator.configSnapshot()).thenReturn(configState);
   
           // Start with three workers
           memberConfigs = memberConfigs(leader, offset, 0, 2);
           expectGeneration();
           assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
           ++rebalanceNum;
           returnedAssignments = assignmentsCapture.getValue();
           assertDelay(0, returnedAssignments);
           expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
           assertNoReassignments(memberConfigs, expectedMemberConfigs);
   
           applyAssignments(returnedAssignments);
           memberConfigs = memberConfigs(leader, offset, assignments);
           // Add 2 tasks
           configState = clusterConfigState(offset, 1, 42);
           when(coordinator.configSnapshot()).thenReturn(configState);
           // Add a worker
           memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
           expectGeneration();
           assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
           ++rebalanceNum;
           returnedAssignments = assignmentsCapture.getValue();
           assertDelay(0, returnedAssignments);
           expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
           assertNoReassignments(memberConfigs, expectedMemberConfigs);
   
           applyAssignments(returnedAssignments);
           memberConfigs = memberConfigs(leader, offset, assignments);
           // Rebalance once more as a follow-up to task revocation
           expectGeneration();
           assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
           ++rebalanceNum;
           returnedAssignments = assignmentsCapture.getValue();
           assertDelay(0, returnedAssignments);
           expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
           assertNoReassignments(memberConfigs, expectedMemberConfigs);
   
           applyAssignments(returnedAssignments);
           memberConfigs = memberConfigs(leader, offset, assignments);
           assertBalancedAssignments(memberConfigs);
   
           verify(coordinator, times(rebalanceNum)).configSnapshot();
           verify(coordinator, times(rebalanceNum)).leaderState(any());
           verify(coordinator, times(2 * rebalanceNum)).generationId();
           verify(coordinator, times(rebalanceNum)).memberId();
           verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
       }
   
       private void assertBalancedAssignments(Map<String, ExtendedWorkerState> existingAssignments) {
           List<Integer> connectorCounts = existingAssignments.values().stream()
                   .map(e -> e.assignment().connectors().size())
                   .sorted()
                   .collect(Collectors.toList());
           List<Integer> taskCounts = existingAssignments.values().stream()
                   .map(e -> e.assignment().tasks().size())
                   .sorted()
                   .collect(Collectors.toList());
   
           int minConnectors = connectorCounts.get(0);
           int maxConnectors = connectorCounts.get(connectorCounts.size() - 1);
   
           int minTasks = taskCounts.get(0);
           int maxTasks = taskCounts.get(taskCounts.size() - 1);
   
           assertTrue(
                   "Assignments are imbalanced. The spread of connectors across each worker is: " + connectorCounts,
                   maxConnectors - minConnectors <= 1
           );
           assertTrue(
                   "Assignments are imbalanced. The spread of tasks across each worker is: " + taskCounts,
                   maxTasks - minTasks <= 1
           );
       }
   ```
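As a point of reference for the test above: a balanced result for T tasks on W workers means every worker holds either floor(T / W) or ceil(T / W) tasks. Going by the inline comments (40 tasks on three workers, then 42 tasks once `worker3` joins), the final `assertBalancedAssignments` call should see four workers with 10 or 11 tasks each. The helper below is a hypothetical sketch that only does this arithmetic; `EvenSpread` is not part of the Connect codebase.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: the per-worker counts of a perfectly even spread.
final class EvenSpread {

    /** Splits {@code total} items across {@code workers} so that counts differ by at most one. */
    static List<Integer> evenSpread(int total, int workers) {
        if (workers <= 0) {
            throw new IllegalArgumentException("workers must be positive");
        }
        int base = total / workers;      // every worker gets at least floor(total / workers)
        int remainder = total % workers; // this many workers get one extra item
        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            counts.add(i < remainder ? base + 1 : base);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(evenSpread(40, 3)); // [14, 13, 13]: first round, three workers
        System.out.println(evenSpread(42, 4)); // [11, 11, 10, 10]: after worker3 joins
    }
}
```

When every task is actually assigned, the `max - min <= 1` check and "sorted counts equal the sorted even spread" are equivalent, so either form could be used in the assertion.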

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
##########
@@ -180,6 +208,155 @@ public void testTaskAssignmentWhenWorkerJoins() {
         verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
+    @Test
+    public void testTaskAssignmentWhenWorkerJoinAfterRevocation() {
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        //
+        // note: the assigned/revoked Connectors/tasks might be different, but the amount should be the same
+        // assignment after this phase:
+        // W1: assignedConnectors:[C0, C1], assignedTasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3],
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0, C1], tasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3]
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(2, 8, 0, 0, "worker1");
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        //
+        // assignment after this phase:
+        // W1: assignedConnectors:[], assignedTasks:[],
+        //     revokedConnectors:[C1], revokedTasks:[T1-0, T1-1, T1-2, T1-3]
+        // W2: assignedConnectors:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3]
+        // W2: connectors:[], tasks:[]
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 1, 4, "worker1", "worker2");
+
+        // Third assignment after revocations, and a third worker joining
+        //
+        // assignment after this phase:
+        // W1: assignedTasks:[], assignedTasks:[],
+        //     revokedConnectors:[], revokedTasks:[T0-3]
+        // W2: assignedTasks:[C1], assignedTasks:[T1-0, T1-1]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W3: assignedTasks:[], assignedTasks:[T1-2, T1-3]
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2]
+        // W2: connectors:[C1], tasks:[T1-0, T1-1]
+        // W3: connectors:[], tasks:[T1-2, T1-3]
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(1, 4, 0, 1, "worker1", "worker2", "worker3");

Review comment:
       This assertion style is useful for straightforward test cases, but for cases like this one I wonder if we might want something more granular that lets us assert how many C/T were assigned to or revoked from individual workers (instead of across the entire cluster). Or, if that's difficult because of non-deterministic behavior caused by things like Java collections with undefined iteration order, could we at least have something that asserts how many workers should have a given total count of C/T in the cluster (e.g., "assert that 3 workers have 2 connectors and 4 tasks assigned to them, and that 1 worker has 1 connector and 3 tasks") or how many workers were assigned/revoked a given number of C/T during the rebalance (e.g., "assert that 2 workers were assigned 2 tasks and revoked 3 tasks, and that 1 worker was assigned 0 tasks and revoked 4 tasks")?
   
   The comments are useful for illustrating the expectations on that front, but they aren't testable, so there's no guarantee that they're actually correct. In fact, when I ran this through a debugger, the correct number of C/T was being allocated/revoked in each round, but the actual C/T names (e.g., `T0-0` vs. `T1-1`) differed from what's described in the comments.

##########
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
##########
@@ -180,6 +208,155 @@ public void testTaskAssignmentWhenWorkerJoins() {
         verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
+    @Test
+    public void testTaskAssignmentWhenWorkerJoinAfterRevocation() {
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        //
+        // note: the assigned/revoked Connectors/tasks might be different, but the amount should be the same
+        // assignment after this phase:
+        // W1: assignedConnectors:[C0, C1], assignedTasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3],
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0, C1], tasks:[T0-0, T0-1, T0-2, T0-3, T1-0, T1-1, T1-2, T1-3]
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(2, 8, 0, 0, "worker1");
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        //
+        // assignment after this phase:
+        // W1: assignedConnectors:[], assignedTasks:[],
+        //     revokedConnectors:[C1], revokedTasks:[T1-0, T1-1, T1-2, T1-3]
+        // W2: assignedConnectors:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2, T0-3]
+        // W2: connectors:[], tasks:[]
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 1, 4, "worker1", "worker2");
+
+        // Third assignment after revocations, and a third worker joining
+        //
+        // assignment after this phase:
+        // W1: assignedTasks:[], assignedTasks:[],
+        //     revokedConnectors:[], revokedTasks:[T0-3]
+        // W2: assignedTasks:[C1], assignedTasks:[T1-0, T1-1]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W3: assignedTasks:[], assignedTasks:[T1-2, T1-3]
+        //     revokedConnectors:[] revokedTasks:[]
+        //
+        // Final distribution after this phase:
+        // W1: connectors:[C0], tasks:[T0-0, T0-1, T0-2]
+        // W2: connectors:[C1], tasks:[T1-0, T1-1]
+        // W3: connectors:[], tasks:[T1-2, T1-3]
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator, protocolVersion);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(1, 4, 0, 1, "worker1", "worker2", "worker3");
+
+        // Forth assignment after revocations, and a forth worker joining
+        //
+        // assignment after this phase:
+        // W1: assignedTasks:[], assignedTasks:[],
+        //     revokedConnectors:[], revokedTasks:[T0-2]
+        // W2: assignedTasks:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W3: assignedTasks:[], assignedTasks:[]
+        //     revokedConnectors:[] revokedTasks:[]
+        // W4: assignedTasks:[], assignedTasks:[T0-3]
+        //     revokedConnectors:[] revokedTasks:[]

Review comment:
       ```suggestion
           // W1: assignedConnectors:[], assignedTasks:[],
           //     revokedConnectors:[], revokedTasks:[T0-2]
           // W2: assignedConnectors:[], assignedTasks:[]
           //     revokedConnectors:[] revokedTasks:[]
           // W3: assignedConnectors:[], assignedTasks:[]
           //     revokedConnectors:[] revokedTasks:[]
           // W4: assignedConnectors:[], assignedTasks:[T0-3]
           //     revokedConnectors:[] revokedTasks:[]
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
