vvcephei commented on a change in pull request #8689: URL: https://github.com/apache/kafka/pull/8689#discussion_r427365831
########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java ########## @@ -170,7 +171,7 @@ public void shouldAssignActiveStatefulTasksEvenlyOverUnevenlyDistributedStreamTh } @Test - public void shouldAssignActiveStatefulTasksEvenlyOverClientsWithLessClientsThanTasks() { + public void shouldAssignActiveStatefulTasksEvenlyOverClientsWithMoreClientsThanTasks() { Review comment: Huh, good catch! ########## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java ########## @@ -213,6 +214,61 @@ public void shouldAssignActiveStatefulTasksEvenlyOverClientsAndStreamThreadsWith assertBalancedTasks(clientStates); } + @Test + public void shouldAssignWarmUpTasksIfStatefulActiveTasksBalancedOverStreamThreadsButNotOverClients() { + final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_1_0, TASK_1_1); + final Map<TaskId, Long> lagsForCaughtUpClient = allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)); + final Map<TaskId, Long> lagsForNotCaughtUpClient = + allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> Long.MAX_VALUE)); + final ClientState caughtUpClientState = new ClientState(allTaskIds, emptySet(), lagsForCaughtUpClient, 5); + final ClientState notCaughtUpClientState1 = new ClientState(emptySet(), emptySet(), lagsForNotCaughtUpClient, 5); + final ClientState notCaughtUpClientState2 = new ClientState(emptySet(), emptySet(), lagsForNotCaughtUpClient, 5); + final Map<UUID, ClientState> clientStates = + getClientStatesMap(caughtUpClientState, notCaughtUpClientState1, notCaughtUpClientState2); + final boolean unstable = new HighAvailabilityTaskAssignor().assign( + clientStates, + allTaskIds, + allTaskIds, + new AssignmentConfigs(0L, allTaskIds.size() / 3 + 1, 0, 0L) + ); + + assertThat(unstable, is(true)); + assertThat(notCaughtUpClientState1.standbyTaskCount(), greaterThanOrEqualTo(allTaskIds.size() / 3)); + assertThat(notCaughtUpClientState2.standbyTaskCount(), greaterThanOrEqualTo(allTaskIds.size() / 3)); + assertValidAssignment(0, allTaskIds.size() / 3 + 1, allTaskIds, emptySet(), clientStates, new StringBuilder()); + } + + @Test + public void shouldEvenlyAssignActiveStatefulTasksIfClientsAreWarmedUpToBalanceTaskOverClients() { + final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_1_0, TASK_1_1); + final Set<TaskId> warmedUpTaskIds1 = mkSet(TASK_0_1); + final Set<TaskId> warmedUpTaskIds2 = mkSet(TASK_1_0); + final Map<TaskId, Long> lagsForCaughtUpClient = allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)); + final Map<TaskId, Long> lagsForWarmedUpClient1 = + allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> Long.MAX_VALUE)); + lagsForWarmedUpClient1.put(TASK_0_1, 0L); + final Map<TaskId, Long> lagsForWarmedUpClient2 = + allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> Long.MAX_VALUE)); + lagsForWarmedUpClient2.put(TASK_1_0, 0L); + final ClientState caughtUpClientState = new ClientState(allTaskIds, emptySet(), lagsForCaughtUpClient, 5); + final ClientState warmedUpClientState1 = new ClientState(emptySet(), warmedUpTaskIds1, lagsForWarmedUpClient1, 5); + final ClientState warmedUpClientState2 = new ClientState(emptySet(), warmedUpTaskIds2, lagsForWarmedUpClient2, 5); + final Map<UUID, ClientState> clientStates = + getClientStatesMap(caughtUpClientState, warmedUpClientState1, warmedUpClientState2); + final boolean unstable = new HighAvailabilityTaskAssignor().assign( + clientStates, + allTaskIds, + allTaskIds, + new AssignmentConfigs(0L, allTaskIds.size() / 3 + 1, 0, 0L) + ); + + assertThat(unstable, is(false)); + assertValidAssignment(0, allTaskIds.size() / 3 + 1, allTaskIds, emptySet(), clientStates, new StringBuilder()); Review comment: If we expect no warmups, we can assert it here with: ```suggestion assertValidAssignment(0, allTaskIds, emptySet(), clientStates, new StringBuilder()); ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org