vvcephei commented on a change in pull request #8588: URL: https://github.com/apache/kafka/pull/8588#discussion_r418304275
##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##########
@@ -358,6 +369,57 @@ private static void runRandomizedScenario(final long seed) {
         }
     }
+    private static void verifyBalancedAssignment(final Harness harness, final int balanceFactor) {
+        final int activeDiff;
+        final int activeStatefulDiff;
+        final double activeStatelessPerThreadDiff;
+        final int assignedDiff;
+        final int standbyDiff;
+
+        {
+            final int maxActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).min(comparingInt(i -> i)).orElse(0);
+            activeDiff = maxActive - minActive;
+        }
+        {
+            final int maxActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).max(comparingInt(i -> i)).orElse(0);
+            final int minActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).min(comparingInt(i -> i)).orElse(0);
+            activeStatefulDiff = maxActiveStateful - minActiveStateful;
+        }
+        {
+            final double maxActiveStatefulPerThread = harness.clientStates.values().stream().map(s1 -> 1.0 * intersection(TreeSet::new, s1.activeTasks(), harness.statelessTasks).size() / s1.capacity()).max(comparingDouble(i -> i)).orElse(0.0);

Review comment:
I can neither confirm nor deny any statements that I may or may not have made in the past ;)

Kidding aside, I don't recall what that convincing argument might have been. Perhaps just that that kind of multivariate optimization would be complicated to implement, and maybe it's better to just start simple.

Whatever the intent might have been, though, the actual implementation of our balancer should already achieve a "perfect" balance of stateful-active tasks.
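To make the balance notion concrete, here is a minimal sketch of the kind of max-minus-min check the hunk above performs. It is not code from this PR: `BalanceSketch`, `spread`, and `isBalanced` are hypothetical names, and reading a "perfect" balance as a spread of at most 1 is an assumption.

```java
import java.util.Collection;
import java.util.List;

public class BalanceSketch {
    // Hypothetical helper (not from the PR): difference between the largest
    // and smallest per-client task count; 0 for an empty collection.
    public static int spread(final Collection<Integer> taskCounts) {
        final int max = taskCounts.stream().mapToInt(Integer::intValue).max().orElse(0);
        final int min = taskCounts.stream().mapToInt(Integer::intValue).min().orElse(0);
        return max - min;
    }

    // Assumption: an assignment counts as balanced when the spread does not
    // exceed the given balance factor (1 for a "perfect" balance).
    public static boolean isBalanced(final Collection<Integer> taskCounts, final int balanceFactor) {
        return spread(taskCounts) <= balanceFactor;
    }

    public static void main(final String[] args) {
        // Illustrative stateful-active task counts for three clients.
        final List<Integer> counts = List.of(3, 3, 2);
        System.out.println("spread = " + spread(counts));
        System.out.println("balanced = " + isBalanced(counts, 1));
    }
}
```

The same check could be applied separately to stateful-active, standby, and stateless counts, which is roughly what the separate `*Diff` variables in the test suggest.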
I'm not sure about standbys, but I don't see why we shouldn't just make them balanced as well. Regarding stateless tasks, it does seem suboptimal to assign them to instances that already have active tasks when there are instances available with no tasks at all.