ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411799045



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-    private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
     final TaskId task;
-    final UUID source;
-    final UUID destination;
+    private final UUID destination;
 
-    TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+    TaskMovement(final TaskId task, final UUID destination) {
         this.task = task;
-        this.source = source;
         this.destination = destination;
     }
 
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        final TaskMovement movement = (TaskMovement) o;
-        return Objects.equals(task, movement.task) &&
-                   Objects.equals(source, movement.source) &&
-                   Objects.equals(destination, movement.destination);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(task, source, destination);
-    }
-
     /**
-     * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
-     * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
-     * a few exceptional cases:
-     * <p>
-     * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
-     * immediately from the source to the destination in the state constrained 
assignment
-     * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
-     * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
-     *
-     * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
-     * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
-     * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+     * @return whether any warmup replicas were assigned
      */
-    static List<TaskMovement> getMovements(final Map<UUID, List<TaskId>> 
statefulActiveTaskAssignment,
-                                           final Map<UUID, List<TaskId>> 
balancedStatefulActiveTaskAssignment,
-                                           final Map<TaskId, SortedSet<UUID>> 
tasksToCaughtUpClients,
-                                           final Map<UUID, ClientState> 
clientStates,
-                                           final Map<TaskId, Integer> 
tasksToRemainingStandbys,
-                                           final int maxWarmupReplicas) {
-        if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-            throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-        }
+    static boolean assignTaskMovements(final Map<UUID, List<TaskId>> 
statefulActiveTaskAssignment,
+                                       final Map<TaskId, SortedSet<UUID>> 
tasksToCaughtUpClients,
+                                       final Map<UUID, ClientState> 
clientStates,
+                                       final Map<TaskId, Integer> 
tasksToRemainingStandbys,
+                                       final int maxWarmupReplicas) {
+        boolean warmupReplicasAssigned = false;
+
+        final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+            new ValidClientsByTaskLoadQueue(
+                clientStates,
+                (client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+            );
 
-        final Map<TaskId, UUID> taskToDestinationClient = new HashMap<>();
-        for (final Map.Entry<UUID, List<TaskId>> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-            final UUID destination = clientEntry.getKey();
-            for (final TaskId task : clientEntry.getValue()) {
-                taskToDestinationClient.put(task, destination);
+        final SortedSet<TaskMovement> taskMovements = new TreeSet<>(
+            (movement, other) -> {
+                final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+                final int otherNumCaughtUpClients = 
tasksToCaughtUpClients.get(other.task).size();
+                if (numCaughtUpClients != otherNumCaughtUpClients) {
+                    return numCaughtUpClients - otherNumCaughtUpClients;
+                } else {
+                    return movement.task.compareTo(other.task);
+                }
             }
+        );
+
+        for (final Map.Entry<UUID, List<TaskId>> assignmentEntry : 
statefulActiveTaskAssignment.entrySet()) {
+            final UUID client = assignmentEntry.getKey();
+            final ClientState state = clientStates.get(client);
+            for (final TaskId task : assignmentEntry.getValue()) {
+                if (taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)) {
+                    state.assignActive(task);
+                } else {
+                    final TaskMovement taskMovement = new TaskMovement(task,  
client);
+                    taskMovements.add(taskMovement);
+                }
+            }
+            clientsByTaskLoad.offer(client);
         }
 
-        int remainingAllowedWarmupReplicas = maxWarmupReplicas;
-        final List<TaskMovement> movements = new LinkedList<>();
-        for (final Map.Entry<UUID, List<TaskId>> sourceClientEntry : 
statefulActiveTaskAssignment.entrySet()) {
-            final UUID source = sourceClientEntry.getKey();
+        int remainingWarmupReplicas = maxWarmupReplicas;
+        for (final TaskMovement movement : taskMovements) {
+            final UUID leastLoadedClient = 
clientsByTaskLoad.poll(movement.task);
+            if (leastLoadedClient == null) {
+                throw new IllegalStateException("Tried to move task to 
caught-up client but none exist");

Review comment:
       I'm having trouble thinking of a way to make the exception message more 
clear without just starting to name variable names and describe the literal 
code, which doesn't seem appropriate for an exception message. Does anyone have 
any suggestions? Maybe something like `scheduled a task to be moved and 
assigned to a caught-up client, but no caught up clients were found`? That just 
seems like a more verbose way to say the same thing 😕 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to