[GitHub] [kafka] ableegoldman commented on a change in pull request #8588: [WIP] KAFKA-6145: KIP-441: Validate balanced assignment
ableegoldman commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r418346499

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java ##
@@ -358,6 +369,57 @@ private static void runRandomizedScenario(final long seed) {
         }
     }
 
+    private static void verifyBalancedAssignment(final Harness harness, final int balanceFactor) {
+        final int activeDiff;
+        final int activeStatefulDiff;
+        final double activeStatelessPerThreadDiff;
+        final int assignedDiff;
+        final int standbyDiff;
+
+        {
+            final int maxActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minActive = harness.clientStates.values().stream().map(ClientState::activeTaskCount).min(comparingInt(i -> i)).orElse(0);
+            activeDiff = maxActive - minActive;
+        }
+        {
+            final int maxActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).max(comparingInt(i -> i)).orElse(0);
+            final int minActiveStateful = harness.clientStates.values().stream().map(s -> diff(TreeSet::new, s.activeTasks(), harness.statelessTasks).size()).min(comparingInt(i -> i)).orElse(0);
+            activeStatefulDiff = maxActiveStateful - minActiveStateful;
+        }
+        {
+            final double maxActiveStatefulPerThread = harness.clientStates.values().stream().map(s1 -> 1.0 * intersection(TreeSet::new, s1.activeTasks(), harness.statelessTasks).size() / s1.capacity()).max(comparingDouble(i -> i)).orElse(0.0);
+            final double minActiveStatefulPerThread = harness.clientStates.values().stream().map(s -> 1.0 * intersection(TreeSet::new, s.activeTasks(), harness.statelessTasks).size() / s.capacity()).min(comparingDouble(i -> i)).orElse(0.0);
+            activeStatelessPerThreadDiff = maxActiveStatefulPerThread - minActiveStatefulPerThread;
+        }
+        {
+            final int maxAssigned = harness.clientStates.values().stream().map(ClientState::assignedTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minAssigned = harness.clientStates.values().stream().map(ClientState::assignedTaskCount).min(comparingInt(i -> i)).orElse(0);
+            assignedDiff = maxAssigned - minAssigned;
+        }
+        {
+            final int maxStandby = harness.clientStates.values().stream().map(ClientState::standbyTaskCount).max(comparingInt(i -> i)).orElse(0);
+            final int minStandby = harness.clientStates.values().stream().map(ClientState::standbyTaskCount).min(comparingInt(i -> i)).orElse(0);
+            standbyDiff = maxStandby - minStandby;
+        }
+
+        final Map results = new TreeMap<>(mkMap(
+            mkEntry("activeDiff", activeDiff),
+            mkEntry("activeStatefulDiff", activeStatefulDiff),
+            mkEntry("activeStatelessPerThreadDiff", activeStatelessPerThreadDiff),

Review comment:
OK, this gives me an idea of where you're coming from w.r.t. client-level balance. I was thinking that we should scale the entire task load with the thread capacity, but that only makes sense for some resources: mainly (or only?) CPU, which I suppose is unlikely to be the bottleneck or resource constraint in a stateful application. It would still be the constraint for stateless tasks, of course. So I guess I do see why we might want to balance stateless tasks at the thread level, and anything stateful at the client level, where IO is more likely to be the constraint.

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
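A minimal sketch of the split described in the comment above: stateless active tasks compared per thread (count divided by thread capacity), stateful active tasks compared as a raw per-client count. `SketchClient` and `SplitLevelBalance` are hypothetical names, not Kafka's `ClientState` or any assignor API, and the CPU-bound versus IO-bound split is the comment's working assumption rather than a measured fact. The sketch also uses `IntSummaryStatistics`/`DoubleSummaryStatistics` to get max and min in one pass instead of streaming twice.

```java
import java.util.DoubleSummaryStatistics;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a Streams client: its thread count and the ids of its active tasks.
// This is NOT Kafka's ClientState; it only carries what the sketch needs.
final class SketchClient {
    final int threads;
    final Set<String> activeTasks;

    SketchClient(final int threads, final Set<String> activeTasks) {
        this.threads = threads;
        this.activeTasks = activeTasks;
    }
}

final class SplitLevelBalance {
    private SplitLevelBalance() { }

    // Stateless work is assumed to be CPU-bound, so it is normalized by thread count.
    // Returns max - min of stateless active tasks per thread (0.0 when there are no clients).
    static double statelessPerThreadSpread(final List<SketchClient> clients, final Set<String> statelessTasks) {
        if (clients.isEmpty()) {
            return 0.0;
        }
        final DoubleSummaryStatistics stats = clients.stream()
            .mapToDouble(c -> countMembers(c.activeTasks, statelessTasks) / (double) c.threads)
            .summaryStatistics();
        return stats.getMax() - stats.getMin();
    }

    // Stateful work is assumed to be IO-bound per client, so thread count is ignored.
    // Returns max - min of stateful active tasks per client (0 when there are no clients).
    static int statefulPerClientSpread(final List<SketchClient> clients, final Set<String> statelessTasks) {
        if (clients.isEmpty()) {
            return 0;
        }
        final IntSummaryStatistics stats = clients.stream()
            .mapToInt(c -> c.activeTasks.size() - countMembers(c.activeTasks, statelessTasks))
            .summaryStatistics();
        return stats.getMax() - stats.getMin();
    }

    // Number of tasks in `tasks` that also appear in `subset`.
    private static int countMembers(final Set<String> tasks, final Set<String> subset) {
        int count = 0;
        for (final String task : tasks) {
            if (subset.contains(task)) {
                count++;
            }
        }
        return count;
    }
}
```

For example, with a single-threaded client running {0_0, 1_0} and a two-threaded client running {0_1, 0_2, 1_1, 1_2}, where the 1_x tasks are stateless, the stateless per-thread spread is 0.0 while the stateful per-client spread is 1. The empty-list guards matter because the summary statistics of an empty stream report sentinel max/min values.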
ableegoldman commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r418270317

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java ##
@@ -358,6 +369,57 @@ private static void runRandomizedScenario(final long seed) {
 ...
+            final double maxActiveStatefulPerThread = harness.clientStates.values().stream().map(s1 -> 1.0 * intersection(TreeSet::new, s1.activeTasks(), harness.statelessTasks).size() / s1.capacity()).max(comparingDouble(i -> i)).orElse(0.0);

Review comment:
FWIW, we'll probably want to revisit this if/when we migrate standby processing to separate thread(s), but I'm +1 for enforcing this now. I'm just wondering what the current reasoning is.

On the other hand, maybe it's better to enforce good data parallelism over an equal per-task-type balance. For example, if subtopology 1 is significantly heavier than subtopology 2, it could be more balanced to have active 1_0 and active 2_0 on one instance and standby 1_0 and standby 2_0 on the other. But maybe that's just not worth optimizing at this point.
ableegoldman commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r418267732

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java ##
@@ -358,6 +369,57 @@ private static void runRandomizedScenario(final long seed) {
 ...
+            final double maxActiveStatefulPerThread = harness.clientStates.values().stream().map(s1 -> 1.0 * intersection(TreeSet::new, s1.activeTasks(), harness.statelessTasks).size() / s1.capacity()).max(comparingDouble(i -> i)).orElse(0.0);

Review comment:
The HATA (HighAvailabilityTaskAssignor) originally tried to balance each type of task individually (i.e., stateful active, standby, and stateless active), and IIRC you made a convincing argument during the review against doing that, in favor of balancing only the total task load. What's the rationale for enforcing this now? Or did I misremember and/or misinterpret your earlier point?
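To make the two criteria in the last comment concrete, here is a minimal sketch contrasting a per-type balance check with a total-load check. `TaskCounts` and `BalanceCriteria` are hypothetical names, not part of Kafka, and the sketch assumes the balance factor bounds max - min across clients, in the spirit of the `balanceFactor` parameter of `verifyBalancedAssignment`; how the PR actually applies that factor is not shown in the quoted hunk.

```java
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.function.ToIntFunction;

// Hypothetical per-client task counts, split by task type (not Kafka's ClientState).
final class TaskCounts {
    final int statefulActive;
    final int statelessActive;
    final int standby;

    TaskCounts(final int statefulActive, final int statelessActive, final int standby) {
        this.statefulActive = statefulActive;
        this.statelessActive = statelessActive;
        this.standby = standby;
    }

    int total() {
        return statefulActive + statelessActive + standby;
    }
}

final class BalanceCriteria {
    private BalanceCriteria() { }

    // max - min of one metric across clients; 0 when there are no clients.
    static int spread(final List<TaskCounts> clients, final ToIntFunction<TaskCounts> metric) {
        if (clients.isEmpty()) {
            return 0;
        }
        final IntSummaryStatistics stats = clients.stream().mapToInt(metric).summaryStatistics();
        return stats.getMax() - stats.getMin();
    }

    // Total-load criterion: only the overall task count must be within the balance factor.
    static boolean totalBalanced(final List<TaskCounts> clients, final int balanceFactor) {
        return spread(clients, TaskCounts::total) <= balanceFactor;
    }

    // Per-type criterion: every task type must individually be within the balance factor.
    static boolean perTypeBalanced(final List<TaskCounts> clients, final int balanceFactor) {
        return spread(clients, c -> c.statefulActive) <= balanceFactor
            && spread(clients, c -> c.statelessActive) <= balanceFactor
            && spread(clients, c -> c.standby) <= balanceFactor;
    }
}
```

With a balance factor of 1, an assignment where one client holds two stateful actives and the other holds two standbys passes the total-load check (two tasks each) but fails the per-type check; one active plus one standby on each client passes both. The per-type check is strictly stronger, which is the trade-off the comment is asking about.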