ableegoldman commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r418270317



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##########
@@ -358,6 +369,57 @@ private static void runRandomizedScenario(final long seed) {
         }
     }
 
+    private static void verifyBalancedAssignment(final Harness harness, final int balanceFactor) {
+        final int activeDiff;
+        final int activeStatefulDiff;
+        final double activeStatelessPerThreadDiff;
+        final int assignedDiff;
+        final int standbyDiff;
+
+        {
+            final int maxActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).min(comparingInt(i -> i)).orElse(0);
+            activeDiff = maxActive - minActive;
+        }
+        {
+            final int maxActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).max(comparingInt(i -> i)).orElse(0);
+            final int minActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).min(comparingInt(i -> i)).orElse(0);
+            activeStatefulDiff = maxActiveStateful - minActiveStateful;
+        }
+        {
+            final double maxActiveStatefulPerThread = harness.clientStates.values().stream().map(s1 -> 1.0 * intersection(TreeSet::new, s1.activeTasks(), harness.statelessTasks).size() / s1.capacity()).max(comparingDouble(i -> i)).orElse(0.0);

Review comment:
       FWIW we'll probably want to revisit this if/when we migrate standby processing to separate thread(s), but I'm +1 for enforcing this now. Just wondering what the current reasoning is.
   
   On the other hand, maybe it's better to enforce good data parallelism over an equal per-task-type balance. Like if subtopology 1 is significantly heavier than subtopology 2, then it could be more balanced to have active 1_0 and active 2_0 on one instance and standby 1_0 and standby 2_0 on the other. But maybe that's just not worth optimizing at this point.



