gharris1727 commented on code in PR #16486: URL: https://github.com/apache/kafka/pull/16486#discussion_r1672817593
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java:
##########
@@ -793,6 +796,46 @@ protected void assignConnectors(List<WorkerLoad> workerAssignment, Collection<St
         }
     }
 
+    static class BalancedIterator<E> implements Iterator<E> {
+
+        private final Map<String, Iterator<E>> grouped;
+        private final List<String> keys;
+
+        private int k;
+
+        public BalancedIterator(Collection<E> collection, Function<E, String> allocationGrouper) {
+            this.k = 0;
+            this.grouped = collection.stream().collect(Collectors.groupingBy(
+                allocationGrouper,
+                Collectors.collectingAndThen(
+                    Collectors.toList(),
+                    List::iterator
+                )
+            ));
+            this.keys = collection.stream()
+                .map(allocationGrouper)
+                .distinct()
+                .collect(Collectors.toList());

Review Comment:
   Maybe I don't understand, but I don't think this changed anything. The incoming Collection may still have an over-representation of a single connector first, leading that connector to be preferentially revoked.

   For example, consider this situation:
   ```
   W1: C1 C2 C3 C4
   W2: C1 C5 C6 C7
   W3: C1 C8 C9 C10
   ```
   If a new worker joins, C1 could be revoked because it appears the same number of times as all of the other connectors, but that would violate local balance later:
   ```
   W1: C2 C3 C4
   W2: C5 C6 C7
   W3: C8 C9 C10
   W4: C1 C1 C1
   ```
   The BalancedIterator isn't fairly tie-breaking when two connectors have the same number of jobs assigned to the current worker. Picking a single job to revoke depends on the entire rest of the state, and some degree of predicting how the jobs will be distributed afterwards.

   This is what I think the "ideal" state should be after that initial state:
   ```
   W1: C1 C2 C3
   W2: C1 C5 C6
   W3: C1 C8 C9
   W4: C4 C7 C10
   ```
   At most one C1 should be revoked overall, because revoking two to put on W4 would break local balance.
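To make the local-balance argument concrete, here is a minimal, standalone Java sketch that checks the two end states from the comment above. It is not code from the PR. It assumes that "local balance" means that, for every connector, the number of its jobs on any two workers differs by at most one. The class `LocalBalanceSketch` and its methods are hypothetical names introduced only for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class LocalBalanceSketch {

    // Assumed definition (not taken from the PR): an assignment is locally
    // balanced if, for each connector, the per-worker job counts differ by
    // at most one across all workers.
    static boolean isLocallyBalanced(Map<String, List<String>> assignment) {
        Set<String> connectors = new TreeSet<>();
        for (List<String> jobs : assignment.values()) {
            connectors.addAll(jobs);
        }
        for (String connector : connectors) {
            int min = Integer.MAX_VALUE;
            int max = 0;
            for (List<String> jobs : assignment.values()) {
                int count = Collections.frequency(jobs, connector);
                min = Math.min(min, count);
                max = Math.max(max, count);
            }
            if (max - min > 1) {
                return false;
            }
        }
        return true;
    }

    // State from the comment in which all three C1 jobs were revoked and
    // ended up on the new worker W4.
    static Map<String, List<String>> afterRevokingAllC1() {
        Map<String, List<String>> state = new LinkedHashMap<>();
        state.put("W1", Arrays.asList("C2", "C3", "C4"));
        state.put("W2", Arrays.asList("C5", "C6", "C7"));
        state.put("W3", Arrays.asList("C8", "C9", "C10"));
        state.put("W4", Arrays.asList("C1", "C1", "C1"));
        return state;
    }

    // The "ideal" state from the comment: no C1 job is moved to W4.
    static Map<String, List<String>> idealState() {
        Map<String, List<String>> state = new LinkedHashMap<>();
        state.put("W1", Arrays.asList("C1", "C2", "C3"));
        state.put("W2", Arrays.asList("C1", "C5", "C6"));
        state.put("W3", Arrays.asList("C1", "C8", "C9"));
        state.put("W4", Arrays.asList("C4", "C7", "C10"));
        return state;
    }

    public static void main(String[] args) {
        System.out.println(isLocallyBalanced(afterRevokingAllC1())); // false
        System.out.println(isLocallyBalanced(idealState()));         // true
    }
}
```

Both states give every worker three jobs, so a check on total load per worker cannot tell them apart. Under the assumed definition, only the per-connector spread separates them, which is why choosing a job to revoke needs more than the counts on the current worker.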