This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 85cee984ac0 MINOR: Fix rack-aware assignment tests (#14965) 85cee984ac0 is described below commit 85cee984ac09d7bdb46634413f70fc74afe60d1f Author: Hao Li <1127478+lihao...@users.noreply.github.com> AuthorDate: Mon Dec 11 01:38:57 2023 -0800 MINOR: Fix rack-aware assignment tests (#14965) Reviewers: Matthias J. Sax <matth...@confluent.io> --- .../internals/assignment/StickyTaskAssignor.java | 6 +-- .../HighAvailabilityTaskAssignorTest.java | 44 ++++++++++++++++------ .../assignment/StickyTaskAssignorTest.java | 20 +++++++++- 3 files changed, 52 insertions(+), 18 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java index 355ecd2e804..a97bb319ed2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java @@ -115,10 +115,8 @@ public class StickyTaskAssignor implements TaskAssignor { rackAwareTaskAssignor.optimizeActiveTasks(statefulTasks, clientStates, trafficCost, nonOverlapCost); final TreeSet<TaskId> statelessTasks = (TreeSet<TaskId>) diff(TreeSet::new, allTaskIds, statefulTasks); - if (!statelessTasks.isEmpty()) { - rackAwareTaskAssignor.optimizeActiveTasks(statelessTasks, clientStates, - STATELESS_TRAFFIC_COST, STATELESS_NON_OVERLAP_COST); - } + // No-op if statelessTasks is empty + rackAwareTaskAssignor.optimizeActiveTasks(statelessTasks, clientStates, STATELESS_TRAFFIC_COST, STATELESS_NON_OVERLAP_COST); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java index dfb223b3e64..850f4715b54 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java @@ -131,6 +131,7 @@ public class HighAvailabilityTaskAssignorTest { private final Time time = new MockTime(); private boolean enableRackAwareTaskAssignor; + private int maxSkew = 1; @Parameter public String rackAwareStrategy; @@ -138,6 +139,9 @@ public class HighAvailabilityTaskAssignorTest { @Before public void setUp() { enableRackAwareTaskAssignor = !rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE); + if (rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY)) { + maxSkew = 4; + } } @Parameterized.Parameters(name = "rackAwareStrategy={0}") @@ -284,7 +288,7 @@ public class HighAvailabilityTaskAssignorTest { if (!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)) { // Subtopology is not balanced with min_traffic rack aware assignment - assertBalancedTasks(clientStates); + assertBalancedTasks(clientStates, maxSkew); } verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTaskIds, clientStates, false, enableRackAwareTaskAssignor); @@ -331,7 +335,7 @@ public class HighAvailabilityTaskAssignorTest { if (!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)) { // Subtopology is not balanced with min_traffic rack aware assignment - assertBalancedTasks(clientStates); + assertBalancedTasks(clientStates, maxSkew); } verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTaskIds, clientStates, false, enableRackAwareTaskAssignor); @@ -374,7 +378,7 @@ public class HighAvailabilityTaskAssignorTest { assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new StringBuilder()); assertBalancedActiveAssignment(clientStates, new StringBuilder()); assertBalancedStatefulAssignment(allTaskIds, clientStates, new StringBuilder()); - assertBalancedTasks(clientStates); + assertBalancedTasks(clientStates, maxSkew); verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTaskIds, clientStates, false, enableRackAwareTaskAssignor); } @@ -464,7 +468,7 @@ public class HighAvailabilityTaskAssignorTest { assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new StringBuilder()); assertBalancedActiveAssignment(clientStates, new StringBuilder()); assertBalancedStatefulAssignment(allTaskIds, clientStates, new StringBuilder()); - assertBalancedTasks(clientStates); + assertBalancedTasks(clientStates, maxSkew); verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTaskIds, clientStates, false, enableRackAwareTaskAssignor); } @@ -510,7 +514,7 @@ public class HighAvailabilityTaskAssignorTest { if (!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)) { // Subtopology is not balanced with min_traffic rack aware assignment - assertBalancedTasks(clientStates); + assertBalancedTasks(clientStates, maxSkew); } verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTaskIds, clientStates, false, enableRackAwareTaskAssignor); @@ -612,7 +616,7 @@ public class HighAvailabilityTaskAssignorTest { assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new StringBuilder()); assertBalancedActiveAssignment(clientStates, new StringBuilder()); assertBalancedStatefulAssignment(allTaskIds, clientStates, new StringBuilder()); - assertBalancedTasks(clientStates); + assertBalancedTasks(clientStates, maxSkew); verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTaskIds, clientStates, false, enableRackAwareTaskAssignor); } @@ -685,7 +689,7 @@ public class HighAvailabilityTaskAssignorTest { assertValidAssignment(0, allTasks, emptySet(), clientStates, new StringBuilder()); assertBalancedActiveAssignment(clientStates, new StringBuilder()); assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder()); - assertBalancedTasks(clientStates); + assertBalancedTasks(clientStates, maxSkew); verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTasks, clientStates, false, enableRackAwareTaskAssignor); } @@ -715,7 +719,7 @@ public class HighAvailabilityTaskAssignorTest { assertValidAssignment(1, allTasks, emptySet(), clientStates, new StringBuilder()); assertBalancedActiveAssignment(clientStates, new StringBuilder()); assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder()); - assertBalancedTasks(clientStates); + assertBalancedTasks(clientStates, maxSkew); verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTasks, clientStates, true, enableRackAwareTaskAssignor); } @@ -748,7 +752,7 @@ public class HighAvailabilityTaskAssignorTest { assertValidAssignment(0, 1, allTasks, emptySet(), clientStates, new StringBuilder()); assertBalancedActiveAssignment(clientStates, new StringBuilder()); assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder()); - assertBalancedTasks(clientStates); + assertBalancedTasks(clientStates, maxSkew); verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTasks, clientStates, false, enableRackAwareTaskAssignor); } @@ -787,7 +791,7 @@ public class HighAvailabilityTaskAssignorTest { assertValidAssignment(1, 1, allTasks, emptySet(), clientStates, new StringBuilder()); assertBalancedActiveAssignment(clientStates, new StringBuilder()); assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder()); - assertBalancedTasks(clientStates); + assertBalancedTasks(clientStates, maxSkew); verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTasks, clientStates, true, enableRackAwareTaskAssignor); } @@ -1431,10 +1435,14 @@ public class HighAvailabilityTaskAssignorTest { ); assertBalancedActiveAssignment(clientStateMap, new StringBuilder()); verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, taskIds, clientStateMap, true, enableRackAwareTaskAssignor); + + if (rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY)) { + assertBalancedTasks(clientStateMap, maxSkew); + } } @Test - public void shouldRemainOriginalAssignmentWithoutTrafficCost() { + public void shouldRemainOriginalAssignmentWithoutTrafficCostForMinCostStrategy() { // This test tests that if the traffic cost is 0, we should have same assignment with or without // rack aware assignor enabled final int nodeSize = 50; @@ -1476,9 +1484,9 @@ public class HighAvailabilityTaskAssignorTest { final SortedSet<TaskId> taskIds = (SortedSet<TaskId>) taskTopicPartitionMap.keySet(); final List<Set<TaskId>> statefulAndStatelessTasks = getRandomSubset(taskIds, 2); final Set<TaskId> statefulTasks = statefulAndStatelessTasks.get(0); + final Set<TaskId> statelessTasks = statefulAndStatelessTasks.get(1); final SortedMap<UUID, ClientState> clientStateMap = getRandomClientState(clientSize, tpSize, partitionSize, maxCapacity, false, statefulTasks); - final SortedMap<UUID, ClientState> clientStateMapCopy = copyClientStateMap(clientStateMap); new HighAvailabilityTaskAssignor().assign( clientStateMap, @@ -1488,6 +1496,18 @@ public class HighAvailabilityTaskAssignorTest { configs ); + assertValidAssignment(1, statefulTasks, statelessTasks, clientStateMap, new StringBuilder()); + if (rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE)) { + return; + } + if (rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY)) { + // Original assignment won't be maintained because we calculate the assignment using max flow first + // in balance subtopology strategy + assertBalancedTasks(clientStateMap, maxSkew); + return; + } + + final SortedMap<UUID, ClientState> clientStateMapCopy = copyClientStateMap(clientStateMap); configs = new AssignorConfiguration.AssignmentConfigs( 0L, 1, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java index 0541892c7dc..9a3c7ddc199 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java @@ -74,6 +74,7 @@ import static org.apache.kafka.streams.processor.internals.assignment.Assignment import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_4; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_5; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_6; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedTasks; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.copyClientStateMap; import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClusterForAllTopics; @@ -953,10 +954,13 @@ public class StickyTaskAssignorTest { new StringBuilder() ); verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, taskIds, clientStateMap, true, enableRackAwareTaskAssignor); + if (rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY)) { + assertBalancedTasks(clientStateMap, 4); + } } @Test - public void shouldRemainOriginalAssignmentWithoutTrafficCost() { + public void shouldRemainOriginalAssignmentWithoutTrafficCostForMinCostStrategy() { // This test tests that if the traffic cost is 0, we should have same assignment with or without // rack aware assignor enabled final int nodeSize = 50; @@ -997,9 +1001,9 @@ public class StickyTaskAssignorTest { final SortedSet<TaskId> taskIds = (SortedSet<TaskId>) taskTopicPartitionMap.keySet(); final List<Set<TaskId>> statefulAndStatelessTasks = getRandomSubset(taskIds, 2); final Set<TaskId> statefulTasks = statefulAndStatelessTasks.get(0); + final Set<TaskId> statelessTasks = statefulAndStatelessTasks.get(1); final SortedMap<UUID, ClientState> clientStateMap = getRandomClientState(clientSize, tpSize, partitionSize, maxCapacity, false, statefulTasks); - final SortedMap<UUID, ClientState> clientStateMapCopy = copyClientStateMap(clientStateMap); new StickyTaskAssignor().assign( clientStateMap, @@ -1009,6 +1013,18 @@ public class StickyTaskAssignorTest { configs ); + assertValidAssignment(1, statefulTasks, statelessTasks, clientStateMap, new StringBuilder()); + if (rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE)) { + return; + } + if (rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY)) { + // Original assignment won't be maintained because we calculate the assignment using max flow first + // in balance subtopology strategy + assertBalancedTasks(clientStateMap, 4); + return; + } + + final SortedMap<UUID, ClientState> clientStateMapCopy = copyClientStateMap(clientStateMap); configs = new AssignorConfiguration.AssignmentConfigs( 0L, 1,