ableegoldman commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r411779358
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ########## @@ -16,128 +16,94 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient; + import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.UUID; import org.apache.kafka.streams.processor.TaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class TaskMovement { - private static final Logger log = LoggerFactory.getLogger(TaskMovement.class); - final TaskId task; - final UUID source; - final UUID destination; + private final UUID destination; - TaskMovement(final TaskId task, final UUID source, final UUID destination) { + TaskMovement(final TaskId task, final UUID destination) { this.task = task; - this.source = source; this.destination = destination; } - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final TaskMovement movement = (TaskMovement) o; - return Objects.equals(task, movement.task) && - Objects.equals(source, movement.source) && - Objects.equals(destination, movement.destination); - } - - @Override - public int hashCode() { - return Objects.hash(task, source, destination); - } - /** - * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured - * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with - * a few exceptional cases: - * <p> - * 1. 
Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved - * immediately from the source to the destination in the state constrained assignment - * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total - * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}. - * - * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients - * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients - * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment + * @return whether any warmup replicas were assigned */ - static List<TaskMovement> getMovements(final Map<UUID, List<TaskId>> statefulActiveTaskAssignment, - final Map<UUID, List<TaskId>> balancedStatefulActiveTaskAssignment, - final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients, - final Map<UUID, ClientState> clientStates, - final Map<TaskId, Integer> tasksToRemainingStandbys, - final int maxWarmupReplicas) { - if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) { - throw new IllegalStateException("Tried to compute movements but assignments differ in size."); - } + static boolean assignTaskMovements(final Map<UUID, List<TaskId>> statefulActiveTaskAssignment, + final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients, + final Map<UUID, ClientState> clientStates, + final Map<TaskId, Integer> tasksToRemainingStandbys, + final int maxWarmupReplicas) { + boolean warmupReplicasAssigned = false; + + final ValidClientsByTaskLoadQueue clientsByTaskLoad = + new ValidClientsByTaskLoadQueue( + clientStates, + (client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients) + ); - final Map<TaskId, UUID> taskToDestinationClient = new HashMap<>(); - for (final Map.Entry<UUID, 
List<TaskId>> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) { - final UUID destination = clientEntry.getKey(); - for (final TaskId task : clientEntry.getValue()) { - taskToDestinationClient.put(task, destination); + final SortedSet<TaskMovement> taskMovements = new TreeSet<>( + (movement, other) -> { + final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size(); + final int otherNumCaughtUpClients = tasksToCaughtUpClients.get(other.task).size(); + if (numCaughtUpClients != otherNumCaughtUpClients) { + return numCaughtUpClients - otherNumCaughtUpClients; + } else { + return movement.task.compareTo(other.task); + } } + ); + + for (final Map.Entry<UUID, List<TaskId>> assignmentEntry : statefulActiveTaskAssignment.entrySet()) { + final UUID client = assignmentEntry.getKey(); + final ClientState state = clientStates.get(client); + for (final TaskId task : assignmentEntry.getValue()) { + if (taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients)) { + state.assignActive(task); + } else { + final TaskMovement taskMovement = new TaskMovement(task, client); + taskMovements.add(taskMovement); + } + } + clientsByTaskLoad.offer(client); } - int remainingAllowedWarmupReplicas = maxWarmupReplicas; - final List<TaskMovement> movements = new LinkedList<>(); - for (final Map.Entry<UUID, List<TaskId>> sourceClientEntry : statefulActiveTaskAssignment.entrySet()) { - final UUID source = sourceClientEntry.getKey(); + int remainingWarmupReplicas = maxWarmupReplicas; + for (final TaskMovement movement : taskMovements) { + final UUID leastLoadedClient = clientsByTaskLoad.poll(movement.task); + if (leastLoadedClient == null) { + throw new IllegalStateException("Tried to move task to caught-up client but none exist"); + } - final Iterator<TaskId> sourceClientTasksIterator = sourceClientEntry.getValue().iterator(); - while (sourceClientTasksIterator.hasNext()) { - final TaskId task = sourceClientTasksIterator.next(); - final UUID destination = 
taskToDestinationClient.get(task); - if (destination == null) { - log.error("Task {} is assigned to client {} in initial assignment but has no owner in the final " + - "balanced assignment.", task, source); - throw new IllegalStateException("Found task in initial assignment that was not assigned in the final."); - } else if (!source.equals(destination)) { - if (destinationClientIsCaughtUp(task, destination, tasksToCaughtUpClients)) { - sourceClientTasksIterator.remove(); - statefulActiveTaskAssignment.get(destination).add(task); - } else { - if (clientStates.get(destination).prevStandbyTasks().contains(task) - && tasksToRemainingStandbys.get(task) > 0 - ) { - decrementRemainingStandbys(task, tasksToRemainingStandbys); - } else { - --remainingAllowedWarmupReplicas; - } + final ClientState sourceClientState = clientStates.get(leastLoadedClient); + sourceClientState.assignActive(movement.task); - movements.add(new TaskMovement(task, source, destination)); - if (remainingAllowedWarmupReplicas == 0) { - return movements; - } - } - } + final ClientState destinationClientState = clientStates.get(movement.destination); + if (destinationClientState.prevStandbyTasks().contains(movement.task) && tasksToRemainingStandbys.get(movement.task) > 0) { + decrementRemainingStandbys(movement.task, tasksToRemainingStandbys); + destinationClientState.assignStandby(movement.task); + warmupReplicasAssigned = true; + } else if (remainingWarmupReplicas > 0) { + --remainingWarmupReplicas; + destinationClientState.assignStandby(movement.task); + warmupReplicasAssigned = true; } Review comment: Hm. I'm not sure this is what you meant, but I think we have to interpret "wasn't previously assigned to any client" as meaning "any client other than the source client".
By definition, someone must have had this task previously for it to be involved in a movement. Of course, this could be due to leftover/old state rather than the task actually having been assigned to that client, but we currently don't (and can't) distinguish between these cases. Given that, I think I buy this. But should the condition also be generalized to "the task had fewer than num.standby previous clients" (after accounting for the above)? Just to lay out the general reasoning for my future self: we basically only want to count something as a warmup replica when counting it against the total standbys for this rebalance (or rebalances) would mean temporarily revoking a standby task from some client for the duration of the rebalance(s). In other words, outside of the tasks and clients involved in a movement, we want the assignment during the intermediate rebalances to resemble the final assignment as much as possible. So if we had num.standbys = 5 and only 4 previous clients, I think we would still want to consider this a standby. Of course, nothing discussed here will really matter unless (or until) we make the standby assignment more sticky and/or lag-based. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org