vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r418304275



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##########
@@ -358,6 +369,57 @@ private static void runRandomizedScenario(final long seed) {
         }
     }
 
+    private static void verifyBalancedAssignment(final Harness harness, final int balanceFactor) {
+        final int activeDiff;
+        final int activeStatefulDiff;
+        final double activeStatelessPerThreadDiff;
+        final int assignedDiff;
+        final int standbyDiff;
+
+        {
+            final int maxActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).min(comparingInt(i -> i)).orElse(0);
+            activeDiff = maxActive - minActive;
+        }
+        {
+            final int maxActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).max(comparingInt(i -> i)).orElse(0);
+            final int minActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).min(comparingInt(i -> i)).orElse(0);
+            activeStatefulDiff = maxActiveStateful - minActiveStateful;
+        }
+        {
+            final double maxActiveStatefulPerThread = harness.clientStates.values().stream().map(s1 -> 1.0 * intersection(TreeSet::new, s1.activeTasks(), harness.statelessTasks).size() / s1.capacity()).max(comparingDouble(i -> i)).orElse(0.0);

Review comment:
   I can neither confirm nor deny any statements that I may or may not have made in the past ;) Kidding aside, I don't recall what that convincing argument was. Perhaps it was just that this kind of multivariate optimization would be complicated to implement, and that it's better to start simple.
   
   Whatever the intent might have been, though, the actual implementation of our balancer should already achieve a "perfect" balance of stateful active tasks. I'm not sure about standbys, but I don't see why we shouldn't make them balanced as well.
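   One way to make "balanced" concrete for both stateful active tasks and standbys is a max-minus-min spread per metric, compared against a bound like the `balanceFactor` that `verifyBalancedAssignment` takes. This is a hypothetical sketch: `ClientLoad` and `BalanceCheck` are illustrative stand-ins, not Kafka's `ClientState` API, and it assumes all clients have equal capacity (it skips the per-thread normalization the test does).

```java
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.function.ToIntFunction;

// Hypothetical per-client summary; a stand-in for Kafka's ClientState,
// assuming all clients have equal capacity (no per-thread normalization).
final class ClientLoad {
    final int statefulActive;
    final int standby;

    ClientLoad(final int statefulActive, final int standby) {
        this.statefulActive = statefulActive;
        this.standby = standby;
    }
}

final class BalanceCheck {
    // Spread of a metric across clients: max minus min, or 0 when there are no clients.
    static int spread(final List<ClientLoad> clients, final ToIntFunction<ClientLoad> metric) {
        // One pass computes both max and min, instead of two separate streams.
        final IntSummaryStatistics stats = clients.stream().mapToInt(metric).summaryStatistics();
        // Empty stats report max = Integer.MIN_VALUE and min = Integer.MAX_VALUE, so guard against that.
        return stats.getCount() == 0 ? 0 : stats.getMax() - stats.getMin();
    }

    // Balanced if no client holds more than balanceFactor extra stateful actives
    // (or standbys) compared to any other client. A factor of 1 is the tightest
    // bound achievable in general, since totals may not divide evenly.
    static boolean isBalanced(final List<ClientLoad> clients, final int balanceFactor) {
        return spread(clients, c -> c.statefulActive) <= balanceFactor
            && spread(clients, c -> c.standby) <= balanceFactor;
    }
}
```

   Checking standbys with the same spread as stateful actives keeps the two criteria symmetric, which is what "make them balanced as well" would amount to in a test.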
   
   Regarding stateless tasks, it does seem suboptimal to assign them to instances that already have active tasks when there are instances available with no tasks at all.
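   For the stateless case, one simple rule would be to hand each stateless task to whichever client currently holds the fewest tasks of any kind, so idle instances are filled first. This is an assumed greedy heuristic for illustration, not the balancer's actual algorithm; `StatelessAssigner.assign` and its string-keyed inputs are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Hypothetical greedy sketch, not the actual balancer: each stateless task goes to
// the client that currently holds the fewest tasks of any kind, so clients with no
// tasks at all are filled before clients that already have active tasks.
final class StatelessAssigner {
    static Map<String, List<String>> assign(final Map<String, Integer> existingTaskCounts,
                                            final List<String> statelessTasks) {
        // Mutable copy of each client's total task count.
        final Map<String, Integer> load = new HashMap<>(existingTaskCounts);
        // Least-loaded client first; ties broken by client id for deterministic results.
        final PriorityQueue<String> byLoad = new PriorityQueue<>(
            Comparator.comparingInt((String client) -> load.get(client))
                .thenComparing(Comparator.naturalOrder()));
        byLoad.addAll(load.keySet());

        final Map<String, List<String>> assignment = new TreeMap<>();
        for (final String task : statelessTasks) {
            // Remove the client before changing its load so the heap order stays valid.
            final String client = byLoad.poll();
            if (client == null) {
                throw new IllegalArgumentException("no clients to assign tasks to");
            }
            assignment.computeIfAbsent(client, k -> new ArrayList<>()).add(task);
            load.merge(client, 1, Integer::sum);
            byLoad.offer(client);
        }
        return assignment;
    }
}
```

   For instance, with existing counts `{a=2, b=0, c=1}` and three stateless tasks, `b` receives two, `c` receives one, and `a` receives none, leaving every client with two tasks. The deterministic tie-break matters for a seeded convergence test like this one.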




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
