[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r412424861

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ValidClientsByTaskLoadQueue.java
## @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment
+ */
+class ValidClientsByTaskLoadQueue {
+    private final PriorityQueue clientsByTaskLoad;
+    private final BiFunction validClientCriteria;
+    private final Set uniqueClients = new HashSet<>();
+
+    ValidClientsByTaskLoadQueue(final Map clientStates,
+                                final BiFunction validClientCriteria) {
+        clientsByTaskLoad = getClientPriorityQueueByTaskLoad(clientStates);
+        this.validClientCriteria = validClientCriteria;
+    }
+
+    /**
+     * @return the next least loaded client that satisfies the given criteria, or null if none do
+     */
+    UUID poll(final TaskId task) {
+        final List validClient = poll(task, 1);
+        return validClient.isEmpty() ? null : validClient.get(0);
+    }
+
+    /**
+     * @return the next N <= {@code numClientsPerTask} clients in the underlying priority queue that are valid
+     * candidates for the given task
+     */
+    List poll(final TaskId task, final int numClients) {
+        final List nextLeastLoadedValidClients = new LinkedList<>();
+        final Set invalidPolledClients = new HashSet<>();
+        while (nextLeastLoadedValidClients.size() < numClients) {
+            UUID candidateClient;
+            while (true) {
+                candidateClient = clientsByTaskLoad.poll();
+                if (candidateClient == null) {
+                    offerAll(invalidPolledClients);
+                    return nextLeastLoadedValidClients;
+                }
+
+                if (validClientCriteria.apply(candidateClient, task)) {
+                    nextLeastLoadedValidClients.add(candidateClient);
+                    break;
+                } else {
+                    invalidPolledClients.add(candidateClient);
+                }
+            }
+        }
+        offerAll(invalidPolledClients);
+        return nextLeastLoadedValidClients;
+    }
+
+    void offerAll(final Collection clients) {
+        for (final UUID client : clients) {
+            offer(client);
+        }
+    }
+
+    void offer(final UUID client) {
+        if (uniqueClients.contains(client)) {

Review comment:
@cadonna you're right, I forgot to remove the client from `uniqueClients` in `poll`. Good catch.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
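The bug acknowledged in the comment above comes down to keeping the deduplication set and the queue in sync: a client removed by `poll` must also be removed from `uniqueClients`, or a later `offer` will treat it as still queued and silently drop it. Below is a minimal sketch of that invariant, assuming a hypothetical, simplified `LoadQueueSketch` class in which each client's load is a plain `Integer` and tasks are `String` ids. The real `ValidClientsByTaskLoadQueue` takes the `clientStates` map and `TaskId`s, and this is not its actual implementation.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.UUID;
import java.util.function.BiPredicate;

/**
 * Hypothetical, simplified stand-in for ValidClientsByTaskLoadQueue: each client's load is a
 * plain Integer and tasks are String ids. Not the Kafka Streams implementation.
 */
final class LoadQueueSketch {
    private final PriorityQueue<UUID> clientsByLoad;
    private final BiPredicate<UUID, String> validClientCriteria;
    // Mirrors the queue's contents so offer() can reject duplicates cheaply.
    private final Set<UUID> uniqueClients = new HashSet<>();

    LoadQueueSketch(final Map<UUID, Integer> loads, final BiPredicate<UUID, String> validClientCriteria) {
        // Least loaded client first; ties broken by UUID so the order is deterministic.
        this.clientsByLoad = new PriorityQueue<>((a, b) -> {
            final int byLoad = Integer.compare(loads.get(a), loads.get(b));
            return byLoad != 0 ? byLoad : a.compareTo(b);
        });
        this.validClientCriteria = validClientCriteria;
        offerAll(loads.keySet());
    }

    /** Returns up to numClients valid clients, least loaded first; skipped clients are re-queued. */
    List<UUID> poll(final String task, final int numClients) {
        final List<UUID> valid = new ArrayList<>();
        final List<UUID> invalid = new ArrayList<>();
        while (valid.size() < numClients) {
            final UUID candidate = clientsByLoad.poll();
            if (candidate == null) {
                break; // queue exhausted
            }
            // The fix discussed in the review: a client leaving the queue must also leave the set,
            // otherwise offer() would treat it as still queued and drop it.
            uniqueClients.remove(candidate);
            if (validClientCriteria.test(candidate, task)) {
                valid.add(candidate);
            } else {
                invalid.add(candidate);
            }
        }
        offerAll(invalid); // put back the clients that did not satisfy the criteria
        return valid;
    }

    void offerAll(final Collection<UUID> clients) {
        for (final UUID client : clients) {
            offer(client);
        }
    }

    void offer(final UUID client) {
        // Set.add returns false if the client is already queued, so duplicates are skipped.
        if (uniqueClients.add(client)) {
            clientsByLoad.offer(client);
        }
    }
}
```

Using the boolean returned by `Set.add` folds the membership check and the insert into one call. Note that `PriorityQueue` does not reorder an element whose key changes while it is queued, so a client's load should only be updated while it is out of the queue, i.e. between `poll` and `offer`.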
ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411858555

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
## @@ -1610,79 +1610,6 @@ public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenN
             )));
     }
 
-    @Test
-    public void shouldReturnNormalAssignmentForOldAndFutureInstancesDuringVersionProbing() {

Review comment:
I don't mean to totally cop out on this, but I think we should do this in a follow-up PR. I'll make a ticket and assign it to myself so I can't escape, but I don't think it's even worth marking it `@Ignore` for now. Tbh we should have removed it a while ago, rather than changing it over time until it became its useless self of today. It's a long history, and I'm mostly responsible, but looking ahead, the question now is: what do we even want to validate? The task assignor has no knowledge of version probing, and the partition assignor is no longer responsible for the task assignment (it used to be, with version probing, hence this test). What we should do is validate that the inputs are being assembled sensibly during version probing. Anyway, this will be really difficult to do based only on the final partition assignment, and even harder to distinguish a real failure from an unrelated one. So I'd propose to kick this into the future: once we embed the actual assignor class in the configs instead of this flag, we can pass in a `VersionProbingClientStatesValidatingAssignor` or whatever. SG?
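To make the proposal above more concrete, here is a rough sketch of the decorator idea, assuming a simplified, hypothetical `SketchTaskAssignor` interface (not the real Kafka Streams assignor API) in which each client's state is reduced to a `String`. The hypothetical `RecordingTaskAssignor` records the inputs it is handed before delegating, so a test can assert on how those inputs were assembled (for example, that both old and future clients are present during version probing) rather than reverse-engineering them from the final partition assignment. The name `VersionProbingClientStatesValidatingAssignor` in the comment is itself only a placeholder.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/**
 * Hypothetical assignor contract for illustration only; this is NOT the Kafka Streams
 * task assignor interface. Client "states" are reduced to a String per client.
 */
interface SketchTaskAssignor {
    Map<UUID, List<String>> assign(Map<UUID, String> clientStates, Set<String> tasks);
}

/**
 * Hypothetical test-only decorator: records each input map it is handed, then delegates.
 * A test can assert on recordedInputs() instead of inferring the inputs from the final assignment.
 */
final class RecordingTaskAssignor implements SketchTaskAssignor {
    private final SketchTaskAssignor delegate;
    private final List<Map<UUID, String>> recordedInputs = new ArrayList<>();

    RecordingTaskAssignor(final SketchTaskAssignor delegate) {
        this.delegate = delegate;
    }

    @Override
    public Map<UUID, List<String>> assign(final Map<UUID, String> clientStates, final Set<String> tasks) {
        // Copy the input so later mutation by the caller cannot change what was recorded.
        recordedInputs.add(Collections.unmodifiableMap(new HashMap<>(clientStates)));
        return delegate.assign(clientStates, tasks);
    }

    List<Map<UUID, String>> recordedInputs() {
        return Collections.unmodifiableList(recordedInputs);
    }
}
```

Recording the inputs, rather than asserting inside the wrapper, keeps the wrapper reusable and lets each test decide what "assembled sensibly" means for its scenario.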
ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411858808

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
## @@ -1610,79 +1610,6 @@ public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenN
             )));
     }
 
-    @Test
-    public void shouldReturnNormalAssignmentForOldAndFutureInstancesDuringVersionProbing() {

Review comment:
Probably a much longer answer than you ever wanted, but this test has been haunting me over many PRs.
ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411841909

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
## @@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-    private static final Logger log = LoggerFactory.getLogger(TaskMovement.class);
-
     final TaskId task;
-    final UUID source;
-    final UUID destination;
+    private final UUID destination;
 
-    TaskMovement(final TaskId task, final UUID source, final UUID destination) {
+    TaskMovement(final TaskId task, final UUID destination) {
         this.task = task;
-        this.source = source;
         this.destination = destination;
     }
 
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        final TaskMovement movement = (TaskMovement) o;
-        return Objects.equals(task, movement.task) &&
-            Objects.equals(source, movement.source) &&
-            Objects.equals(destination, movement.destination);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(task, source, destination);
-    }
-
     /**
-     * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured
-     * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with
-     * a few exceptional cases:
-     *
-     * 1. Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved
-     * immediately from the source to the destination in the state constrained assignment
-     * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total
-     * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}.
-     *
-     * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients
-     * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients
-     * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment
+     * @return whether any warmup replicas were assigned
      */
-    static List getMovements(final Map> statefulActiveTaskAssignment,
-                             final Map> balancedStatefulActiveTaskAssignment,
-                             final Map> tasksToCaughtUpClients,
-                             final Map clientStates,
-                             final Map tasksToRemainingStandbys,
-                             final int maxWarmupReplicas) {
-        if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
-            throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
-        }
+    static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment,
+                                       final Map> tasksToCaughtUpClients,
+                                       final Map clientStates,
+                                       final Map tasksToRemainingStandbys,
+                                       final int maxWarmupReplicas) {
+        boolean warmupReplicasAssigned = false;
+
+        final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+            new ValidClientsByTaskLoadQueue(
+                clientStates,
+                (client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients)
+            );
 
-        final Map taskToDestinationClient = new HashMap<>();
-        for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
-            final UUID destination = clientEntry.getKey();
-            for (final TaskId task : clientEntry.getValue()) {
-                taskToDestinationClient.put(task, destination);
+        final SortedSet taskMovements = new TreeSet<>(
+            (movement, other) -> {
+                final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size();

Review comment:
ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411828397

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
## @@ -16,128 +16,94 @@
ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r41182

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
## @@ -16,128 +16,94 @@

Review comment:
Yeah I
ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411826785

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
## @@ -16,128 +16,94 @@
ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411821268

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
## @@ -16,128 +16,94 @@
Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved - * immediately from the source to the destination in the state constrained assignment - * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total - * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}. - * - * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients - * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients - * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment + * @return whether any warmup replicas were assigned */ -static List getMovements(final Map> statefulActiveTaskAssignment, - final Map> balancedStatefulActiveTaskAssignment, - final Map> tasksToCaughtUpClients, - final Map clientStates, - final Map tasksToRemainingStandbys, - final int maxWarmupReplicas) { -if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) { -throw new IllegalStateException("Tried to compute movements but assignments differ in size."); -} +static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment, + final Map> tasksToCaughtUpClients, + final Map clientStates, + final Map tasksToRemainingStandbys, + final int maxWarmupReplicas) { +boolean warmupReplicasAssigned = false; + +final ValidClientsByTaskLoadQueue clientsByTaskLoad = +new ValidClientsByTaskLoadQueue( +clientStates, +(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients) +); -final Map taskToDestinationClient = new HashMap<>(); -for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) { -final UUID destination = clientEntry.getKey(); -for (final TaskId task : clientEntry.getValue()) { -taskToDestinationClient.put(task, destination); +final 
SortedSet taskMovements = new TreeSet<>( +(movement, other) -> { +final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size(); +final int
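The new `assignTaskMovements` collects movements into a `TreeSet` whose comparator starts from `tasksToCaughtUpClients.get(movement.task).size()`. The diff is cut off before the rest of the comparison, so the sketch below assumes that tasks with fewer caught-up clients sort first and adds tie-breakers of its own. `SimpleMovement` and `MovementOrdering` are hypothetical names, not classes from the PR.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical, simplified movement: task and destination client are plain String ids.
final class SimpleMovement {
    final String task;
    final String destination;

    SimpleMovement(final String task, final String destination) {
        this.task = task;
        this.destination = destination;
    }
}

final class MovementOrdering {
    // Assumed ordering: tasks with fewer caught-up clients sort first. Ties are broken by task
    // id and then destination, because a TreeSet treats compare(x, y) == 0 as a duplicate and
    // would silently drop the second movement.
    static SortedSet<SimpleMovement> newMovementSet(final Map<String, Set<String>> caughtUpClientsByTask) {
        return new TreeSet<>(
            Comparator.comparingInt((SimpleMovement m) -> caughtUpClientsByTask.get(m.task).size())
                .thenComparing(m -> m.task)
                .thenComparing(m -> m.destination));
    }
}
```

A comparator-based `TreeSet` decides membership with the comparator alone; `equals` and `hashCode` on the element type are not consulted.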
[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
ableegoldman commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r411815724 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ## @@ -16,128 +16,94 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient; + import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.UUID; import org.apache.kafka.streams.processor.TaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class TaskMovement { -private static final Logger log = LoggerFactory.getLogger(TaskMovement.class); - final TaskId task; -final UUID source; -final UUID destination; +private final UUID destination; -TaskMovement(final TaskId task, final UUID source, final UUID destination) { +TaskMovement(final TaskId task, final UUID destination) { this.task = task; -this.source = source; this.destination = destination; } -@Override -public boolean equals(final Object o) { -if (this == o) { -return true; -} -if (o == null || getClass() != o.getClass()) { -return false; -} -final TaskMovement movement = (TaskMovement) o; -return Objects.equals(task, movement.task) && - Objects.equals(source, movement.source) && - Objects.equals(destination, movement.destination); -} - -@Override -public int hashCode() { -return Objects.hash(task, source, destination); -} - /** - * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured - * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with - * a few exceptional cases: - * - * 1. 
Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved - * immediately from the source to the destination in the state constrained assignment - * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total - * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}. - * - * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients - * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients - * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment + * @return whether any warmup replicas were assigned */ -static List getMovements(final Map> statefulActiveTaskAssignment, - final Map> balancedStatefulActiveTaskAssignment, - final Map> tasksToCaughtUpClients, - final Map clientStates, - final Map tasksToRemainingStandbys, - final int maxWarmupReplicas) { -if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) { -throw new IllegalStateException("Tried to compute movements but assignments differ in size."); -} +static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment, + final Map> tasksToCaughtUpClients, + final Map clientStates, + final Map tasksToRemainingStandbys, + final int maxWarmupReplicas) { +boolean warmupReplicasAssigned = false; + +final ValidClientsByTaskLoadQueue clientsByTaskLoad = +new ValidClientsByTaskLoadQueue( +clientStates, +(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients) +); -final Map taskToDestinationClient = new HashMap<>(); -for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) { -final UUID destination = clientEntry.getKey(); -for (final TaskId task : clientEntry.getValue()) { -taskToDestinationClient.put(task, destination); +final 
SortedSet taskMovements = new TreeSet<>( +(movement, other) -> { +final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size(); Review comment: I
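The lines removed from `getMovements` invert `balancedStatefulActiveTaskAssignment` from client-to-tasks into a task-to-destination-client map. Below is a generic sketch of that inversion. It assumes list-valued assignments, because the generic parameters are stripped in this copy of the diff. The duplicate-assignment check and the `AssignmentInversion` name are illustrative additions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class AssignmentInversion {
    // Inverts a client -> tasks assignment into task -> client, mirroring the loop the removed
    // getMovements used to find each task's destination in the balanced assignment.
    // The duplicate check is an addition for illustration; it is not in the original diff.
    static <C, T> Map<T, C> taskToClient(final Map<C, List<T>> clientToTasks) {
        final Map<T, C> taskToClient = new HashMap<>();
        for (final Map.Entry<C, List<T>> clientEntry : clientToTasks.entrySet()) {
            for (final T task : clientEntry.getValue()) {
                final C previous = taskToClient.put(task, clientEntry.getKey());
                if (previous != null) {
                    throw new IllegalStateException(
                        "Task " + task + " is assigned to both " + previous + " and " + clientEntry.getKey());
                }
            }
        }
        return taskToClient;
    }
}
```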
[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
ableegoldman commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r411814466 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ## @@ -16,128 +16,94 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient; + import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.UUID; import org.apache.kafka.streams.processor.TaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class TaskMovement { -private static final Logger log = LoggerFactory.getLogger(TaskMovement.class); - final TaskId task; -final UUID source; -final UUID destination; +private final UUID destination; -TaskMovement(final TaskId task, final UUID source, final UUID destination) { +TaskMovement(final TaskId task, final UUID destination) { this.task = task; -this.source = source; this.destination = destination; } -@Override -public boolean equals(final Object o) { -if (this == o) { -return true; -} -if (o == null || getClass() != o.getClass()) { -return false; -} -final TaskMovement movement = (TaskMovement) o; -return Objects.equals(task, movement.task) && - Objects.equals(source, movement.source) && - Objects.equals(destination, movement.destination); -} - -@Override -public int hashCode() { -return Objects.hash(task, source, destination); -} - /** - * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured - * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with - * a few exceptional cases: - * - * 1. 
Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved - * immediately from the source to the destination in the state constrained assignment - * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total - * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}. - * - * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients - * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients - * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment + * @return whether any warmup replicas were assigned */ -static List getMovements(final Map> statefulActiveTaskAssignment, - final Map> balancedStatefulActiveTaskAssignment, - final Map> tasksToCaughtUpClients, - final Map clientStates, - final Map tasksToRemainingStandbys, - final int maxWarmupReplicas) { -if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) { -throw new IllegalStateException("Tried to compute movements but assignments differ in size."); -} +static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment, + final Map> tasksToCaughtUpClients, + final Map clientStates, + final Map tasksToRemainingStandbys, + final int maxWarmupReplicas) { +boolean warmupReplicasAssigned = false; + +final ValidClientsByTaskLoadQueue clientsByTaskLoad = +new ValidClientsByTaskLoadQueue( +clientStates, +(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients) +); -final Map taskToDestinationClient = new HashMap<>(); -for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) { -final UUID destination = clientEntry.getKey(); -for (final TaskId task : clientEntry.getValue()) { -taskToDestinationClient.put(task, destination); +final 
SortedSet taskMovements = new TreeSet<>( +(movement, other) -> { +final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size(); +final int
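The removed javadoc describes how each movement is handled:

1. Move the task immediately when the destination is caught up or the source is not.
2. When the destination already hosts a standby for the task, count the warmup against `num.standby.replicas` instead of `max.warmup.replicas`.
3. Otherwise, spend the warmup budget.

The sketch below applies those rules. `WarmupPlanner`, `PlannedMove`, and the `Outcome` values are hypothetical and not the PR's code. The sketch also decides, as its own assumption, what happens when a task has no standby slots left: that case falls through to the warmup budget.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class WarmupPlanner {
    enum Outcome { MOVE_IMMEDIATELY, WARMUP_COUNTED_AS_STANDBY, WARMUP, DEFERRED }

    // Hypothetical input: one planned move of a task from its source to its destination client.
    static final class PlannedMove {
        final String task;
        final String source;
        final String destination;
        final boolean destinationHasStandby;

        PlannedMove(final String task, final String source, final String destination,
                    final boolean destinationHasStandby) {
            this.task = task;
            this.source = source;
            this.destination = destination;
            this.destinationHasStandby = destinationHasStandby;
        }
    }

    // Applies the two exceptional cases from the removed javadoc, then spends the warmup budget.
    // remainingStandbysByTask is mutated when an existing standby is reused as the warmup.
    static Map<String, Outcome> plan(final List<PlannedMove> moves,
                                     final Map<String, Set<String>> caughtUpClientsByTask,
                                     final Map<String, Integer> remainingStandbysByTask,
                                     final int maxWarmupReplicas) {
        final Map<String, Outcome> outcomes = new LinkedHashMap<>();
        int warmupsLeft = maxWarmupReplicas;
        for (final PlannedMove move : moves) {
            final Set<String> caughtUp =
                caughtUpClientsByTask.getOrDefault(move.task, Collections.emptySet());
            if (caughtUp.contains(move.destination) || !caughtUp.contains(move.source)) {
                // Case 1: destination already caught up, or source not caught up either.
                outcomes.put(move.task, Outcome.MOVE_IMMEDIATELY);
            } else if (move.destinationHasStandby
                    && remainingStandbysByTask.getOrDefault(move.task, 0) > 0) {
                // Case 2: charge num.standby.replicas instead of the warmup budget.
                // Requiring a remaining standby slot here is an assumption of this sketch.
                remainingStandbysByTask.merge(move.task, -1, Integer::sum);
                outcomes.put(move.task, Outcome.WARMUP_COUNTED_AS_STANDBY);
            } else if (warmupsLeft > 0) {
                warmupsLeft--;
                outcomes.put(move.task, Outcome.WARMUP);
            } else {
                // Warmup budget exhausted: the task stays on its source client for now.
                outcomes.put(move.task, Outcome.DEFERRED);
            }
        }
        return outcomes;
    }
}
```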
[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
ableegoldman commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r411813832 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ## @@ -16,128 +16,94 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient; + import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.UUID; import org.apache.kafka.streams.processor.TaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class TaskMovement { -private static final Logger log = LoggerFactory.getLogger(TaskMovement.class); - final TaskId task; -final UUID source; -final UUID destination; +private final UUID destination; -TaskMovement(final TaskId task, final UUID source, final UUID destination) { +TaskMovement(final TaskId task, final UUID destination) { this.task = task; -this.source = source; this.destination = destination; } -@Override -public boolean equals(final Object o) { -if (this == o) { -return true; -} -if (o == null || getClass() != o.getClass()) { -return false; -} -final TaskMovement movement = (TaskMovement) o; -return Objects.equals(task, movement.task) && - Objects.equals(source, movement.source) && - Objects.equals(destination, movement.destination); -} - -@Override -public int hashCode() { -return Objects.hash(task, source, destination); -} - /** - * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured - * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with - * a few exceptional cases: - * - * 1. 
Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved - * immediately from the source to the destination in the state constrained assignment - * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total - * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}. - * - * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients - * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients - * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment + * @return whether any warmup replicas were assigned */ -static List getMovements(final Map> statefulActiveTaskAssignment, - final Map> balancedStatefulActiveTaskAssignment, - final Map> tasksToCaughtUpClients, - final Map clientStates, - final Map tasksToRemainingStandbys, - final int maxWarmupReplicas) { -if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) { -throw new IllegalStateException("Tried to compute movements but assignments differ in size."); -} +static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment, + final Map> tasksToCaughtUpClients, + final Map clientStates, + final Map tasksToRemainingStandbys, + final int maxWarmupReplicas) { +boolean warmupReplicasAssigned = false; + +final ValidClientsByTaskLoadQueue clientsByTaskLoad = +new ValidClientsByTaskLoadQueue( +clientStates, +(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients) +); -final Map taskToDestinationClient = new HashMap<>(); -for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) { -final UUID destination = clientEntry.getKey(); -for (final TaskId task : clientEntry.getValue()) { -taskToDestinationClient.put(task, destination); +final 
SortedSet taskMovements = new TreeSet<>( +(movement, other) -> { +final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size(); +final int
[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
ableegoldman commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r411813351 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ## @@ -16,128 +16,94 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient; + import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.UUID; import org.apache.kafka.streams.processor.TaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class TaskMovement { -private static final Logger log = LoggerFactory.getLogger(TaskMovement.class); - final TaskId task; -final UUID source; -final UUID destination; +private final UUID destination; -TaskMovement(final TaskId task, final UUID source, final UUID destination) { +TaskMovement(final TaskId task, final UUID destination) { this.task = task; -this.source = source; this.destination = destination; } -@Override -public boolean equals(final Object o) { -if (this == o) { -return true; -} -if (o == null || getClass() != o.getClass()) { -return false; -} -final TaskMovement movement = (TaskMovement) o; -return Objects.equals(task, movement.task) && - Objects.equals(source, movement.source) && - Objects.equals(destination, movement.destination); -} - -@Override -public int hashCode() { -return Objects.hash(task, source, destination); -} - /** - * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured - * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with - * a few exceptional cases: - * - * 1. 
Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved - * immediately from the source to the destination in the state constrained assignment - * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total - * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}. - * - * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients - * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients - * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment + * @return whether any warmup replicas were assigned */ -static List getMovements(final Map> statefulActiveTaskAssignment, - final Map> balancedStatefulActiveTaskAssignment, - final Map> tasksToCaughtUpClients, - final Map clientStates, - final Map tasksToRemainingStandbys, - final int maxWarmupReplicas) { -if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) { -throw new IllegalStateException("Tried to compute movements but assignments differ in size."); -} +static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment, + final Map> tasksToCaughtUpClients, + final Map clientStates, + final Map tasksToRemainingStandbys, + final int maxWarmupReplicas) { +boolean warmupReplicasAssigned = false; + +final ValidClientsByTaskLoadQueue clientsByTaskLoad = +new ValidClientsByTaskLoadQueue( +clientStates, +(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients) +); -final Map taskToDestinationClient = new HashMap<>(); -for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) { -final UUID destination = clientEntry.getKey(); -for (final TaskId task : clientEntry.getValue()) { -taskToDestinationClient.put(task, destination); +final 
SortedSet taskMovements = new TreeSet<>( +(movement, other) -> { +final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size(); +final int
[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
ableegoldman commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r411799045 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ## @@ -16,128 +16,94 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient; + import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.UUID; import org.apache.kafka.streams.processor.TaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class TaskMovement { -private static final Logger log = LoggerFactory.getLogger(TaskMovement.class); - final TaskId task; -final UUID source; -final UUID destination; +private final UUID destination; -TaskMovement(final TaskId task, final UUID source, final UUID destination) { +TaskMovement(final TaskId task, final UUID destination) { this.task = task; -this.source = source; this.destination = destination; } -@Override -public boolean equals(final Object o) { -if (this == o) { -return true; -} -if (o == null || getClass() != o.getClass()) { -return false; -} -final TaskMovement movement = (TaskMovement) o; -return Objects.equals(task, movement.task) && - Objects.equals(source, movement.source) && - Objects.equals(destination, movement.destination); -} - -@Override -public int hashCode() { -return Objects.hash(task, source, destination); -} - /** - * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured - * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with - * a few exceptional cases: - * - * 1. 
Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved - * immediately from the source to the destination in the state constrained assignment - * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total - * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}. - * - * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients - * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients - * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment + * @return whether any warmup replicas were assigned */ -static List getMovements(final Map> statefulActiveTaskAssignment, - final Map> balancedStatefulActiveTaskAssignment, - final Map> tasksToCaughtUpClients, - final Map clientStates, - final Map tasksToRemainingStandbys, - final int maxWarmupReplicas) { -if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) { -throw new IllegalStateException("Tried to compute movements but assignments differ in size."); -} +static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment, + final Map> tasksToCaughtUpClients, + final Map clientStates, + final Map tasksToRemainingStandbys, + final int maxWarmupReplicas) { +boolean warmupReplicasAssigned = false; + +final ValidClientsByTaskLoadQueue clientsByTaskLoad = +new ValidClientsByTaskLoadQueue( +clientStates, +(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients) +); -final Map taskToDestinationClient = new HashMap<>(); -for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) { -final UUID destination = clientEntry.getKey(); -for (final TaskId task : clientEntry.getValue()) { -taskToDestinationClient.put(task, destination); +final 
SortedSet taskMovements = new TreeSet<>( +(movement, other) -> { +final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size(); +final int
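In this archive, the comparator passed to `TreeSet` is cut off after `numCaughtUpClients`. The sketch below guesses at its intent and is not the PR's code. It assumes movements are ordered so that tasks with the fewest caught-up clients come first, with ties broken on the task id. `Movement` and `MovementOrdering` are hypothetical stand-ins, and task ids are plain strings here.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;

// Hypothetical, simplified stand-in for TaskMovement (task ids are plain strings).
final class Movement {
    final String task;
    final UUID destination;

    Movement(final String task, final UUID destination) {
        this.task = task;
        this.destination = destination;
    }
}

final class MovementOrdering {
    /**
     * Assumed ordering: tasks with fewer caught-up clients first, then by task id,
     * so two distinct movements never compare as equal.
     */
    static SortedSet<Movement> newMovementSet(final Map<String, Integer> numCaughtUpClientsByTask) {
        final Comparator<Movement> byCaughtUpClients =
            Comparator.comparingInt(movement -> numCaughtUpClientsByTask.get(movement.task));
        return new TreeSet<>(byCaughtUpClients.thenComparing(movement -> movement.task));
    }

    public static void main(final String[] args) {
        final Map<String, Integer> counts = new HashMap<>();
        counts.put("0_0", 2);
        counts.put("0_1", 1);
        counts.put("0_2", 1);
        final UUID destination = UUID.randomUUID();
        final SortedSet<Movement> movements = newMovementSet(counts);
        for (final String task : counts.keySet()) {
            movements.add(new Movement(task, destination));
        }
        // Prints 0_1, 0_2, then 0_0: fewest caught-up clients first, ties by task id.
        movements.forEach(movement -> System.out.println(movement.task));
    }
}
```

The tie-break matters. A `TreeSet` treats any two elements whose comparator result is 0 as duplicates. Without the tie-break, two movements for different tasks with the same number of caught-up clients would collapse into one entry. Because membership in a comparator-backed `TreeSet` never consults `equals`/`hashCode`, this may also be why `TaskMovement` can drop them.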
ableegoldman commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r411797135 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ## @@ -16,128 +16,94 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient; + import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.UUID; import org.apache.kafka.streams.processor.TaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class TaskMovement { -private static final Logger log = LoggerFactory.getLogger(TaskMovement.class); - final TaskId task; -final UUID source; -final UUID destination; +private final UUID destination; -TaskMovement(final TaskId task, final UUID source, final UUID destination) { +TaskMovement(final TaskId task, final UUID destination) { this.task = task; -this.source = source; this.destination = destination; } -@Override -public boolean equals(final Object o) { -if (this == o) { -return true; -} -if (o == null || getClass() != o.getClass()) { -return false; -} -final TaskMovement movement = (TaskMovement) o; -return Objects.equals(task, movement.task) && - Objects.equals(source, movement.source) && - Objects.equals(destination, movement.destination); -} - -@Override -public int hashCode() { -return Objects.hash(task, source, destination); -} - /** - * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured - * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with - * a few exceptional cases: - * - * 1. 
Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved - * immediately from the source to the destination in the state constrained assignment - * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total - * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}. - * - * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients - * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients - * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment + * @return whether any warmup replicas were assigned */ -static List getMovements(final Map> statefulActiveTaskAssignment, - final Map> balancedStatefulActiveTaskAssignment, - final Map> tasksToCaughtUpClients, - final Map clientStates, - final Map tasksToRemainingStandbys, - final int maxWarmupReplicas) { -if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) { -throw new IllegalStateException("Tried to compute movements but assignments differ in size."); -} +static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment, + final Map> tasksToCaughtUpClients, + final Map clientStates, + final Map tasksToRemainingStandbys, + final int maxWarmupReplicas) { +boolean warmupReplicasAssigned = false; + +final ValidClientsByTaskLoadQueue clientsByTaskLoad = +new ValidClientsByTaskLoadQueue( +clientStates, +(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients) +); -final Map taskToDestinationClient = new HashMap<>(); -for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) { -final UUID destination = clientEntry.getKey(); -for (final TaskId task : clientEntry.getValue()) { -taskToDestinationClient.put(task, destination); +final 
SortedSet taskMovements = new TreeSet<>( +(movement, other) -> { +final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size(); +final int
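The removed javadoc spells out the two exceptional cases the old `getMovements` handled. The class below illustrates only those rules, as a hedged sketch: it classifies one candidate movement while tracking the `max.warmup.replicas` budget and a per-task standby budget. The names `MovementRules`, `Outcome` and `classify` are hypothetical. The javadoc also does not say what happens when a task has no standby slots left; falling back to the warmup budget in that case is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the two exceptional cases described in the removed
// getMovements() javadoc; not code from the PR.
final class MovementRules {
    enum Outcome {
        MOVE_IMMEDIATELY,  // rule 1: destination caught up, or source not caught up
        COUNT_AS_STANDBY,  // rule 2: destination already had a standby for this task
        WARMUP,            // ordinary movement: one warmup replica on the destination
        NO_MOVEMENT        // max.warmup.replicas exhausted
    }

    private final int maxWarmupReplicas;
    private final Map<String, Integer> tasksToRemainingStandbys;
    private int warmupsAssigned = 0;

    MovementRules(final int maxWarmupReplicas, final Map<String, Integer> tasksToRemainingStandbys) {
        this.maxWarmupReplicas = maxWarmupReplicas;
        this.tasksToRemainingStandbys = tasksToRemainingStandbys;
    }

    Outcome classify(final String task,
                     final boolean destinationCaughtUp,
                     final boolean sourceCaughtUp,
                     final boolean destinationHadStandby) {
        // Rule 1: there is nothing to wait for, so move the active task right away.
        if (destinationCaughtUp || !sourceCaughtUp) {
            return Outcome.MOVE_IMMEDIATELY;
        }
        // Rule 2: reuse the existing standby and charge num.standby.replicas instead.
        // Assumption: only while the task still has standby slots left.
        final int remaining = tasksToRemainingStandbys.getOrDefault(task, 0);
        if (destinationHadStandby && remaining > 0) {
            tasksToRemainingStandbys.put(task, remaining - 1);
            return Outcome.COUNT_AS_STANDBY;
        }
        // Otherwise the movement costs one warmup replica, if any are left.
        if (warmupsAssigned < maxWarmupReplicas) {
            warmupsAssigned++;
            return Outcome.WARMUP;
        }
        return Outcome.NO_MOVEMENT;
    }

    public static void main(final String[] args) {
        final Map<String, Integer> standbys = new HashMap<>();
        standbys.put("0_0", 1);
        final MovementRules rules = new MovementRules(1, standbys);
        System.out.println(rules.classify("0_0", false, true, true));  // COUNT_AS_STANDBY
        System.out.println(rules.classify("0_1", false, true, false)); // WARMUP
        System.out.println(rules.classify("0_2", false, true, false)); // NO_MOVEMENT
    }
}
```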
ableegoldman commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r411787822 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ## @@ -16,128 +16,94 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient; + import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.UUID; import org.apache.kafka.streams.processor.TaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class TaskMovement { -private static final Logger log = LoggerFactory.getLogger(TaskMovement.class); - final TaskId task; -final UUID source; -final UUID destination; +private final UUID destination; -TaskMovement(final TaskId task, final UUID source, final UUID destination) { +TaskMovement(final TaskId task, final UUID destination) { this.task = task; -this.source = source; this.destination = destination; } -@Override -public boolean equals(final Object o) { -if (this == o) { -return true; -} -if (o == null || getClass() != o.getClass()) { -return false; -} -final TaskMovement movement = (TaskMovement) o; -return Objects.equals(task, movement.task) && - Objects.equals(source, movement.source) && - Objects.equals(destination, movement.destination); -} - -@Override -public int hashCode() { -return Objects.hash(task, source, destination); -} - /** - * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured - * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with - * a few exceptional cases: - * - * 1. 
Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved - * immediately from the source to the destination in the state constrained assignment - * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total - * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}. - * - * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients - * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients - * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment + * @return whether any warmup replicas were assigned */ -static List getMovements(final Map> statefulActiveTaskAssignment, - final Map> balancedStatefulActiveTaskAssignment, - final Map> tasksToCaughtUpClients, - final Map clientStates, - final Map tasksToRemainingStandbys, - final int maxWarmupReplicas) { -if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) { -throw new IllegalStateException("Tried to compute movements but assignments differ in size."); -} +static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment, + final Map> tasksToCaughtUpClients, + final Map clientStates, + final Map tasksToRemainingStandbys, + final int maxWarmupReplicas) { +boolean warmupReplicasAssigned = false; + +final ValidClientsByTaskLoadQueue clientsByTaskLoad = +new ValidClientsByTaskLoadQueue( +clientStates, +(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients) +); -final Map taskToDestinationClient = new HashMap<>(); -for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) { -final UUID destination = clientEntry.getKey(); -for (final TaskId task : clientEntry.getValue()) { -taskToDestinationClient.put(task, destination); +final 
SortedSet taskMovements = new TreeSet<>( +(movement, other) -> { +final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size(); Review comment: Well,
ableegoldman commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r411779358 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ## @@ -16,128 +16,94 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient; + import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.UUID; import org.apache.kafka.streams.processor.TaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class TaskMovement { -private static final Logger log = LoggerFactory.getLogger(TaskMovement.class); - final TaskId task; -final UUID source; -final UUID destination; +private final UUID destination; -TaskMovement(final TaskId task, final UUID source, final UUID destination) { +TaskMovement(final TaskId task, final UUID destination) { this.task = task; -this.source = source; this.destination = destination; } -@Override -public boolean equals(final Object o) { -if (this == o) { -return true; -} -if (o == null || getClass() != o.getClass()) { -return false; -} -final TaskMovement movement = (TaskMovement) o; -return Objects.equals(task, movement.task) && - Objects.equals(source, movement.source) && - Objects.equals(destination, movement.destination); -} - -@Override -public int hashCode() { -return Objects.hash(task, source, destination); -} - /** - * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured - * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with - * a few exceptional cases: - * - * 1. 
Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved - * immediately from the source to the destination in the state constrained assignment - * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total - * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}. - * - * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients - * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients - * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment + * @return whether any warmup replicas were assigned */ -static List getMovements(final Map> statefulActiveTaskAssignment, - final Map> balancedStatefulActiveTaskAssignment, - final Map> tasksToCaughtUpClients, - final Map clientStates, - final Map tasksToRemainingStandbys, - final int maxWarmupReplicas) { -if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) { -throw new IllegalStateException("Tried to compute movements but assignments differ in size."); -} +static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment, + final Map> tasksToCaughtUpClients, + final Map clientStates, + final Map tasksToRemainingStandbys, + final int maxWarmupReplicas) { +boolean warmupReplicasAssigned = false; + +final ValidClientsByTaskLoadQueue clientsByTaskLoad = +new ValidClientsByTaskLoadQueue( +clientStates, +(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients) +); -final Map taskToDestinationClient = new HashMap<>(); -for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) { -final UUID destination = clientEntry.getKey(); -for (final TaskId task : clientEntry.getValue()) { -taskToDestinationClient.put(task, destination); +final 
SortedSet taskMovements = new TreeSet<>( +(movement, other) -> { +final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size(); +final int
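`TaskMovement` now draws candidate clients from a `ValidClientsByTaskLoadQueue` filtered by `taskIsCaughtUpOnClient`. That class's javadoc describes it as wrapping a priority queue of clients and returning the next valid candidates by task load. The version below is a simplified, hypothetical take on the same idea. None of its details are taken from the PR: it uses plain integer loads instead of `ClientState`, string task ids, and an `offer` method that re-inserts a client after its load changes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.UUID;
import java.util.function.BiFunction;

// Hypothetical, simplified take on a "least-loaded valid client" queue.
final class LeastLoadedValidClients {
    private final Map<UUID, Integer> loads;
    private final BiFunction<UUID, String, Boolean> validClientCriteria;
    private final PriorityQueue<UUID> queue;

    LeastLoadedValidClients(final Map<UUID, Integer> loads,
                            final BiFunction<UUID, String, Boolean> validClientCriteria) {
        this.loads = loads;
        this.validClientCriteria = validClientCriteria;
        // Lowest load first; UUID order breaks ties so polling is deterministic.
        this.queue = new PriorityQueue<>(
            Comparator.<UUID>comparingInt(loads::get).thenComparing(Comparator.naturalOrder()));
        queue.addAll(loads.keySet());
    }

    /** @return up to numClients least-loaded clients that satisfy the criteria for task */
    List<UUID> poll(final String task, final int numClients) {
        final List<UUID> valid = new ArrayList<>();
        final List<UUID> skipped = new ArrayList<>();
        while (valid.size() < numClients && !queue.isEmpty()) {
            final UUID candidate = queue.poll();
            if (validClientCriteria.apply(candidate, task)) {
                valid.add(candidate);
            } else {
                skipped.add(candidate);
            }
        }
        // Invalid clients go back so they remain candidates for other tasks.
        queue.addAll(skipped);
        return valid;
    }

    /** @return the least-loaded valid client for task, or null if none qualifies */
    UUID poll(final String task) {
        final List<UUID> valid = poll(task, 1);
        return valid.isEmpty() ? null : valid.get(0);
    }

    /** Updates a client's load and (re-)inserts it; removing first keeps the heap consistent. */
    void offer(final UUID client, final int newLoad) {
        queue.remove(client);
        loads.put(client, newLoad);
        queue.offer(client);
    }

    public static void main(final String[] args) {
        final UUID busy = new UUID(0L, 1L);
        final UUID idle = new UUID(0L, 2L);
        final Map<UUID, Integer> loads = new HashMap<>();
        loads.put(busy, 5);
        loads.put(idle, 0);
        // Pretend only the busy client is caught up on every task.
        final LeastLoadedValidClients clients =
            new LeastLoadedValidClients(loads, (client, task) -> client.equals(busy));
        System.out.println(clients.poll("0_0").equals(busy)); // true: idle is skipped as invalid
    }
}
```

Valid clients are handed to the caller, who is expected to `offer` them back with their new load. A client's load must not change while it sits in the heap. That is why `offer` removes the client first and re-inserts it only after updating the load.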
ableegoldman commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r411743597 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ## @@ -16,128 +16,94 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient; + import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.UUID; import org.apache.kafka.streams.processor.TaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class TaskMovement { -private static final Logger log = LoggerFactory.getLogger(TaskMovement.class); - final TaskId task; -final UUID source; -final UUID destination; +private final UUID destination; -TaskMovement(final TaskId task, final UUID source, final UUID destination) { +TaskMovement(final TaskId task, final UUID destination) { this.task = task; -this.source = source; this.destination = destination; } -@Override -public boolean equals(final Object o) { -if (this == o) { -return true; -} -if (o == null || getClass() != o.getClass()) { -return false; -} -final TaskMovement movement = (TaskMovement) o; -return Objects.equals(task, movement.task) && - Objects.equals(source, movement.source) && - Objects.equals(destination, movement.destination); -} - -@Override -public int hashCode() { -return Objects.hash(task, source, destination); -} - /** - * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured - * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with - * a few exceptional cases: - * - * 1. 
Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved - * immediately from the source to the destination in the state constrained assignment - * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total - * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}. - * - * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients - * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients - * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment + * @return whether any warmup replicas were assigned */ -static List getMovements(final Map> statefulActiveTaskAssignment, - final Map> balancedStatefulActiveTaskAssignment, - final Map> tasksToCaughtUpClients, - final Map clientStates, - final Map tasksToRemainingStandbys, - final int maxWarmupReplicas) { -if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) { -throw new IllegalStateException("Tried to compute movements but assignments differ in size."); -} +static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment, + final Map> tasksToCaughtUpClients, + final Map clientStates, + final Map tasksToRemainingStandbys, + final int maxWarmupReplicas) { +boolean warmupReplicasAssigned = false; + +final ValidClientsByTaskLoadQueue clientsByTaskLoad = +new ValidClientsByTaskLoadQueue( +clientStates, +(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients) +); -final Map taskToDestinationClient = new HashMap<>(); -for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) { -final UUID destination = clientEntry.getKey(); -for (final TaskId task : clientEntry.getValue()) { -taskToDestinationClient.put(task, destination); +final 
SortedSet taskMovements = new TreeSet<>( +(movement, other) -> { +final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size(); +final int
ableegoldman commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r411739644 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ## @@ -16,128 +16,94 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient; + import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.UUID; import org.apache.kafka.streams.processor.TaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class TaskMovement { -private static final Logger log = LoggerFactory.getLogger(TaskMovement.class); - final TaskId task; -final UUID source; -final UUID destination; +private final UUID destination; -TaskMovement(final TaskId task, final UUID source, final UUID destination) { +TaskMovement(final TaskId task, final UUID destination) { this.task = task; -this.source = source; this.destination = destination; } -@Override -public boolean equals(final Object o) { -if (this == o) { -return true; -} -if (o == null || getClass() != o.getClass()) { -return false; -} -final TaskMovement movement = (TaskMovement) o; -return Objects.equals(task, movement.task) && - Objects.equals(source, movement.source) && - Objects.equals(destination, movement.destination); -} - -@Override -public int hashCode() { -return Objects.hash(task, source, destination); -} - /** - * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured - * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with - * a few exceptional cases: - * - * 1. 
Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved - * immediately from the source to the destination in the state constrained assignment - * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total - * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}. - * - * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients - * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients - * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment + * @return whether any warmup replicas were assigned */ -static List getMovements(final Map> statefulActiveTaskAssignment, - final Map> balancedStatefulActiveTaskAssignment, - final Map> tasksToCaughtUpClients, - final Map clientStates, - final Map tasksToRemainingStandbys, - final int maxWarmupReplicas) { -if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) { -throw new IllegalStateException("Tried to compute movements but assignments differ in size."); -} +static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment, + final Map> tasksToCaughtUpClients, + final Map clientStates, + final Map tasksToRemainingStandbys, + final int maxWarmupReplicas) { +boolean warmupReplicasAssigned = false; + +final ValidClientsByTaskLoadQueue clientsByTaskLoad = +new ValidClientsByTaskLoadQueue( +clientStates, +(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients) +); -final Map taskToDestinationClient = new HashMap<>(); -for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) { -final UUID destination = clientEntry.getKey(); -for (final TaskId task : clientEntry.getValue()) { -taskToDestinationClient.put(task, destination); +final 
SortedSet taskMovements = new TreeSet<>( +(movement, other) -> { +final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size(); +final int
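The PR title describes the new approach: start from the balanced assignment and pull tasks back to caught-up clients. Judging by its signature and its `@return whether any warmup replicas were assigned` javadoc, that is what `assignTaskMovements` does. The sketch below shows that general shape and is hypothetical throughout. It picks the least-loaded caught-up client directly rather than through a queue. It also assumes the active task always moves back to a caught-up client, while a warmup replica on the balanced destination is added only while `maxWarmupReplicas` lasts.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch of building a state-constrained assignment from a balanced one.
final class BalancedToConstrained {

    /**
     * Rewrites activeAssignment (client -> active tasks) in place.
     * @return whether any warmup replicas were assigned
     */
    static boolean assignTaskMovements(final Map<UUID, List<String>> activeAssignment,
                                       final Map<String, Set<UUID>> tasksToCaughtUpClients,
                                       final Map<UUID, List<String>> warmups,
                                       final int maxWarmupReplicas) {
        boolean warmupReplicasAssigned = false;
        int remainingWarmups = maxWarmupReplicas;

        // Copy the balanced assignment so the live one can be changed while iterating.
        final Map<UUID, List<String>> balanced = new HashMap<>();
        activeAssignment.forEach((client, tasks) -> balanced.put(client, new ArrayList<>(tasks)));

        for (final Map.Entry<UUID, List<String>> entry : balanced.entrySet()) {
            final UUID destination = entry.getKey();
            for (final String task : entry.getValue()) {
                final Set<UUID> caughtUp =
                    tasksToCaughtUpClients.getOrDefault(task, Collections.emptySet());
                // Keep the balanced placement if the destination is caught up,
                // or if no client is caught up (there is nothing better to move to).
                if (caughtUp.isEmpty() || caughtUp.contains(destination)) {
                    continue;
                }
                // Least-loaded caught-up client, ties broken by UUID for determinism.
                final UUID source = caughtUp.stream()
                    .min(Comparator.comparingInt(
                            (UUID c) -> activeAssignment.getOrDefault(c, Collections.emptyList()).size())
                        .thenComparing(Comparator.naturalOrder()))
                    .get();
                activeAssignment.get(destination).remove(task);
                activeAssignment.computeIfAbsent(source, c -> new ArrayList<>()).add(task);

                // Warm up the balanced destination so a later rebalance can move the task back.
                if (remainingWarmups > 0) {
                    warmups.computeIfAbsent(destination, c -> new ArrayList<>()).add(task);
                    remainingWarmups--;
                    warmupReplicasAssigned = true;
                }
            }
        }
        return warmupReplicasAssigned;
    }
}
```

Returning whether any warmup was placed lets the caller decide whether a follow-up rebalance is needed. `HighAvailabilityTaskAssignor.assign()` uses the result of `assignStatefulActiveTasks` the same way, as `followupRebalanceNeeded`.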
ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411738965
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 public class TaskMovement {
-    private static final Logger log = LoggerFactory.getLogger(TaskMovement.class);
-
     final TaskId task;
-    final UUID source;
-    final UUID destination;
+    private final UUID destination;
-    TaskMovement(final TaskId task, final UUID source, final UUID destination) {
+    TaskMovement(final TaskId task, final UUID destination) {
         this.task = task;
-        this.source = source;
         this.destination = destination;
     }
-    @Override
-    public boolean equals(final Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        final TaskMovement movement = (TaskMovement) o;
-        return Objects.equals(task, movement.task) &&
-            Objects.equals(source, movement.source) &&
-            Objects.equals(destination, movement.destination);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(task, source, destination);
-    }
-
     /**
-     * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured
-     * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with
-     * a few exceptional cases:
-     *
-     * 1. Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved
-         immediately from the source to the destination in the state constrained assignment
-     * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total
-         {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}.
-     *
-     * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients
-     * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients
-     * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment
+     * @return whether any warmup replicas were assigned
      */
-    static List getMovements(final Map> statefulActiveTaskAssignment,
-            final Map> balancedStatefulActiveTaskAssignment,
-            final Map> tasksToCaughtUpClients,
-            final Map clientStates,
-            final Map tasksToRemainingStandbys,
-            final int maxWarmupReplicas) {
-        if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
-            throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
-        }
+    static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment,
+            final Map> tasksToCaughtUpClients,
+            final Map clientStates,
+            final Map tasksToRemainingStandbys,
+            final int maxWarmupReplicas) {
+        boolean warmupReplicasAssigned = false;
+
+        final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+            new ValidClientsByTaskLoadQueue(
+                clientStates,
+                (client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients)
+            );
-        final Map taskToDestinationClient = new HashMap<>();
-        for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
-            final UUID destination = clientEntry.getKey();
-            for (final TaskId task : clientEntry.getValue()) {
-                taskToDestinationClient.put(task, destination);
+        final SortedSet taskMovements = new TreeSet<>(
+            (movement, other) -> {
+                final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size();
+                final int
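The rules in the removed `getMovements` javadoc can be illustrated with a small classifier. This is a hypothetical, self-contained sketch, not the PR's code: clients and tasks are plain `String`s rather than `UUID`/`TaskId`, `MovementRulesSketch` and its `Decision` values are invented names, and two behaviours are assumptions, namely that tasks are visited in sorted order and that a standby-backed warmup only applies while the task still has a standby left to spend (otherwise it falls back to a regular warmup).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of the rules described in the removed getMovements() javadoc.
// Clients and tasks are plain Strings here instead of UUID / TaskId.
final class MovementRulesSketch {

    enum Decision { MOVE_IMMEDIATELY, WARMUP_USING_STANDBY, WARMUP, KEEP_ON_SOURCE }

    static Map<String, Decision> classify(final Map<String, String> taskToSource,
                                          final Map<String, String> taskToDestination,
                                          final Map<String, Set<String>> tasksToCaughtUpClients,
                                          final Map<String, Set<String>> tasksToPreviousStandbys,
                                          final Map<String, Integer> tasksToRemainingStandbys,
                                          final int maxWarmupReplicas) {
        final Map<String, Decision> decisions = new HashMap<>();
        int warmupsUsed = 0;
        // Assumption: visit tasks in sorted order so the warmup budget is spent deterministically.
        for (final Map.Entry<String, String> entry : new TreeMap<>(taskToDestination).entrySet()) {
            final String task = entry.getKey();
            final String destination = entry.getValue();
            final String source = taskToSource.get(task);
            if (destination.equals(source)) {
                continue; // already on its balanced client: no movement needed
            }
            final Set<String> caughtUp = tasksToCaughtUpClients.getOrDefault(task, Collections.emptySet());
            if (caughtUp.contains(destination) || !caughtUp.contains(source)) {
                // Rule 1: destination is caught up, or source is not, so move right away.
                decisions.put(task, Decision.MOVE_IMMEDIATELY);
            } else if (tasksToPreviousStandbys.getOrDefault(task, Collections.emptySet()).contains(destination)
                       && tasksToRemainingStandbys.getOrDefault(task, 0) > 0) {
                // Rule 2: the former standby acts as the warmup; charge num.standby.replicas
                // (mutates the caller's map) instead of the max.warmup.replicas budget.
                tasksToRemainingStandbys.merge(task, -1, Integer::sum);
                decisions.put(task, Decision.WARMUP_USING_STANDBY);
            } else if (warmupsUsed < maxWarmupReplicas) {
                warmupsUsed++;
                decisions.put(task, Decision.WARMUP);
            } else {
                // Warmup budget exhausted: the task stays on its source for now.
                decisions.put(task, Decision.KEEP_ON_SOURCE);
            }
        }
        return decisions;
    }
}
```

A task whose balanced destination equals its current owner needs no movement and is skipped, so it never appears in the returned map.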
[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411734468
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java ##
@@ -89,95 +88,72 @@ public boolean assign() {
             return false;
         }
-        final Map> warmupTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
-        final Map> standbyTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
-        final Map> statelessActiveTaskAssignment = initializeEmptyTaskAssignmentMap(sortedClients);
+        final Map tasksToRemainingStandbys =
+            statefulTasks.stream().collect(Collectors.toMap(task -> task, t -> configs.numStandbyReplicas));
-        // Stateful Active Tasks //
+        final boolean followupRebalanceNeeded = assignStatefulActiveTasks(tasksToRemainingStandbys);
-        final Map> statefulActiveTaskAssignment =
-            new DefaultStateConstrainedBalancedAssignor().assign(
-                statefulTasksToRankedCandidates,
-                configs.balanceFactor,
-                sortedClients,
-                clientsToNumberOfThreads,
-                tasksToCaughtUpClients
-            );
+        assignStandbyReplicaTasks(tasksToRemainingStandbys);
+
+        assignStatelessActiveTasks();
-        // Warmup Replica Tasks //
+        return followupRebalanceNeeded;
+    }
-        final Map> balancedStatefulActiveTaskAssignment =
+    private boolean assignStatefulActiveTasks(final Map tasksToRemainingStandbys) {
+        final Map> statefulActiveTaskAssignment =
             new DefaultBalancedAssignor().assign(
                 sortedClients,
                 statefulTasks,
                 clientsToNumberOfThreads,
                 configs.balanceFactor);
-        final Map tasksToRemainingStandbys =
-            statefulTasks.stream().collect(Collectors.toMap(task -> task, t -> configs.numStandbyReplicas));
-
-        final List movements = getMovements(
+        return assignTaskMovements(
             statefulActiveTaskAssignment,
-            balancedStatefulActiveTaskAssignment,
             tasksToCaughtUpClients,
             clientStates,
             tasksToRemainingStandbys,
-            configs.maxWarmupReplicas);
-
-        for (final TaskMovement movement : movements) {
-            warmupTaskAssignment.get(movement.destination).add(movement.task);
-        }
-
-        // Standby Replica Tasks //
-
-        final List>> allTaskAssignmentMaps = asList(
-            statefulActiveTaskAssignment,
-            warmupTaskAssignment,
-            standbyTaskAssignment,
-            statelessActiveTaskAssignment
+            configs.maxWarmupReplicas
         );
+    }
-        final ValidClientsByTaskLoadQueue clientsByStandbyTaskLoad =
-            new ValidClientsByTaskLoadQueue<>(
-                getClientPriorityQueueByTaskLoad(allTaskAssignmentMaps),
-                allTaskAssignmentMaps
+    private void assignStandbyReplicaTasks(final Map tasksToRemainingStandbys) {
+        final ValidClientsByTaskLoadQueue standbyTaskClientsByTaskLoad =
+            new ValidClientsByTaskLoadQueue(
+                clientStates,
+                (client, task) -> !clientStates.get(client).assignedTasks().contains(task)
             );
+        standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet());
         for (final TaskId task : statefulTasksToRankedCandidates.keySet()) {
             final int numRemainingStandbys = tasksToRemainingStandbys.get(task);
-            final List clients = clientsByStandbyTaskLoad.poll(task, numRemainingStandbys);
+            final List clients = standbyTaskClientsByTaskLoad.poll(task, numRemainingStandbys);
             for (final UUID client : clients) {
-                standbyTaskAssignment.get(client).add(task);
+                clientStates.get(client).assignStandby(task);
             }
-            clientsByStandbyTaskLoad.offer(clients);
+            standbyTaskClientsByTaskLoad.offerAll(clients);
+
             final int numStandbysAssigned = clients.size();
-            if (numStandbysAssigned < configs.numStandbyReplicas) {
+            if (numStandbysAssigned < numRemainingStandbys) {
                 log.warn("Unable to assign {} of {} standby tasks for task [{}]. " +
                     "There is not enough available capacity. You should " +
                     "increase the number of threads and/or application instances " +
                     "to maintain the requested number of standby replicas.",
-                    configs.numStandbyReplicas - numStandbysAssigned, configs.numStandbyReplicas, task);
+                    numRemainingStandbys - numStandbysAssigned, configs.numStandbyReplicas, task);
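The reworked `assignStandbyReplicaTasks` above polls, for each task, up to `numRemainingStandbys` of the least-loaded clients that do not already host that task, gives each of them a standby, offers them back, and warns when it found fewer clients than requested. A simplified, hypothetical sketch of that loop is below. `StandbyAssignmentSketch` and `LeastLoadedValidClientQueue` are invented stand-ins for the PR's `ClientState` and `ValidClientsByTaskLoadQueue`, a client's load is assumed to be just its number of assigned tasks, and returning a map of unplaced standbys takes the place of the `log.warn` call.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BiPredicate;

// Hypothetical sketch of the standby loop; not the PR's ValidClientsByTaskLoadQueue or ClientState.
// Clients and tasks are Strings, and a client's load is simply its number of assigned tasks.
final class StandbyAssignmentSketch {

    static final class LeastLoadedValidClientQueue {
        private final BiPredicate<String, String> isValid; // (client, task) -> may this client take the task?
        private final PriorityQueue<String> queue;
        private final Set<String> queued = new HashSet<>();

        LeastLoadedValidClientQueue(final Map<String, Set<String>> assignedTasks,
                                    final BiPredicate<String, String> isValid) {
            this.isValid = isValid;
            // Least loaded first; ties broken by client id so results are deterministic.
            this.queue = new PriorityQueue<>(
                Comparator.comparingInt((String client) -> assignedTasks.get(client).size())
                          .thenComparing(Comparator.<String>naturalOrder()));
        }

        void offerAll(final Iterable<String> clients) {
            for (final String client : clients) {
                if (queued.add(client)) { // skip clients that are already queued
                    queue.offer(client);
                }
            }
        }

        // Removes and returns up to n of the least loaded clients that are valid for the task.
        List<String> poll(final String task, final int n) {
            final List<String> valid = new ArrayList<>();
            final List<String> skipped = new ArrayList<>();
            while (valid.size() < n && !queue.isEmpty()) {
                final String client = queue.poll();
                queued.remove(client);
                if (isValid.test(client, task)) {
                    valid.add(client);
                } else {
                    skipped.add(client);
                }
            }
            offerAll(skipped); // invalid for this task, but still available for others
            return valid;
        }
    }

    // Assigns standbys in place and returns, per task, how many standbys could not be placed.
    static Map<String, Integer> assignStandbys(final Map<String, Set<String>> assignedTasks,
                                               final Map<String, Integer> tasksToRemainingStandbys) {
        final LeastLoadedValidClientQueue clients = new LeastLoadedValidClientQueue(
            assignedTasks, (client, task) -> !assignedTasks.get(client).contains(task));
        clients.offerAll(assignedTasks.keySet());
        final Map<String, Integer> unassigned = new HashMap<>();
        for (final Map.Entry<String, Integer> entry : new TreeMap<>(tasksToRemainingStandbys).entrySet()) {
            final String task = entry.getKey();
            final int numRemainingStandbys = entry.getValue();
            final List<String> chosen = clients.poll(task, numRemainingStandbys);
            for (final String client : chosen) {
                assignedTasks.get(client).add(task); // load changes only while the client is out of the heap
            }
            clients.offerAll(chosen); // re-insert with the updated load
            if (chosen.size() < numRemainingStandbys) {
                unassigned.put(task, numRemainingStandbys - chosen.size());
            }
        }
        return unassigned;
    }
}
```

The poll-then-offer pattern matters for correctness: `PriorityQueue` does not notice when the key of an element already in the heap changes, so a client's load should only change while it is out of the queue.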