vvcephei commented on a change in pull request #8696:
URL: https://github.com/apache/kafka/pull/8696#discussion_r427615915
##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##
@@ -338,7 +338,7 @@ private static void runRandomizedScenario(final long seed) {
                throw new IllegalStateException("Unexpected event: " + event);
        }
        if (!harness.clientStates.isEmpty()) {
-            testForConvergence(harness, configs, numStatefulTasks * 2);
+            testForConvergence(harness, configs, 2 * (numStatefulTasks + numStatefulTasks * numStandbyReplicas));
Review comment:
Now that we're also warming up standbys, we need to relax the convergence limit.
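To make the relaxed bound concrete, here is a minimal sketch (the class and method names are hypothetical, not from the PR) of the arithmetic behind the new limit: each stateful task now accounts for one active slot plus `numStandbyReplicas` standby slots, and the limit allows up to two rounds per slot, so the old bound `numStatefulTasks * 2` grows proportionally.

```java
// Hypothetical illustration of the relaxed convergence bound.
// Each stateful task occupies one active slot plus numStandbyReplicas
// standby slots; the limit allows 2 rounds per stateful slot.
public class ConvergenceBound {
    static int convergenceLimit(final int numStatefulTasks, final int numStandbyReplicas) {
        final int totalStatefulSlots = numStatefulTasks + numStatefulTasks * numStandbyReplicas;
        return 2 * totalStatefulSlots;
    }

    public static void main(final String[] args) {
        // With no standbys the bound matches the old numStatefulTasks * 2.
        System.out.println(convergenceLimit(10, 0)); // prints 20
        // One standby per task doubles the slot count, and hence the bound.
        System.out.println(convergenceLimit(10, 1)); // prints 40
    }
}
```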
##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##
@@ -57,14 +59,42 @@ public boolean assign(final Map<UUID, ClientState> clients,
            configs.numStandbyReplicas
        );
-        final boolean probingRebalanceNeeded = assignTaskMovements(
-            tasksToCaughtUpClients(statefulTasks, clientStates, configs.acceptableRecoveryLag),
+        final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas);
Review comment:
Moved the counter out here because we need to decrement it while assigning both active and standby warmups.
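A minimal sketch of the shared-budget pattern described here (the class and method names are hypothetical): a single `AtomicInteger` holding the remaining warmup budget is consumed by both the active-warmup and standby-warmup passes, so the combined total never exceeds the configured maximum.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the shared warmup budget: one counter is
// decremented by both the active and standby warmup passes, so the
// total number of warmup replicas stays within the configured max.
public class WarmupBudget {
    // Returns true if a warmup slot was still available and was consumed.
    static boolean tryConsumeWarmupSlot(final AtomicInteger remainingWarmupReplicas) {
        return remainingWarmupReplicas.getAndDecrement() > 0;
    }

    public static void main(final String[] args) {
        final AtomicInteger remaining = new AtomicInteger(2);
        // First pass (e.g. active warmups) takes a slot...
        System.out.println(tryConsumeWarmupSlot(remaining)); // true
        // ...and a second pass (e.g. standby warmups) shares the same budget.
        System.out.println(tryConsumeWarmupSlot(remaining)); // true
        System.out.println(tryConsumeWarmupSlot(remaining)); // false: budget exhausted
    }
}
```

Keeping the counter in the caller, rather than inside each assignment pass, is what lets the two passes share one budget.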
##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##
@@ -78,6 +80,64 @@
        /*probingRebalanceIntervalMs*/ 60 * 1000L
    );
+    @Test
+    public void shouldBeStickyForActiveAndStandbyTasksEvenIfNoWarmups() {
Review comment:
First test for stickiness: we should be 100% sticky and also not
schedule a probing rebalance when we are configured for no warmups.
##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##
@@ -78,6 +80,64 @@
        /*probingRebalanceIntervalMs*/ 60 * 1000L
    );
+    @Test
+    public void shouldBeStickyForActiveAndStandbyTasksEvenIfNoWarmups() {
+        final Set<TaskId> allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
+        final ClientState clientState1 = new ClientState(allTaskIds, emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)), 1);
+        final ClientState clientState2 = new ClientState(emptySet(), allTaskIds, allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L)), 1);
+        final ClientState clientState3 = new ClientState(emptySet(), emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> Long.MAX_VALUE)), 1);
+
+        final Map<UUID, ClientState> clientStates = mkMap(
+            mkEntry(UUID_1, clientState1),
+            mkEntry(UUID_2, clientState2),
+            mkEntry(UUID_3, clientState3)
+        );
+
+        final boolean unstable = new HighAvailabilityTaskAssignor().assign(
+            clientStates,
+            allTaskIds,
+            allTaskIds,
+            new AssignmentConfigs(11L, 0, 1, 0L)
+        );
+
+        assertThat(clientState1, hasAssignedTasks(allTaskIds.size()));
+        assertThat(clientState2, hasAssignedTasks(allTaskIds.size()));
+        assertThat(clientState3, hasAssignedTasks(0));
+
+        assertThat(unstable, is(false));
+    }
+
+    @Test
+    public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() {
Review comment:
Main test case for stickiness: we should stay sticky for standbys while also scheduling warmups.
##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -64,12 +66,10 @@ private static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final Task
        return caughtUpClients == null || caughtUpClients.contains(client);
    }
-    /**
-     * @return whether any warmup replicas were assigned
-     */
-    static boolean assignTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
-                                       final Map<UUID, ClientState> clientStates,
-                                       final int maxWarmupReplicas) {
+    static int assignActiveTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
Review comment:
I changed this to return an int just because it made stepping through
the assignment in the debugger a bit easier to understand. It serves no
algorithmic purpose.
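The design choice can be sketched like this (names are hypothetical, not the PR's actual code): returning the count of scheduled movements instead of a boolean is informationally a superset, since callers can recover the old boolean with a `> 0` check, while a debugger now shows the exact number of movements at each step.

```java
import java.util.List;

// Hypothetical sketch: returning the movement count instead of a boolean.
// Callers recover the old boolean with "> 0"; the debugger shows the count.
public class MovementCount {
    // Stands in for an assignment pass that schedules task movements.
    static int assignMovements(final List<String> scheduledMovements) {
        return scheduledMovements.size();
    }

    // Equivalent to the old boolean return value.
    static boolean probingRebalanceNeeded(final int movements) {
        return movements > 0;
    }

    public static void main(final String[] args) {
        final int moved = assignMovements(List.of("task 0_0 -> client 2"));
        System.out.println(moved);                         // 1
        System.out.println(probingRebalanceNeeded(moved)); // true
        System.out.println(probingRebalanceNeeded(0));     // false
    }
}
```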
##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
##
@@ -326,6 +326,10 @@ static TaskSkewReport analyzeTaskAssignmentBalance(final Map
        return new TaskSkewReport(maxTaskSkew, skewedSubtopologies, subtopologyToClientsWithPartition);
    }
+    static Matcher hasAssignedTasks(final