[GitHub] [kafka] vvcephei commented on a change in pull request #8696: KAFKA-6145: KIP-441: Enforce Standby Task Stickiness

2020-05-22 Thread GitBox


vvcephei commented on a change in pull request #8696:
URL: https://github.com/apache/kafka/pull/8696#discussion_r429346417



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -132,22 +132,97 @@ static boolean assignTaskMovements(final Map> tasksToCau
 return movementsNeeded;
 }
 
+static int assignStandbyTaskMovements(final Map> tasksToCaughtUpClients,
+  final Map clientStates,
+  final AtomicInteger remainingWarmupReplicas,
+  final Map> warmups) {
+final BiFunction caughtUpPredicate =
+(client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
+
+final ConstrainedPrioritySet caughtUpClientsByTaskLoad = new ConstrainedPrioritySet(
+caughtUpPredicate,
+client -> clientStates.get(client).assignedTaskLoad()
+);
+
+final Queue taskMovements = new PriorityQueue<>(
+Comparator.comparing(TaskMovement::numCaughtUpClients).thenComparing(TaskMovement::task)
+);
+
+for (final Map.Entry clientStateEntry : clientStates.entrySet()) {
+final UUID destination = clientStateEntry.getKey();
+final ClientState state = clientStateEntry.getValue();
+for (final TaskId task : state.standbyTasks()) {
+if (warmups.getOrDefault(destination, Collections.emptySet()).contains(task)) {
+// this is a warmup, so we won't move it.
+} else if (!taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, destination, tasksToCaughtUpClients)) {

Review comment:
   Haha, sure :) 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8696: KAFKA-6145: KIP-441: Enforce Standby Task Stickiness

2020-05-22 Thread GitBox


vvcephei commented on a change in pull request #8696:
URL: https://github.com/apache/kafka/pull/8696#discussion_r429346085



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##
@@ -57,14 +59,42 @@ public boolean assign(final Map clients,
 configs.numStandbyReplicas
 );
 
-final boolean probingRebalanceNeeded = assignTaskMovements(
-tasksToCaughtUpClients(statefulTasks, clientStates, configs.acceptableRecoveryLag),
+final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas);
+
+final Map> tasksToCaughtUpClients = tasksToCaughtUpClients(
+statefulTasks,
+clientStates,
+configs.acceptableRecoveryLag
+);
+
+// We temporarily need to know which standby tasks were intended as warmups
+// for active tasks, so that we don't move them (again) when we plan standby
+// task movements. We can then immediately treat warmups exactly the same as
+// hot-standby replicas, so we just track it right here as metadata, rather
+// than add "warmup" assignments to ClientState, for example.
+final Map> warmups = new TreeMap<>();
+
+final int neededActiveTaskMovements = assignActiveTaskMovements(
+tasksToCaughtUpClients,
 clientStates,
-configs.maxWarmupReplicas
+warmups,
+remainingWarmupReplicas
+);
+
+final int neededStandbyTaskMovements = assignStandbyTaskMovements(
+tasksToCaughtUpClients,
+clientStates,
+remainingWarmupReplicas,
+warmups
 );
 
 assignStatelessActiveTasks(clientStates, diff(TreeSet::new, allTaskIds, statefulTasks));
 
+// We shouldn't plan a probing rebalance if we _needed_ task movements, but couldn't do any
+// due to being configured for no warmups.

Review comment:
   Yeah, I've just realized this, too. On second thought, I don't think
warmup=0 really provides a good mechanism for what I had in mind. I think
we'd better leave the minimum as "at least one". Thanks!
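
For illustration, an "at least one" lower bound on the warmup setting could be enforced with a check like the one below. This is only a sketch: `WarmupConfigCheck` and `requireAtLeastOne` are hypothetical names, not Kafka APIs, and the real configuration validation may look different.

```java
// Hypothetical helper (not part of Kafka): rejects a warmup-replica setting below one.
final class WarmupConfigCheck {
    private WarmupConfigCheck() { }

    // Returns the value unchanged when it is valid, otherwise throws.
    static int requireAtLeastOne(final int maxWarmupReplicas) {
        if (maxWarmupReplicas < 1) {
            throw new IllegalArgumentException(
                "maxWarmupReplicas must be at least 1, but was " + maxWarmupReplicas);
        }
        return maxWarmupReplicas;
    }
}
```

With such a bound in place, the assignor would never see a budget of zero warmups, so the "needed movements but could not do any" case would not come from configuration alone.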









[GitHub] [kafka] vvcephei commented on a change in pull request #8696: KAFKA-6145: KIP-441: Enforce Standby Task Stickiness

2020-05-19 Thread GitBox


vvcephei commented on a change in pull request #8696:
URL: https://github.com/apache/kafka/pull/8696#discussion_r427615915



##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##
@@ -338,7 +338,7 @@ private static void runRandomizedScenario(final long seed) {
 throw new IllegalStateException("Unexpected event: " + event);
 }
 if (!harness.clientStates.isEmpty()) {
-testForConvergence(harness, configs, numStatefulTasks * 2);
+testForConvergence(harness, configs, 2 * (numStatefulTasks + numStatefulTasks * numStandbyReplicas));

Review comment:
   Now that we're also warming up standbys, we need to relax the
convergence limit.
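
To make the relaxed limit concrete, here is a small sketch that computes both expressions from the diff above. `ConvergenceLimit`, `oldLimit`, and `newLimit` are hypothetical names used only for illustration. The arithmetic is taken directly from the changed line.

```java
// Hypothetical helper (not part of Kafka): compares the old and new convergence limits.
final class ConvergenceLimit {
    private ConvergenceLimit() { }

    // Old bound from the removed line: numStatefulTasks * 2.
    static int oldLimit(final int numStatefulTasks) {
        return numStatefulTasks * 2;
    }

    // New bound from the added line: 2 * (active tasks + standby tasks).
    static int newLimit(final int numStatefulTasks, final int numStandbyReplicas) {
        return 2 * (numStatefulTasks + numStatefulTasks * numStandbyReplicas);
    }
}
```

For example, with 9 stateful tasks and 1 standby replica the limit grows from 18 to 36. With 0 standby replicas the two limits coincide.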

##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##
@@ -57,14 +59,42 @@ public boolean assign(final Map clients,
 configs.numStandbyReplicas
 );
 
-final boolean probingRebalanceNeeded = assignTaskMovements(
-tasksToCaughtUpClients(statefulTasks, clientStates, configs.acceptableRecoveryLag),
+final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas);

Review comment:
   Moved the counter out here because we need to decrement it while
assigning both active and standby warmups.
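
A minimal sketch of the idea, assuming a single `AtomicInteger` is passed to two planning phases in turn so that both draw from the same budget. `WarmupBudget`, `tryClaim`, and `planWarmups` are hypothetical names, not the methods in this PR.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper (not part of Kafka): two phases share one warmup budget.
final class WarmupBudget {
    private WarmupBudget() { }

    // Claims one warmup slot if any remain; returns true when a slot was claimed.
    static boolean tryClaim(final AtomicInteger remaining) {
        // getAndUpdate returns the previous value; the counter never goes below zero.
        return remaining.getAndUpdate(n -> n > 0 ? n - 1 : n) > 0;
    }

    // Grants up to `requested` warmups, stopping as soon as the shared budget is exhausted.
    static int planWarmups(final int requested, final AtomicInteger remaining) {
        int granted = 0;
        for (int i = 0; i < requested && tryClaim(remaining); i++) {
            granted++;
        }
        return granted;
    }
}
```

With a budget of 2, an active phase that requests 1 warmup leaves 1 for a standby phase, however many the standby phase asks for. Had each phase owned its own counter, the combined total could exceed the configured maximum.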

##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##
@@ -78,6 +80,64 @@
 /*probingRebalanceIntervalMs*/ 60 * 1000L
 );
 
+@Test
+public void shouldBeStickyForActiveAndStandbyTasksEvenIfNoWarmups() {

Review comment:
   First test for stickiness: we should be 100% sticky and also not 
schedule a probing rebalance when we are configured for no warmups.

##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##
@@ -78,6 +80,64 @@
 /*probingRebalanceIntervalMs*/ 60 * 1000L
 );
 
+@Test
+public void shouldBeStickyForActiveAndStandbyTasksEvenIfNoWarmups() {
+final Set allTaskIds = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2, TASK_2_0, TASK_2_1, TASK_2_2);
+final ClientState clientState1 = new ClientState(allTaskIds, emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 0L)), 1);
+final ClientState clientState2 = new ClientState(emptySet(), allTaskIds, allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> 10L)), 1);
+final ClientState clientState3 = new ClientState(emptySet(), emptySet(), allTaskIds.stream().collect(Collectors.toMap(k -> k, k -> Long.MAX_VALUE)), 1);
+
+final Map clientStates = mkMap(
+mkEntry(UUID_1, clientState1),
+mkEntry(UUID_2, clientState2),
+mkEntry(UUID_3, clientState3)
+);
+
+final boolean unstable = new HighAvailabilityTaskAssignor().assign(
+clientStates,
+allTaskIds,
+allTaskIds,
+new AssignmentConfigs(11L, 0, 1, 0L)
+);
+
+assertThat(clientState1, hasAssignedTasks(allTaskIds.size()));
+
+assertThat(clientState2, hasAssignedTasks(allTaskIds.size()));
+
+assertThat(clientState3, hasAssignedTasks(0));
+
+assertThat(unstable, is(false));
+}
+
+@Test
+public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() {

Review comment:
   Main test case for stickiness: we should be sticky for standbys, and 
also schedule warmups.

##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -64,12 +66,10 @@ private static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final Task
 return caughtUpClients == null || caughtUpClients.contains(client);
 }
 
-/**
- * @return whether any warmup replicas were assigned
- */
-static boolean assignTaskMovements(final Map> tasksToCaughtUpClients,
-   final Map clientStates,
-   final int maxWarmupReplicas) {
+static int assignActiveTaskMovements(final Map> tasksToCaughtUpClients,

Review comment:
   I changed this to return an int just because it made stepping through 
the assignment in the debugger a bit easier to understand. It serves no 
algorithmic purpose.
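
As a rough illustration of why the count carries no extra algorithmic weight, a caller can still reduce the two counts to a single yes/no answer. The sketch below assumes the decision is simply "any movement is needed". `MovementCounts` and `probingRebalanceNeeded` are hypothetical names, and the exact expression used in the PR may differ.

```java
// Hypothetical helper (not part of Kafka): collapses movement counts into a boolean.
final class MovementCounts {
    private MovementCounts() { }

    // Assumption: a follow-up probing rebalance is wanted whenever any movement is still needed.
    static boolean probingRebalanceNeeded(final int neededActiveTaskMovements,
                                          final int neededStandbyTaskMovements) {
        return neededActiveTaskMovements + neededStandbyTaskMovements > 0;
    }
}
```

The individual counts are then only useful for inspection, for example when stepping through an assignment in a debugger.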

##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
##
@@ -326,6 +326,10 @@ static TaskSkewReport analyzeTaskAssignmentBalance(final Map
 return new TaskSkewReport(maxTaskSkew, skewedSubtopologies, subtopologyToClientsWithPartition);
 }
 
+static Matcher hasAssignedTasks(final