vvcephei commented on a change in pull request #8689:
URL: https://github.com/apache/kafka/pull/8689#discussion_r427365831



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##########
@@ -170,7 +171,7 @@ public void 
shouldAssignActiveStatefulTasksEvenlyOverUnevenlyDistributedStreamTh
     }
 
     @Test
-    public void 
shouldAssignActiveStatefulTasksEvenlyOverClientsWithLessClientsThanTasks() {
+    public void 
shouldAssignActiveStatefulTasksEvenlyOverClientsWithMoreClientsThanTasks() {

Review comment:
       Huh, good catch!

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##########
@@ -213,6 +214,61 @@ public void 
shouldAssignActiveStatefulTasksEvenlyOverClientsAndStreamThreadsWith
         assertBalancedTasks(clientStates);
     }
 
+    @Test
+    public void 
shouldAssignWarmUpTasksIfStatefulActiveTasksBalancedOverStreamThreadsButNotOverClients()
 {
+        final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_1_0, 
TASK_1_1);
+        final Map<TaskId, Long> lagsForCaughtUpClient = 
allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L));
+        final Map<TaskId, Long> lagsForNotCaughtUpClient =
+            allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 
Long.MAX_VALUE));
+        final ClientState caughtUpClientState = new ClientState(allTaskIds, 
emptySet(), lagsForCaughtUpClient, 5);
+        final ClientState notCaughtUpClientState1 = new 
ClientState(emptySet(), emptySet(), lagsForNotCaughtUpClient, 5);
+        final ClientState notCaughtUpClientState2 = new 
ClientState(emptySet(), emptySet(), lagsForNotCaughtUpClient, 5);
+        final Map<UUID, ClientState> clientStates =
+            getClientStatesMap(caughtUpClientState, notCaughtUpClientState1, 
notCaughtUpClientState2);
+        final boolean unstable = new HighAvailabilityTaskAssignor().assign(
+            clientStates,
+            allTaskIds,
+            allTaskIds,
+            new AssignmentConfigs(0L, allTaskIds.size() / 3 + 1, 0, 0L)
+        );
+
+        assertThat(unstable, is(true));
+        assertThat(notCaughtUpClientState1.standbyTaskCount(), 
greaterThanOrEqualTo(allTaskIds.size() / 3));
+        assertThat(notCaughtUpClientState2.standbyTaskCount(), 
greaterThanOrEqualTo(allTaskIds.size() / 3));
+        assertValidAssignment(0, allTaskIds.size() / 3 + 1, allTaskIds, 
emptySet(), clientStates, new StringBuilder());
+    }
+
+    @Test
+    public void 
shouldEvenlyAssignActiveStatefulTasksIfClientsAreWarmedUpToBalanceTaskOverClients()
 {
+        final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_1_0, 
TASK_1_1);
+        final Set<TaskId> warmedUpTaskIds1 = mkSet(TASK_0_1);
+        final Set<TaskId> warmedUpTaskIds2 = mkSet(TASK_1_0);
+        final Map<TaskId, Long> lagsForCaughtUpClient = 
allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L));
+        final Map<TaskId, Long> lagsForWarmedUpClient1 =
+            allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 
Long.MAX_VALUE));
+        lagsForWarmedUpClient1.put(TASK_0_1, 0L);
+        final Map<TaskId, Long> lagsForWarmedUpClient2 =
+            allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 
Long.MAX_VALUE));
+        lagsForWarmedUpClient2.put(TASK_1_0, 0L);
+        final ClientState caughtUpClientState = new ClientState(allTaskIds, 
emptySet(), lagsForCaughtUpClient, 5);
+        final ClientState warmedUpClientState1 = new ClientState(emptySet(), 
warmedUpTaskIds1, lagsForWarmedUpClient1, 5);
+        final ClientState warmedUpClientState2 = new ClientState(emptySet(), 
warmedUpTaskIds2, lagsForWarmedUpClient2, 5);
+        final Map<UUID, ClientState> clientStates =
+            getClientStatesMap(caughtUpClientState, warmedUpClientState1, 
warmedUpClientState2);
+        final boolean unstable = new HighAvailabilityTaskAssignor().assign(
+            clientStates,
+            allTaskIds,
+            allTaskIds,
+            new AssignmentConfigs(0L, allTaskIds.size() / 3 + 1, 0, 0L)
+        );
+
+        assertThat(unstable, is(false));
+        assertValidAssignment(0, allTaskIds.size() / 3 + 1, allTaskIds, 
emptySet(), clientStates, new StringBuilder());

Review comment:
       If we expect no warmups, we can assert it here with:
   ```suggestion
           assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new 
StringBuilder());
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to