[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

2020-05-13 Thread GitBox


vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424825641



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ConstrainedPrioritySet.java
##
@@ -16,77 +16,58 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import org.apache.kafka.streams.processor.TaskId;
+
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.BiFunction;
-import org.apache.kafka.streams.processor.TaskId;
+import java.util.function.Function;
 
 /**
  * Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment
  */
-class ValidClientsByTaskLoadQueue {
+class ConstrainedPrioritySet {
 
 private final PriorityQueue clientsByTaskLoad;
-private final BiFunction validClientCriteria;
+private final BiFunction constraint;
 private final Set uniqueClients = new HashSet<>();
 
-ValidClientsByTaskLoadQueue(final Map clientStates,
-final BiFunction validClientCriteria) {
-this.validClientCriteria = validClientCriteria;
-
-clientsByTaskLoad = new PriorityQueue<>(
-(client, other) -> {
-final double clientTaskLoad = clientStates.get(client).taskLoad();
-final double otherTaskLoad = clientStates.get(other).taskLoad();
-if (clientTaskLoad < otherTaskLoad) {
-return -1;
-} else if (clientTaskLoad > otherTaskLoad) {
-return 1;
-} else {
-return client.compareTo(other);
-}
-});
+ConstrainedPrioritySet(final BiFunction constraint,
+   final Function weight) {
+this.constraint = constraint;
+clientsByTaskLoad = new PriorityQueue<>(Comparator.comparing(weight).thenComparing(clientId -> clientId));
 }
 
 /**
  * @return the next least loaded client that satisfies the given criteria, or null if none do
  */
-UUID poll(final TaskId task) {
-final List validClient = poll(task, 1);
-return validClient.isEmpty() ? null : validClient.get(0);
-}
-
-/**
- * @return the next N <= {@code numClientsPerTask} clients in the underlying priority queue that are valid candidates for the given task
- */
-List poll(final TaskId task, final int numClients) {
-final List nextLeastLoadedValidClients = new LinkedList<>();
+UUID poll(final TaskId task, final Function extraConstraint) {

Review comment:
   I was operating more on intuition here. To be honest, I suspected you would call this out, so I probably should have just taken the time to prove it up front.
   
   Setting the constraint aside for a minute, what I had in mind for balance is something like this: suppose you have two clients, "C1" and "C2", where C1 has one task and C2 has two. You poll, get C1, and assign it a task. Now they both have two.
   
   If you offer C1 back and poll again, you might prefer to get C1 again, either because the "weight" function takes into account more than just the task load, or simply because of the total order we impose on clientId, in which `C1 < C2`. But if you poll two clients up front, C1 never gets a chance to be considered for the second poll; you automatically get C1 and C2.
   
   In retrospect, this might be moot in practice, because the only time we actually polled for multiple clients was when assigning standbys, specifically multiple replicas of the same task. In that case, we know that we _cannot_ consider C1 again for the second poll.
   
   From a computer-sciencey perspective, it doesn't seem like the data 
structure should be able to make this assumption, though, since it can't know 
that polling a client also invalidates it for a subsequent poll with the same 
last-mile predicate.
   
   So, even in retrospect, I'm tempted to leave it this way (big surprise there), although I acknowledge that the outcome doesn't actually differ for the way we use the method.
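A minimal sketch of the C1/C2 scenario described above, contrasting polling two clients up front with polling one at a time and offering the client back. It is not Kafka's `ConstrainedPrioritySet`: it assumes `String` client IDs instead of `UUID`s, uses the raw task count as the weight, and leaves out the constraint entirely. `PollOrderSketch`, `pollNUpFront`, and `pollOneAtATime` are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical sketch, not Kafka's ConstrainedPrioritySet: no constraint,
// String client IDs, and "weight" is simply the number of assigned tasks.
class PollOrderSketch {

    // Orders clients by current weight, breaking ties by client ID (so "C1" < "C2").
    private static PriorityQueue<String> newQueue(final Map<String, Integer> load) {
        final PriorityQueue<String> queue = new PriorityQueue<>(
            Comparator.comparing((String client) -> load.get(client)).thenComparing(client -> client));
        queue.addAll(load.keySet());
        return queue;
    }

    // Takes the n lightest clients up front, then assigns one task to each.
    static List<String> pollNUpFront(final Map<String, Integer> load, final int n) {
        final PriorityQueue<String> queue = newQueue(load);
        final List<String> picked = new ArrayList<>();
        for (int i = 0; i < n && !queue.isEmpty(); i++) {
            picked.add(queue.poll());
        }
        // Weights change only after the picked clients have left the queue.
        for (final String client : picked) {
            load.merge(client, 1, Integer::sum);
        }
        return picked;
    }

    // Polls one client, assigns it a task, and offers it back with its new weight.
    static List<String> pollOneAtATime(final Map<String, Integer> load, final int n) {
        final PriorityQueue<String> queue = newQueue(load);
        final List<String> picked = new ArrayList<>();
        for (int i = 0; i < n && !queue.isEmpty(); i++) {
            final String client = queue.poll();
            // Safe: the client is out of the queue while its weight changes.
            load.merge(client, 1, Integer::sum);
            queue.offer(client);
            picked.add(client);
        }
        return picked;
    }

    public static void main(final String[] args) {
        final Map<String, Integer> upFront = new HashMap<>();
        upFront.put("C1", 1);
        upFront.put("C2", 2);
        System.out.println(pollNUpFront(upFront, 2));   // [C1, C2]

        final Map<String, Integer> oneAtATime = new HashMap<>();
        oneAtATime.put("C1", 1);
        oneAtATime.put("C2", 2);
        System.out.println(pollOneAtATime(oneAtATime, 2)); // [C1, C1]
    }
}
```

Starting from C1 with one task and C2 with two, polling two up front picks `[C1, C2]`, while polling one at a time picks `[C1, C1]`: after the first assignment both clients weigh 2, and the client-ID tie-break favors C1. With the standby constraint in place, C1 could not host a second replica of the same task, which is why the two approaches end up equivalent in the way the method is actually used.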





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

2020-05-13 Thread GitBox


vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424822166



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##
@@ -236,16 +233,17 @@ public void staticAssignmentShouldConvergeWithTheFirstAssignment() {
 0,
 1000L);
 
-final Harness harness = Harness.initializeCluster(1, 1, 1);
+final Harness harness = Harness.initializeCluster(1, 1, 1, () -> 1);
 
 testForConvergence(harness, configs, 1);
 verifyValidAssignment(0, harness);
+verifyBalancedAssignment(harness);
 }
 
 @Test
 public void assignmentShouldConvergeAfterAddingNode() {
-final int numStatelessTasks = 15;
-final int numStatefulTasks = 13;
+final int numStatelessTasks = 7;

Review comment:
   Right, but I think there's a "3" or a "5" in there somewhere, maybe in the other test. Anyway, my _intent_ was to make them all prime so I wouldn't have to think too hard about whether they were all coprime. But, in reality, I managed to screw up both.
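Any two distinct primes are coprime, which is why all-prime parameters would have avoided the question, whereas a composite such as the original 15 shares a factor with a 3 or a 5. If the counts ever need checking mechanically, a small gcd-based helper along these lines would do. `CoprimeCheck` and `pairwiseCoprime` are hypothetical names introduced for illustration, and the inputs below are illustrative values.

```java
// Hypothetical helper: checks whether every pair of test parameters is coprime.
class CoprimeCheck {

    // Euclid's algorithm.
    static int gcd(final int a, final int b) {
        return b == 0 ? Math.abs(a) : gcd(b, a % b);
    }

    static boolean pairwiseCoprime(final int... values) {
        for (int i = 0; i < values.length; i++) {
            for (int j = i + 1; j < values.length; j++) {
                if (gcd(values[i], values[j]) != 1) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        System.out.println(pairwiseCoprime(7, 13));     // true
        System.out.println(pairwiseCoprime(15, 13, 5)); // false: gcd(15, 5) = 5
    }
}
```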









[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

2020-05-13 Thread GitBox


vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424821641



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovementTest.java
##
@@ -35,262 +44,161 @@
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_3;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getClientStatesMap;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.hasProperty;
 import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import org.apache.kafka.streams.processor.TaskId;
-import org.junit.Test;
+import static org.hamcrest.Matchers.is;
 
 public class TaskMovementTest {
-private final ClientState client1 = new ClientState(1);
-private final ClientState client2 = new ClientState(1);
-private final ClientState client3 = new ClientState(1);
-
-private final Map clientStates = getClientStatesMap(client1, client2, client3);
-
-private final Map> emptyWarmupAssignment = mkMap(
-mkEntry(UUID_1, EMPTY_TASK_LIST),
-mkEntry(UUID_2, EMPTY_TASK_LIST),
-mkEntry(UUID_3, EMPTY_TASK_LIST)
-);
-
 @Test
 public void shouldAssignTasksToClientsAndReturnFalseWhenAllClientsCaughtUp() {
 final int maxWarmupReplicas = Integer.MAX_VALUE;
 final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2);
 
-final Map> balancedAssignment = mkMap(
-mkEntry(UUID_1, asList(TASK_0_0, TASK_1_0)),
-mkEntry(UUID_2, asList(TASK_0_1, TASK_1_1)),
-mkEntry(UUID_3, asList(TASK_0_2, TASK_1_2))
-);
-
 final Map> tasksToCaughtUpClients = new HashMap<>();
 for (final TaskId task : allTasks) {
 tasksToCaughtUpClients.put(task, mkSortedSet(UUID_1, UUID_2, UUID_3));
 }
-
-assertFalse(
+
+final ClientState client1 = getClientStateWithActiveAssignment(asList(TASK_0_0, TASK_1_0));
+final ClientState client2 = getClientStateWithActiveAssignment(asList(TASK_0_1, TASK_1_1));
+final ClientState client3 = getClientStateWithActiveAssignment(asList(TASK_0_2, TASK_1_2));
+
+assertThat(
 assignTaskMovements(
-balancedAssignment,
 tasksToCaughtUpClients,
-clientStates,
-getMapWithNumStandbys(allTasks, 1),
-maxWarmupReplicas)
+getClientStatesMap(client1, client2, client3),
+maxWarmupReplicas),
+is(false)
 );
-
-verifyClientStateAssignments(balancedAssignment, emptyWarmupAssignment);
 }
 
 @Test
 public void shouldAssignAllTasksToClientsAndReturnFalseIfNoClientsAreCaughtUp() {
-final int maxWarmupReplicas = 2;
-final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2, TASK_1_0, TASK_1_1, TASK_1_2);
+final int maxWarmupReplicas = Integer.MAX_VALUE;
 
-final Map> balancedAssignment = mkMap(
-mkEntry(UUID_1, asList(TASK_0_0, TASK_1_0)),
-mkEntry(UUID_2, asList(TASK_0_1, TASK_1_1)),
-mkEntry(UUID_3, asList(TASK_0_2, TASK_1_2))
-);
+final ClientState client1 = getClientStateWithActiveAssignment(asList(TASK_0_0, TASK_1_0));
+final ClientState client2 = getClientStateWithActiveAssignment(asList(TASK_0_1, TASK_1_1));
+final ClientState client3 = getClientStateWithActiveAssignment(asList(TASK_0_2, TASK_1_2));
 
-assertFalse(
+assertThat(
 assignTaskMovements(
-balancedAssignment,
 emptyMap(),
-clientStates,
-getMapWithNumStandbys(allTasks, 1),
-maxWarmupReplicas)
+getClientStatesMap(client1, client2, client3),
+maxWarmupReplicas),
+is(false)
 );
-verifyClientStateAssignments(balancedAssignment, emptyWarmupAssignment);
 }
 
 @Test
 public void shouldMoveTasksToCaughtUpClientsAndAssignWarmupReplicasInTheirPlace() {
 final int maxWarmupReplicas = Integer.MAX_VALUE;
-final Set allTasks = mkSet(TASK_0_0, TASK_0_1, TASK_0_2);
+final ClientState client1 = 

[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

2020-05-13 Thread GitBox


vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424820836



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -53,75 +67,94 @@ private static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final Task
 /**
  * @return whether any warmup replicas were assigned
  */
-static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment,
-   final Map> tasksToCaughtUpClients,
+static boolean assignTaskMovements(final Map> tasksToCaughtUpClients,
final Map clientStates,
-   final Map tasksToRemainingStandbys,
final int maxWarmupReplicas) {
-boolean warmupReplicasAssigned = false;
+final BiFunction caughtUpPredicate =
+(client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
 
-final ValidClientsByTaskLoadQueue clientsByTaskLoad = new ValidClientsByTaskLoadQueue(
-clientStates,
-(client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients)
+final ConstrainedPrioritySet clientsByTaskLoad = new ConstrainedPrioritySet(
+caughtUpPredicate,
+client -> clientStates.get(client).taskLoad()
 );
 
-final SortedSet taskMovements = new TreeSet<>(
-(movement, other) -> {
-final int numCaughtUpClients = movement.caughtUpClients.size();
-final int otherNumCaughtUpClients = other.caughtUpClients.size();
-if (numCaughtUpClients != otherNumCaughtUpClients) {
-return Integer.compare(numCaughtUpClients, otherNumCaughtUpClients);
-} else {
-return movement.task.compareTo(other.task);
-}
-}
+final Queue taskMovements = new PriorityQueue<>(
+Comparator.comparing(TaskMovement::numCaughtUpClients).thenComparing(TaskMovement::task)
 );
 
-for (final Map.Entry> assignmentEntry : statefulActiveTaskAssignment.entrySet()) {
-final UUID client = assignmentEntry.getKey();
-final ClientState state = clientStates.get(client);
-for (final TaskId task : assignmentEntry.getValue()) {
-if (taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients)) {
-state.assignActive(task);
-} else {
-final TaskMovement taskMovement = new TaskMovement(task, client, tasksToCaughtUpClients.get(task));
-taskMovements.add(taskMovement);
+for (final Map.Entry clientStateEntry : clientStates.entrySet()) {
+final UUID client = clientStateEntry.getKey();
+final ClientState state = clientStateEntry.getValue();
+for (final TaskId task : state.activeTasks()) {
+// if the desired client is not caught up, and there is another client that _is_ caught up, then
+// we schedule a movement, so we can move the active task to the caught-up client. We'll try to
+// assign a warm-up to the desired client so that we can move it later on.
+if (!taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients)) {
+taskMovements.add(new TaskMovement(task, client, tasksToCaughtUpClients.get(task)));
 }
 }
 clientsByTaskLoad.offer(client);
 }
 
+final boolean movementsNeeded = !taskMovements.isEmpty();
+
 final AtomicInteger remainingWarmupReplicas = new AtomicInteger(maxWarmupReplicas);
 for (final TaskMovement movement : taskMovements) {
-final UUID sourceClient = clientsByTaskLoad.poll(movement.task);
-if (sourceClient == null) {
-throw new IllegalStateException("Tried to move task to caught-up client but none exist");
-}
-
-final ClientState sourceClientState = clientStates.get(sourceClient);
-sourceClientState.assignActive(movement.task);
-clientsByTaskLoad.offer(sourceClient);
+final UUID standbySourceClient = clientsByTaskLoad.poll(

Review comment:
   Agreed, it would just be good luck right now, but I figured we might as 
well capitalize on the luck. I'm planning to follow up pretty soon with the 
standby stickiness.






[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

2020-05-13 Thread GitBox


vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424819838



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -53,75 +67,94 @@ private static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final Task
 /**
  * @return whether any warmup replicas were assigned
  */
-static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment,
-   final Map> tasksToCaughtUpClients,
+static boolean assignTaskMovements(final Map> tasksToCaughtUpClients,
final Map clientStates,
-   final Map tasksToRemainingStandbys,
final int maxWarmupReplicas) {
-boolean warmupReplicasAssigned = false;
+final BiFunction caughtUpPredicate =
+(client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
 
-final ValidClientsByTaskLoadQueue clientsByTaskLoad = new ValidClientsByTaskLoadQueue(
-clientStates,
-(client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients)
+final ConstrainedPrioritySet clientsByTaskLoad = new ConstrainedPrioritySet(

Review comment:
   Sure, that's a good idea.









[GitHub] [kafka] vvcephei commented on a change in pull request #8588: KAFKA-6145: KIP-441: Improve assignment balance

2020-05-13 Thread GitBox


vvcephei commented on a change in pull request #8588:
URL: https://github.com/apache/kafka/pull/8588#discussion_r424718290



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
##
@@ -75,4 +94,303 @@
 static UUID uuidForInt(final int n) {
 return new UUID(0, n);
 }
+
+static void assertValidAssignment(final int numStandbyReplicas,
+  final Set statefulTasks,
+  final Set statelessTasks,
+  final Map assignedStates,
+  final StringBuilder failureContext) {
+assertValidAssignment(
+numStandbyReplicas,
+0,
+statefulTasks,
+statelessTasks,
+assignedStates,
+failureContext
+);
+}
+
+static void assertValidAssignment(final int numStandbyReplicas,
+  final int maxWarmupReplicas,
+  final Set statefulTasks,
+  final Set statelessTasks,
+  final Map assignedStates,
+  final StringBuilder failureContext) {
+final Map> assignments = new TreeMap<>();
+for (final TaskId taskId : statefulTasks) {
+assignments.put(taskId, new TreeSet<>());
+}
+for (final TaskId taskId : statelessTasks) {
+assignments.put(taskId, new TreeSet<>());
+}
+for (final Map.Entry entry : assignedStates.entrySet()) {
+validateAndAddActiveAssignments(statefulTasks, statelessTasks, failureContext, assignments, entry);
+validateAndAddStandbyAssignments(statefulTasks, statelessTasks, failureContext, assignments, entry);
+}
+
+final AtomicInteger remainingWarmups = new AtomicInteger(maxWarmupReplicas);
+
+final TreeMap> misassigned =
+assignments
+.entrySet()
+.stream()
+.filter(entry -> {
+final int expectedActives = 1;
+final boolean isStateless = statelessTasks.contains(entry.getKey());
+final int expectedStandbys = isStateless ? 0 : numStandbyReplicas;
+// We'll never assign even the expected number of standbys if they don't actually fit in the cluster
+final int expectedAssignments = Math.min(
+assignedStates.size(),
+expectedActives + expectedStandbys
+);
+final int actualAssignments = entry.getValue().size();
+if (actualAssignments == expectedAssignments) {
+return false; // not misassigned
+} else {
+if (actualAssignments == expectedAssignments + 1 && remainingWarmups.get() > 0) {
+remainingWarmups.getAndDecrement();
+return false; // it's a warmup, so it's fine
+} else {
+return true; // misassigned
+}
+}
+})
+.collect(entriesToMap(TreeMap::new));
+
+if (!misassigned.isEmpty()) {

Review comment:
   L131-158 just gathers information about whether each task is correctly assigned, based on its type and the standby config (and optionally the warmup config). It doesn't make any assertions, so this check is the actual assertion: that no task is incorrectly assigned.
   
   Doing it this way is nicer, because when it fails, it tells you about _all_ the incorrectly assigned tasks, not just the first one.
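The collect-then-assert pattern described above can be sketched in isolation. This is a simplified, hypothetical version, not Kafka's `assertValidAssignment`: `CollectThenAssertSketch`, `misassigned`, and `assertNoneMisassigned` are illustrative names, tasks are plain strings, the input is assumed to be a precomputed per-task replica count, and the one-extra-replica warmup allowance is left out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical sketch of "collect every failure, then assert once".
// Each map value is the number of clients hosting that task (active + standbys).
class CollectThenAssertSketch {

    // Gathers every task whose replica count differs from the expected count; asserts nothing.
    static Map<String, Integer> misassigned(final Map<String, Integer> replicasPerTask,
                                            final Set<String> statelessTasks,
                                            final int numStandbyReplicas,
                                            final int numClients) {
        return replicasPerTask.entrySet().stream()
            .filter(entry -> {
                final int expectedStandbys = statelessTasks.contains(entry.getKey()) ? 0 : numStandbyReplicas;
                // One active plus the standbys, capped by the number of clients in the cluster.
                final int expected = Math.min(numClients, 1 + expectedStandbys);
                return entry.getValue() != expected;
            })
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (a, b) -> a, TreeMap::new));
    }

    // The single assertion: its message lists every misassigned task, not just the first.
    static void assertNoneMisassigned(final Map<String, Integer> replicasPerTask,
                                      final Set<String> statelessTasks,
                                      final int numStandbyReplicas,
                                      final int numClients) {
        final Map<String, Integer> bad =
            misassigned(replicasPerTask, statelessTasks, numStandbyReplicas, numClients);
        if (!bad.isEmpty()) {
            throw new AssertionError("Misassigned tasks (task=replicas): " + bad);
        }
    }

    public static void main(final String[] args) {
        final Map<String, Integer> replicas = new HashMap<>();
        replicas.put("0_0", 2); // stateful, active + one standby
        replicas.put("0_1", 1); // stateful, missing its standby
        replicas.put("1_0", 3); // stateful, one replica too many
        replicas.put("1_1", 1); // stateless, active only
        final Set<String> stateless = new HashSet<>(Arrays.asList("1_1"));
        System.out.println(misassigned(replicas, stateless, 1, 3)); // {0_1=1, 1_0=3}
    }
}
```

Sorting the failures in a `TreeMap` keeps the failure message deterministic, which mirrors the `TreeMap::new` collector in the diff above.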




