This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 85cee984ac0 MINOR: Fix rack-aware assignment tests (#14965)
85cee984ac0 is described below

commit 85cee984ac09d7bdb46634413f70fc74afe60d1f
Author: Hao Li <1127478+lihao...@users.noreply.github.com>
AuthorDate: Mon Dec 11 01:38:57 2023 -0800

    MINOR: Fix rack-aware assignment tests (#14965)
    
    Reviewers: Matthias J. Sax <matth...@confluent.io>
---
 .../internals/assignment/StickyTaskAssignor.java   |  6 +--
 .../HighAvailabilityTaskAssignorTest.java          | 44 ++++++++++++++++------
 .../assignment/StickyTaskAssignorTest.java         | 20 +++++++++-
 3 files changed, 52 insertions(+), 18 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
index 355ecd2e804..a97bb319ed2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
@@ -115,10 +115,8 @@ public class StickyTaskAssignor implements TaskAssignor {
             rackAwareTaskAssignor.optimizeActiveTasks(statefulTasks, clientStates, trafficCost, nonOverlapCost);
 
             final TreeSet<TaskId> statelessTasks = (TreeSet<TaskId>) diff(TreeSet::new, allTaskIds, statefulTasks);
-            if (!statelessTasks.isEmpty()) {
-                rackAwareTaskAssignor.optimizeActiveTasks(statelessTasks, clientStates,
-                    STATELESS_TRAFFIC_COST, STATELESS_NON_OVERLAP_COST);
-            }
+            // No-op if statelessTasks is empty
+            rackAwareTaskAssignor.optimizeActiveTasks(statelessTasks, clientStates, STATELESS_TRAFFIC_COST, STATELESS_NON_OVERLAP_COST);
         }
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
index dfb223b3e64..850f4715b54 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
@@ -131,6 +131,7 @@ public class HighAvailabilityTaskAssignorTest {
     private final Time time = new MockTime();
 
     private boolean enableRackAwareTaskAssignor;
+    private int maxSkew = 1;
 
     @Parameter
     public String rackAwareStrategy;
@@ -138,6 +139,9 @@ public class HighAvailabilityTaskAssignorTest {
     @Before
     public void setUp() {
         enableRackAwareTaskAssignor = !rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE);
+        if (rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY)) {
+            maxSkew = 4;
+        }
     }
 
     @Parameterized.Parameters(name = "rackAwareStrategy={0}")
@@ -284,7 +288,7 @@ public class HighAvailabilityTaskAssignorTest {
 
         if (!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)) {
             // Subtopology is not balanced with min_traffic rack aware assignment
-            assertBalancedTasks(clientStates);
+            assertBalancedTasks(clientStates, maxSkew);
         }
 
         verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTaskIds, clientStates, false, enableRackAwareTaskAssignor);
@@ -331,7 +335,7 @@ public class HighAvailabilityTaskAssignorTest {
 
         if (!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)) {
             // Subtopology is not balanced with min_traffic rack aware assignment
-            assertBalancedTasks(clientStates);
+            assertBalancedTasks(clientStates, maxSkew);
         }
 
         verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTaskIds, clientStates, false, enableRackAwareTaskAssignor);
@@ -374,7 +378,7 @@ public class HighAvailabilityTaskAssignorTest {
         assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new StringBuilder());
         assertBalancedActiveAssignment(clientStates, new StringBuilder());
         assertBalancedStatefulAssignment(allTaskIds, clientStates, new StringBuilder());
-        assertBalancedTasks(clientStates);
+        assertBalancedTasks(clientStates, maxSkew);
 
         verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTaskIds, clientStates, false, enableRackAwareTaskAssignor);
     }
@@ -464,7 +468,7 @@ public class HighAvailabilityTaskAssignorTest {
         assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new StringBuilder());
         assertBalancedActiveAssignment(clientStates, new StringBuilder());
         assertBalancedStatefulAssignment(allTaskIds, clientStates, new StringBuilder());
-        assertBalancedTasks(clientStates);
+        assertBalancedTasks(clientStates, maxSkew);
 
         verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTaskIds, clientStates, false, enableRackAwareTaskAssignor);
     }
@@ -510,7 +514,7 @@ public class HighAvailabilityTaskAssignorTest {
 
         if (!rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC)) {
             // Subtopology is not balanced with min_traffic rack aware assignment
-            assertBalancedTasks(clientStates);
+            assertBalancedTasks(clientStates, maxSkew);
         }
 
         verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTaskIds, clientStates, false, enableRackAwareTaskAssignor);
@@ -612,7 +616,7 @@ public class HighAvailabilityTaskAssignorTest {
         assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new StringBuilder());
         assertBalancedActiveAssignment(clientStates, new StringBuilder());
         assertBalancedStatefulAssignment(allTaskIds, clientStates, new StringBuilder());
-        assertBalancedTasks(clientStates);
+        assertBalancedTasks(clientStates, maxSkew);
 
         verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTaskIds, clientStates, false, enableRackAwareTaskAssignor);
     }
@@ -685,7 +689,7 @@ public class HighAvailabilityTaskAssignorTest {
         assertValidAssignment(0, allTasks, emptySet(), clientStates, new StringBuilder());
         assertBalancedActiveAssignment(clientStates, new StringBuilder());
         assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());
-        assertBalancedTasks(clientStates);
+        assertBalancedTasks(clientStates, maxSkew);
 
         verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTasks, clientStates, false, enableRackAwareTaskAssignor);
     }
@@ -715,7 +719,7 @@ public class HighAvailabilityTaskAssignorTest {
         assertValidAssignment(1, allTasks, emptySet(), clientStates, new StringBuilder());
         assertBalancedActiveAssignment(clientStates, new StringBuilder());
         assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());
-        assertBalancedTasks(clientStates);
+        assertBalancedTasks(clientStates, maxSkew);
 
         verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTasks, clientStates, true, enableRackAwareTaskAssignor);
     }
@@ -748,7 +752,7 @@ public class HighAvailabilityTaskAssignorTest {
         assertValidAssignment(0, 1, allTasks, emptySet(), clientStates, new StringBuilder());
         assertBalancedActiveAssignment(clientStates, new StringBuilder());
         assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());
-        assertBalancedTasks(clientStates);
+        assertBalancedTasks(clientStates, maxSkew);
 
         verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTasks, clientStates, false, enableRackAwareTaskAssignor);
     }
@@ -787,7 +791,7 @@ public class HighAvailabilityTaskAssignorTest {
         assertValidAssignment(1, 1, allTasks, emptySet(), clientStates, new StringBuilder());
         assertBalancedActiveAssignment(clientStates, new StringBuilder());
         assertBalancedStatefulAssignment(allTasks, clientStates, new StringBuilder());
-        assertBalancedTasks(clientStates);
+        assertBalancedTasks(clientStates, maxSkew);
 
         verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, allTasks, clientStates, true, enableRackAwareTaskAssignor);
     }
@@ -1431,10 +1435,14 @@ public class HighAvailabilityTaskAssignorTest {
         );
         assertBalancedActiveAssignment(clientStateMap, new StringBuilder());
         verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, taskIds, clientStateMap, true, enableRackAwareTaskAssignor);
+
+        if (rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY)) {
+            assertBalancedTasks(clientStateMap, maxSkew);
+        }
     }
 
     @Test
-    public void shouldRemainOriginalAssignmentWithoutTrafficCost() {
+    public void shouldRemainOriginalAssignmentWithoutTrafficCostForMinCostStrategy() {
         // This test tests that if the traffic cost is 0, we should have same assignment with or without
         // rack aware assignor enabled
         final int nodeSize = 50;
@@ -1476,9 +1484,9 @@ public class HighAvailabilityTaskAssignorTest {
         final SortedSet<TaskId> taskIds = (SortedSet<TaskId>) taskTopicPartitionMap.keySet();
         final List<Set<TaskId>> statefulAndStatelessTasks = getRandomSubset(taskIds, 2);
         final Set<TaskId> statefulTasks = statefulAndStatelessTasks.get(0);
+        final Set<TaskId> statelessTasks = statefulAndStatelessTasks.get(1);
         final SortedMap<UUID, ClientState> clientStateMap = getRandomClientState(clientSize,
             tpSize, partitionSize, maxCapacity, false, statefulTasks);
-        final SortedMap<UUID, ClientState> clientStateMapCopy = copyClientStateMap(clientStateMap);
 
         new HighAvailabilityTaskAssignor().assign(
             clientStateMap,
@@ -1488,6 +1496,18 @@ public class HighAvailabilityTaskAssignorTest {
             configs
         );
 
+        assertValidAssignment(1, statefulTasks, statelessTasks, clientStateMap, new StringBuilder());
+        if (rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE)) {
+            return;
+        }
+        if (rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY)) {
+            // Original assignment won't be maintained because we calculate the assignment using max flow first
+            // in balance subtopology strategy
+            assertBalancedTasks(clientStateMap, maxSkew);
+            return;
+        }
+
+        final SortedMap<UUID, ClientState> clientStateMapCopy = copyClientStateMap(clientStateMap);
         configs = new AssignorConfiguration.AssignmentConfigs(
             0L,
             1,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
index 0541892c7dc..9a3c7ddc199 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
@@ -74,6 +74,7 @@ import static org.apache.kafka.streams.processor.internals.assignment.Assignment
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_4;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_5;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_6;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertBalancedTasks;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.assertValidAssignment;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.copyClientStateMap;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClusterForAllTopics;
@@ -953,10 +954,13 @@ public class StickyTaskAssignorTest {
             new StringBuilder()
         );
         verifyTaskPlacementWithRackAwareAssignor(rackAwareTaskAssignor, taskIds, clientStateMap, true, enableRackAwareTaskAssignor);
+        if (rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY)) {
+            assertBalancedTasks(clientStateMap, 4);
+        }
     }
 
     @Test
-    public void shouldRemainOriginalAssignmentWithoutTrafficCost() {
+    public void shouldRemainOriginalAssignmentWithoutTrafficCostForMinCostStrategy() {
         // This test tests that if the traffic cost is 0, we should have same assignment with or without
         // rack aware assignor enabled
         final int nodeSize = 50;
@@ -997,9 +1001,9 @@ public class StickyTaskAssignorTest {
         final SortedSet<TaskId> taskIds = (SortedSet<TaskId>) taskTopicPartitionMap.keySet();
         final List<Set<TaskId>> statefulAndStatelessTasks = getRandomSubset(taskIds, 2);
         final Set<TaskId> statefulTasks = statefulAndStatelessTasks.get(0);
+        final Set<TaskId> statelessTasks = statefulAndStatelessTasks.get(1);
         final SortedMap<UUID, ClientState> clientStateMap = getRandomClientState(clientSize,
             tpSize, partitionSize, maxCapacity, false, statefulTasks);
-        final SortedMap<UUID, ClientState> clientStateMapCopy = copyClientStateMap(clientStateMap);
 
         new StickyTaskAssignor().assign(
             clientStateMap,
@@ -1009,6 +1013,18 @@ public class StickyTaskAssignorTest {
             configs
         );
 
+        assertValidAssignment(1, statefulTasks, statelessTasks, clientStateMap, new StringBuilder());
+        if (rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE)) {
+            return;
+        }
+        if (rackAwareStrategy.equals(StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY)) {
+            // Original assignment won't be maintained because we calculate the assignment using max flow first
+            // in balance subtopology strategy
+            assertBalancedTasks(clientStateMap, 4);
+            return;
+        }
+
+        final SortedMap<UUID, ClientState> clientStateMapCopy = copyClientStateMap(clientStateMap);
         configs = new AssignorConfiguration.AssignmentConfigs(
             0L,
             1,

Reply via email to