gharris1727 commented on code in PR #16486:
URL: https://github.com/apache/kafka/pull/16486#discussion_r1672817593


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -793,6 +796,46 @@ protected void assignConnectors(List<WorkerLoad> workerAssignment, Collection<St
         }
     }
 
+    static class BalancedIterator<E> implements Iterator<E> {
+
+        private final Map<String, Iterator<E>> grouped;
+        private final List<String> keys;
+
+        private int k;
+
+        public BalancedIterator(Collection<E> collection, Function<E, String> allocationGrouper) {
+            this.k = 0;
+            this.grouped = collection.stream().collect(Collectors.groupingBy(
+                    allocationGrouper,
+                    Collectors.collectingAndThen(
+                            Collectors.toList(),
+                            List::iterator
+                    )
+            ));
+            this.keys = collection.stream()
+                .map(allocationGrouper)
+                .distinct()
+                .collect(Collectors.toList());

Review Comment:
   Maybe I don't understand, but I don't think this changes anything. The 
incoming Collection may still put the same connector first on every worker, 
so that connector is preferentially revoked.
   
   For example, consider this situation:
   
   ```
   W1: C1 C2 C3 C4
   W2: C1 C5 C6 C7
   W3: C1 C8 C9 C10
   ```
   
   If a new worker joins, every worker could revoke its C1 task, because on 
each worker C1 appears the same number of times as every other connector. 
That would violate local balance later:
   
   ```
   W1: C2 C3 C4
   W2: C5 C6 C7
   W3: C8 C9 C10
   W4: C1 C1 C1
   ```
   
   The BalancedIterator doesn't break ties fairly when two connectors have the 
same number of jobs assigned to the current worker. Picking a single job to 
revoke depends on the entire rest of the state, and on some degree of 
predicting how the jobs will be distributed afterwards.
   
   This is what I think the "ideal" state should be after that initial state:
   
   ```
   W1: C1 C2 C3
   W2: C1 C5 C6
   W3: C1 C8 C9
   W4: C4 C7 C10
   ```
   
   At most one C1 should be revoked overall, because revoking two and placing 
them on W4 would break local balance.
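   
   To make the concern concrete, here is a rough sketch, not code from the PR. 
It assumes "local balance" means that for every connector the per-worker task 
counts differ by at most one, and it models the tie-break as each worker 
revoking the first job in its list. `LocalBalanceSketch`, `isLocallyBalanced`, 
`revokeFirstFromEach`, and `initialState` are hypothetical names used only for 
illustration.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   import java.util.TreeSet;

   public class LocalBalanceSketch {

       // Assumed definition: an assignment is locally balanced when, for every
       // connector, the per-worker job counts differ by at most one.
       static boolean isLocallyBalanced(Map<String, List<String>> assignment) {
           Set<String> connectors = new TreeSet<>();
           assignment.values().forEach(connectors::addAll);
           for (String connector : connectors) {
               int min = Integer.MAX_VALUE;
               int max = 0;
               for (List<String> jobs : assignment.values()) {
                   int count = Collections.frequency(jobs, connector);
                   min = Math.min(min, count);
                   max = Math.max(max, count);
               }
               if (max - min > 1) {
                   return false;
               }
           }
           return true;
       }

       // Hypothetical per-worker policy: every existing worker revokes the first
       // job in its list, and all revoked jobs land on the new worker.
       static Map<String, List<String>> revokeFirstFromEach(
               Map<String, List<String>> assignment, String newWorker) {
           Map<String, List<String>> result = new LinkedHashMap<>();
           List<String> revoked = new ArrayList<>();
           for (Map.Entry<String, List<String>> entry : assignment.entrySet()) {
               List<String> jobs = new ArrayList<>(entry.getValue());
               revoked.add(jobs.remove(0));
               result.put(entry.getKey(), jobs);
           }
           result.put(newWorker, revoked);
           return result;
       }

       // The three-worker starting state from the example above.
       static Map<String, List<String>> initialState() {
           Map<String, List<String>> state = new LinkedHashMap<>();
           state.put("W1", Arrays.asList("C1", "C2", "C3", "C4"));
           state.put("W2", Arrays.asList("C1", "C5", "C6", "C7"));
           state.put("W3", Arrays.asList("C1", "C8", "C9", "C10"));
           return state;
       }

       public static void main(String[] args) {
           Map<String, List<String>> naive = revokeFirstFromEach(initialState(), "W4");
           System.out.println(naive + " locallyBalanced=" + isLocallyBalanced(naive));

           Map<String, List<String>> ideal = new LinkedHashMap<>();
           ideal.put("W1", Arrays.asList("C1", "C2", "C3"));
           ideal.put("W2", Arrays.asList("C1", "C5", "C6"));
           ideal.put("W3", Arrays.asList("C1", "C8", "C9"));
           ideal.put("W4", Arrays.asList("C4", "C7", "C10"));
           System.out.println(ideal + " locallyBalanced=" + isLocallyBalanced(ideal));
       }
   }
   ```
   
   Under these assumptions the per-worker policy moves all three C1 jobs onto 
W4 and fails the check, while the "ideal" layout passes. No single worker can 
tell the two apart from its own list, which is why I think the decision needs 
the global state.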



