ableegoldman commented on code in PR #16052: URL: https://github.com/apache/kafka/pull/16052#discussion_r1612714758
########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java: ########## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment.assignors; + +import static java.util.Collections.unmodifiableMap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.assignment.TaskInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StickyTaskAssignor implements TaskAssignor { + private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class); + + private Map<String, ?> configs = new HashMap<>(); + private final boolean mustPreserveActiveTaskAssignment; + + public StickyTaskAssignor() { + this(false); + } + + public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) { + this.mustPreserveActiveTaskAssignment = mustPreserveActiveTaskAssignment; + } + + @Override + public void configure(final Map<String, ?> configs) { Review Comment: Already discussed but just to keep track: remove this override (and add a default to interface method if necessary) ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java: ########## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment.assignors; + +import static java.util.Collections.unmodifiableMap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.assignment.TaskInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StickyTaskAssignor implements TaskAssignor { + private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class); + + private Map<String, ?> configs = new HashMap<>(); + private final boolean mustPreserveActiveTaskAssignment; + + public StickyTaskAssignor() { + this(false); + } + + public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) { + this.mustPreserveActiveTaskAssignment = mustPreserveActiveTaskAssignment; + } + + @Override + public void configure(final Map<String, ?> configs) { + // TODO: The application state already has the assignment configs object, why should this + // assignor be configurable? + this.configs = configs; + } + + @Override + public TaskAssignment assign(final ApplicationState applicationState) { + final int taskCount = applicationState.allTasks().size(); + if (taskCount == 0) { + return new TaskAssignment(new ArrayList<>()); + } + + final Map<ProcessId, KafkaStreamsState> clients = applicationState.kafkaStreamsStates(false); + final Map<TaskId, ProcessId> previousActiveAssignment = mapPreviousActiveTasks(clients); + final Map<TaskId, Set<ProcessId>> previousStandbyAssignment = mapPreviousStandbyTasks(clients); + final AssignmentState assignmentState = new AssignmentState(applicationState, clients, + previousActiveAssignment, previousStandbyAssignment); + + assignActive(applicationState, clients.values(), assignmentState, this.mustPreserveActiveTaskAssignment); + optimizeActive(applicationState, assignmentState); + assignStandby(applicationState, assignmentState); + optimizeStandby(applicationState, assignmentState); + return null; + } + + private void optimizeActive(final ApplicationState applicationState, + final AssignmentState assignmentState) { + if (!TaskAssignmentUtils.hasValidRackInformation(applicationState)) { Review Comment: The build is failing, I think because this method doesn't exist. I thought we were going to add a `#hasCompleteRackInfo` API or something like that to the `ApplicationState`? ie something equivalent to the `rackAwareTaskAssignor.canEnableRackAwareAssignor()` API? ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java: ########## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment.assignors; + +import static java.util.Collections.unmodifiableMap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.assignment.TaskInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StickyTaskAssignor implements TaskAssignor { + private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class); + + private Map<String, ?> configs = new HashMap<>(); + private final boolean mustPreserveActiveTaskAssignment; + + public StickyTaskAssignor() { + this(false); + } + + public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) { + this.mustPreserveActiveTaskAssignment = mustPreserveActiveTaskAssignment; + } + + @Override + public void configure(final Map<String, ?> configs) { + // TODO: The application state already has the assignment configs object, why should this + // assignor be configurable? + this.configs = configs; + } + + @Override + public TaskAssignment assign(final ApplicationState applicationState) { + final int taskCount = applicationState.allTasks().size(); + if (taskCount == 0) { + return new TaskAssignment(new ArrayList<>()); + } + + final Map<ProcessId, KafkaStreamsState> clients = applicationState.kafkaStreamsStates(false); + final Map<TaskId, ProcessId> previousActiveAssignment = mapPreviousActiveTasks(clients); + final Map<TaskId, Set<ProcessId>> previousStandbyAssignment = mapPreviousStandbyTasks(clients); + final AssignmentState assignmentState = new AssignmentState(applicationState, clients, + previousActiveAssignment, previousStandbyAssignment); + + assignActive(applicationState, clients.values(), assignmentState, this.mustPreserveActiveTaskAssignment); + optimizeActive(applicationState, assignmentState); + assignStandby(applicationState, assignmentState); + optimizeStandby(applicationState, assignmentState); + return null; + } + + private void optimizeActive(final ApplicationState applicationState, + final AssignmentState assignmentState) { + if (!TaskAssignmentUtils.hasValidRackInformation(applicationState)) { + return; + } + + final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = assignmentState.buildKafkaStreamsAssignments(); + + final Set<TaskId> statefulTasks = applicationState.allTasks().stream() + .filter(TaskInfo::isStateful) + .map(TaskInfo::id) + .collect(Collectors.toSet()); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignmentsForStatefulTasks = TaskAssignmentUtils.optimizeRackAwareActiveTasks( + applicationState, currentAssignments, new TreeSet<>(statefulTasks)); + + final Set<TaskId> statelessTasks = applicationState.allTasks().stream() + .filter(task -> !task.isStateful()) + .map(TaskInfo::id) + .collect(Collectors.toSet()); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignmentsForAllTasks = TaskAssignmentUtils.optimizeRackAwareActiveTasks( + applicationState, optimizedAssignmentsForStatefulTasks, new TreeSet<>(statelessTasks)); + + assignmentState.processOptimizedAssignments(optimizedAssignmentsForStatefulTasks); + } + + private void optimizeStandby(final ApplicationState applicationState, final AssignmentState assignmentState) { + if (applicationState.assignmentConfigs().numStandbyReplicas() <= 0) { + return; + } + + if (!TaskAssignmentUtils.hasValidRackInformation(applicationState)) { + return; + } + + final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = assignmentState.buildKafkaStreamsAssignments(); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignments = TaskAssignmentUtils.optimizeRackAwareStandbyTasks( + applicationState, currentAssignments); + assignmentState.processOptimizedAssignments(optimizedAssignments); + } + + private static void assignActive(final ApplicationState applicationState, + final Collection<KafkaStreamsState> clients, + final AssignmentState assignmentState, + final boolean mustPreserveActiveTaskAssignment) { + final int totalCapacity = computeStreamThreadCount(clients); + if (totalCapacity == 0) { + throw new IllegalStateException("`totalCapacity` should never be zero."); + } + + final Set<TaskId> allTaskIds = applicationState.allTasks().stream() + .map(TaskInfo::id).collect(Collectors.toSet()); + final int taskCount = allTaskIds.size(); + final int activeTasksPerThread = taskCount / totalCapacity; + final Set<TaskId> unassigned = new HashSet<>(allTaskIds); + + // first try and re-assign existing active tasks to clients that previously had + // the same active task + for (final TaskId taskId : assignmentState.previousActiveAssignment.keySet()) { + final ProcessId previousClientForTask = assignmentState.previousActiveAssignment.get(taskId); + + // TODO: Remove this check, why is it even necessary? Review Comment: reminder: remove this comment ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java: ########## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment.assignors; + +import static java.util.Collections.unmodifiableMap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.assignment.TaskInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StickyTaskAssignor implements TaskAssignor { + private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class); + + private Map<String, ?> configs = new HashMap<>(); + private final boolean mustPreserveActiveTaskAssignment; + + public StickyTaskAssignor() { + this(false); + } + + public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) { + this.mustPreserveActiveTaskAssignment = mustPreserveActiveTaskAssignment; + } + + @Override + public void configure(final Map<String, ?> configs) { + // TODO: The application state already has the assignment configs object, why should this + // assignor be configurable? + this.configs = configs; + } + + @Override + public TaskAssignment assign(final ApplicationState applicationState) { + final int taskCount = applicationState.allTasks().size(); + if (taskCount == 0) { + return new TaskAssignment(new ArrayList<>()); + } + + final Map<ProcessId, KafkaStreamsState> clients = applicationState.kafkaStreamsStates(false); + final Map<TaskId, ProcessId> previousActiveAssignment = mapPreviousActiveTasks(clients); + final Map<TaskId, Set<ProcessId>> previousStandbyAssignment = mapPreviousStandbyTasks(clients); + final AssignmentState assignmentState = new AssignmentState(applicationState, clients, + previousActiveAssignment, previousStandbyAssignment); + + assignActive(applicationState, clients.values(), assignmentState, this.mustPreserveActiveTaskAssignment); + optimizeActive(applicationState, assignmentState); + assignStandby(applicationState, assignmentState); + optimizeStandby(applicationState, assignmentState); + return null; + } + + private void optimizeActive(final ApplicationState applicationState, + final AssignmentState assignmentState) { + if (!TaskAssignmentUtils.hasValidRackInformation(applicationState)) { + return; + } + + final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = assignmentState.buildKafkaStreamsAssignments(); + + final Set<TaskId> statefulTasks = applicationState.allTasks().stream() + .filter(TaskInfo::isStateful) + .map(TaskInfo::id) + .collect(Collectors.toSet()); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignmentsForStatefulTasks = TaskAssignmentUtils.optimizeRackAwareActiveTasks( + applicationState, currentAssignments, new TreeSet<>(statefulTasks)); + + final Set<TaskId> statelessTasks = applicationState.allTasks().stream() + .filter(task -> !task.isStateful()) + .map(TaskInfo::id) + .collect(Collectors.toSet()); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignmentsForAllTasks = TaskAssignmentUtils.optimizeRackAwareActiveTasks( + applicationState, optimizedAssignmentsForStatefulTasks, new TreeSet<>(statelessTasks)); + + assignmentState.processOptimizedAssignments(optimizedAssignmentsForStatefulTasks); + } + + private void optimizeStandby(final ApplicationState applicationState, final AssignmentState assignmentState) { + if (applicationState.assignmentConfigs().numStandbyReplicas() <= 0) { + return; + } + + if (!TaskAssignmentUtils.hasValidRackInformation(applicationState)) { + return; + } + + final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = assignmentState.buildKafkaStreamsAssignments(); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignments = TaskAssignmentUtils.optimizeRackAwareStandbyTasks( + applicationState, currentAssignments); + assignmentState.processOptimizedAssignments(optimizedAssignments); + } + + private static void assignActive(final ApplicationState applicationState, + final Collection<KafkaStreamsState> clients, + final AssignmentState assignmentState, + final boolean mustPreserveActiveTaskAssignment) { + final int totalCapacity = computeStreamThreadCount(clients); + if (totalCapacity == 0) { + throw new IllegalStateException("`totalCapacity` should never be zero."); + } + + final Set<TaskId> allTaskIds = applicationState.allTasks().stream() + .map(TaskInfo::id).collect(Collectors.toSet()); + final int taskCount = allTaskIds.size(); + final int activeTasksPerThread = taskCount / totalCapacity; + final Set<TaskId> unassigned = new HashSet<>(allTaskIds); + + // first try and re-assign existing active tasks to clients that previously had + // the same active task + for (final TaskId taskId : assignmentState.previousActiveAssignment.keySet()) { + final ProcessId previousClientForTask = assignmentState.previousActiveAssignment.get(taskId); + + // TODO: Remove this check, why is it even necessary? + if (allTaskIds.contains(taskId)) { + if (mustPreserveActiveTaskAssignment || assignmentState.hasRoomForActiveTask(previousClientForTask, activeTasksPerThread)) { + assignmentState.finalizeAssignment(taskId, previousClientForTask, AssignedTask.Type.ACTIVE); + unassigned.remove(taskId); + } + } + } + + // try and assign any remaining unassigned tasks to clients that previously + // have seen the task. + for (final Iterator<TaskId> iterator = unassigned.iterator(); iterator.hasNext(); ) { + final TaskId taskId = iterator.next(); + final Set<ProcessId> previousClientsForStandbyTask = assignmentState.previousStandbyAssignment.get(taskId); + if (previousClientsForStandbyTask == null || previousClientsForStandbyTask.isEmpty()) { + continue; + } + + for (final ProcessId client: previousClientsForStandbyTask) { + if (assignmentState.hasRoomForActiveTask(client, activeTasksPerThread)) { + assignmentState.finalizeAssignment(taskId, client, AssignedTask.Type.ACTIVE); + iterator.remove(); + break; + } + } + } + + // assign any remaining unassigned tasks + final List<TaskId> sortedTasks = new ArrayList<>(unassigned); + Collections.sort(sortedTasks); + for (final TaskId taskId : sortedTasks) { + final Set<ProcessId> candidateClients = clients.stream() + .map(KafkaStreamsState::processId) + .collect(Collectors.toSet()); + final ProcessId bestClient = assignmentState.findBestClientForTask(taskId, candidateClients); + assignmentState.finalizeAssignment(taskId, bestClient, AssignedTask.Type.ACTIVE); + } + } + + private static void assignStandby(final ApplicationState applicationState, + final AssignmentState assignmentState) { + final Set<TaskInfo> statefulTasks = applicationState.allTasks().stream() + .filter(TaskInfo::isStateful) + .collect(Collectors.toSet()); + final int numStandbyReplicas = applicationState.assignmentConfigs().numStandbyReplicas(); + for (final TaskInfo task : statefulTasks) { + for (int i = 0; i < numStandbyReplicas; i++) { + final Set<ProcessId> candidateClients = assignmentState.findClientsWithoutAssignedTask(task.id()); + if (candidateClients.isEmpty()) { + LOG.warn("Unable to assign {} of {} standby tasks for task [{}]. " + + "There is not enough available capacity. You should " + + "increase the number of threads and/or application instances " + + "to maintain the requested number of standby replicas.", + numStandbyReplicas - i, + numStandbyReplicas, task.id()); + break; + } + + final ProcessId bestClient = assignmentState.findBestClientForTask(task.id(), candidateClients); + assignmentState.finalizeAssignment(task.id(), bestClient, AssignedTask.Type.STANDBY); + } + } + } + + private static Map<TaskId, ProcessId> mapPreviousActiveTasks(final Map<ProcessId, KafkaStreamsState> clients) { + final Map<TaskId, ProcessId> previousActiveTasks = new HashMap<>(); + for (final KafkaStreamsState client : clients.values()) { + for (final TaskId taskId : client.previousActiveTasks()) { + previousActiveTasks.put(taskId, client.processId()); + } + } + return previousActiveTasks; + } + + private static Map<TaskId, Set<ProcessId>> mapPreviousStandbyTasks(final Map<ProcessId, KafkaStreamsState> clients) { + final Map<TaskId, Set<ProcessId>> previousStandbyTasks = new HashMap<>(); + for (final KafkaStreamsState client : clients.values()) { + for (final TaskId taskId : client.previousActiveTasks()) { + previousStandbyTasks.computeIfAbsent(taskId, k -> new HashSet<>()); + previousStandbyTasks.get(taskId).add(client.processId()); + } + } + return previousStandbyTasks; + } + + private static int computeStreamThreadCount(final Collection<KafkaStreamsState> clients) { + int count = 0; + for (final KafkaStreamsState client : clients) { + count += client.numProcessingThreads(); + } + return count; + } + + private static class AssignmentState { + private final Map<ProcessId, KafkaStreamsState> clients; + private final Map<TaskId, ProcessId> previousActiveAssignment; + private final Map<TaskId, Set<ProcessId>> previousStandbyAssignment; + + private final TaskPairs taskPairs; + + private Map<TaskId, Set<ProcessId>> newTaskLocations; + private Map<ProcessId, Set<AssignedTask>> newAssignments; + + private AssignmentState(final ApplicationState applicationState, + final Map<ProcessId, KafkaStreamsState> clients, + final Map<TaskId, ProcessId> previousActiveAssignment, + final Map<TaskId, Set<ProcessId>> previousStandbyAssignment) { + this.clients = clients; + this.previousActiveAssignment = unmodifiableMap(previousActiveAssignment); + this.previousStandbyAssignment = unmodifiableMap(previousStandbyAssignment); + + final int taskCount = applicationState.allTasks().size(); + final int maxPairs = taskCount * (taskCount - 1) / 2; + this.taskPairs = new TaskPairs(maxPairs); + + this.newTaskLocations = new HashMap<>(); + this.newAssignments = new HashMap<>(); + } + + public void finalizeAssignment(final TaskId taskId, final ProcessId client, final AssignedTask.Type type) { + newAssignments.computeIfAbsent(client, k -> new HashSet<>()); + newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>()); + + final Set<TaskId> newAssignmentsForClient = newAssignments.get(client) + .stream().map(AssignedTask::id).collect(Collectors.toSet()); + + taskPairs.addPairs(taskId, newAssignmentsForClient); + newAssignments.get(client).add(new AssignedTask(taskId, type)); + newTaskLocations.get(taskId).add(client); + } + + public Map<ProcessId, KafkaStreamsAssignment> buildKafkaStreamsAssignments() { + final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments = new HashMap<>(); + for (final ProcessId processId : newAssignments.keySet()) { + final Set<AssignedTask> assignedTasks = newAssignments.get(processId); + final KafkaStreamsAssignment assignment = KafkaStreamsAssignment.of(processId, assignedTasks); + // TODO: Followup rebalance? Review Comment: just to confirm what we discussed already: we should not schedule a followup rebalance if `mustPreserveActiveTaskAssignment` is false, if it is true then we should just pick one of the the `KafkaStreamsAssignments` (essentially at random) and schedule a followup rebalance for 0ms so it gets triggered immediately. We can just do this at the end, after assigning all the tasks. ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java: ########## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment.assignors; + +import static java.util.Collections.unmodifiableMap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.assignment.TaskInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StickyTaskAssignor implements TaskAssignor { + private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class); + + private Map<String, ?> configs = new HashMap<>(); + private final boolean mustPreserveActiveTaskAssignment; + + public StickyTaskAssignor() { + this(false); + } + + public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) { + this.mustPreserveActiveTaskAssignment = mustPreserveActiveTaskAssignment; + } + + @Override + public void configure(final Map<String, ?> configs) { + // TODO: The application state already has the assignment configs object, why should this + // assignor be configurable? + this.configs = configs; + } + + @Override + public TaskAssignment assign(final ApplicationState applicationState) { + final int taskCount = applicationState.allTasks().size(); + if (taskCount == 0) { + return new TaskAssignment(new ArrayList<>()); + } + + final Map<ProcessId, KafkaStreamsState> clients = applicationState.kafkaStreamsStates(false); + final Map<TaskId, ProcessId> previousActiveAssignment = mapPreviousActiveTasks(clients); + final Map<TaskId, Set<ProcessId>> previousStandbyAssignment = mapPreviousStandbyTasks(clients); + final AssignmentState assignmentState = new AssignmentState(applicationState, clients, + previousActiveAssignment, previousStandbyAssignment); + + assignActive(applicationState, clients.values(), assignmentState, this.mustPreserveActiveTaskAssignment); + optimizeActive(applicationState, assignmentState); + assignStandby(applicationState, assignmentState); + optimizeStandby(applicationState, assignmentState); + return null; + } + + private void optimizeActive(final ApplicationState applicationState, + final AssignmentState assignmentState) { + if (!TaskAssignmentUtils.hasValidRackInformation(applicationState)) { + return; + } + + final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = assignmentState.buildKafkaStreamsAssignments(); + + final Set<TaskId> statefulTasks = applicationState.allTasks().stream() + .filter(TaskInfo::isStateful) + .map(TaskInfo::id) + .collect(Collectors.toSet()); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignmentsForStatefulTasks = TaskAssignmentUtils.optimizeRackAwareActiveTasks( + applicationState, currentAssignments, new TreeSet<>(statefulTasks)); + + final Set<TaskId> statelessTasks = applicationState.allTasks().stream() + .filter(task -> !task.isStateful()) + .map(TaskInfo::id) + .collect(Collectors.toSet()); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignmentsForAllTasks = TaskAssignmentUtils.optimizeRackAwareActiveTasks( + applicationState, optimizedAssignmentsForStatefulTasks, new TreeSet<>(statelessTasks)); + + assignmentState.processOptimizedAssignments(optimizedAssignmentsForStatefulTasks); + } + + private void optimizeStandby(final ApplicationState applicationState, final AssignmentState assignmentState) { + if (applicationState.assignmentConfigs().numStandbyReplicas() <= 0) { + return; + } + + if (!TaskAssignmentUtils.hasValidRackInformation(applicationState)) { + return; + } + + final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = assignmentState.buildKafkaStreamsAssignments(); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignments = TaskAssignmentUtils.optimizeRackAwareStandbyTasks( + applicationState, currentAssignments); + assignmentState.processOptimizedAssignments(optimizedAssignments); + } + + private static void assignActive(final ApplicationState applicationState, + final Collection<KafkaStreamsState> clients, + final AssignmentState assignmentState, + final boolean mustPreserveActiveTaskAssignment) { + final int totalCapacity = computeStreamThreadCount(clients); + if (totalCapacity == 0) { + throw new IllegalStateException("`totalCapacity` should never be zero."); + } + + final Set<TaskId> allTaskIds = applicationState.allTasks().stream() + .map(TaskInfo::id).collect(Collectors.toSet()); + final int taskCount = allTaskIds.size(); + final int activeTasksPerThread = taskCount / totalCapacity; + final Set<TaskId> unassigned = new HashSet<>(allTaskIds); + + // first try and re-assign existing active tasks to clients that previously had + // the same active task + for (final TaskId taskId : assignmentState.previousActiveAssignment.keySet()) { + final ProcessId previousClientForTask = assignmentState.previousActiveAssignment.get(taskId); + + // TODO: Remove this check, why is it even necessary? + if (allTaskIds.contains(taskId)) { + if (mustPreserveActiveTaskAssignment || assignmentState.hasRoomForActiveTask(previousClientForTask, activeTasksPerThread)) { + assignmentState.finalizeAssignment(taskId, previousClientForTask, AssignedTask.Type.ACTIVE); + unassigned.remove(taskId); + } + } + } + + // try and assign any remaining unassigned tasks to clients that previously + // have seen the task. + for (final Iterator<TaskId> iterator = unassigned.iterator(); iterator.hasNext(); ) { + final TaskId taskId = iterator.next(); + final Set<ProcessId> previousClientsForStandbyTask = assignmentState.previousStandbyAssignment.get(taskId); + if (previousClientsForStandbyTask == null || previousClientsForStandbyTask.isEmpty()) { + continue; + } + + for (final ProcessId client: previousClientsForStandbyTask) { + if (assignmentState.hasRoomForActiveTask(client, activeTasksPerThread)) { + assignmentState.finalizeAssignment(taskId, client, AssignedTask.Type.ACTIVE); + iterator.remove(); + break; + } + } + } + + // assign any remaining unassigned tasks + final List<TaskId> sortedTasks = new ArrayList<>(unassigned); + Collections.sort(sortedTasks); + for (final TaskId taskId : sortedTasks) { + final Set<ProcessId> candidateClients = clients.stream() + .map(KafkaStreamsState::processId) + .collect(Collectors.toSet()); + final ProcessId bestClient = assignmentState.findBestClientForTask(taskId, candidateClients); + assignmentState.finalizeAssignment(taskId, bestClient, AssignedTask.Type.ACTIVE); + } + } + + private static void assignStandby(final ApplicationState applicationState, + final AssignmentState assignmentState) { + final Set<TaskInfo> statefulTasks = applicationState.allTasks().stream() + .filter(TaskInfo::isStateful) + .collect(Collectors.toSet()); + final int numStandbyReplicas = applicationState.assignmentConfigs().numStandbyReplicas(); + for (final TaskInfo task : statefulTasks) { + for (int i = 0; i < numStandbyReplicas; i++) { + final Set<ProcessId> candidateClients = assignmentState.findClientsWithoutAssignedTask(task.id()); + if (candidateClients.isEmpty()) { + LOG.warn("Unable to assign {} of {} standby tasks for task [{}]. " + + "There is not enough available capacity. You should " + + "increase the number of threads and/or application instances " + + "to maintain the requested number of standby replicas.", + numStandbyReplicas - i, + numStandbyReplicas, task.id()); + break; + } + + final ProcessId bestClient = assignmentState.findBestClientForTask(task.id(), candidateClients); + assignmentState.finalizeAssignment(task.id(), bestClient, AssignedTask.Type.STANDBY); + } + } + } + + private static Map<TaskId, ProcessId> mapPreviousActiveTasks(final Map<ProcessId, KafkaStreamsState> clients) { + final Map<TaskId, ProcessId> previousActiveTasks = new HashMap<>(); + for (final KafkaStreamsState client : clients.values()) { + for (final TaskId taskId : client.previousActiveTasks()) { + previousActiveTasks.put(taskId, client.processId()); + } + } + return previousActiveTasks; + } + + private static Map<TaskId, Set<ProcessId>> mapPreviousStandbyTasks(final Map<ProcessId, KafkaStreamsState> clients) { + final Map<TaskId, Set<ProcessId>> previousStandbyTasks = new HashMap<>(); + for (final KafkaStreamsState client : clients.values()) { + for (final TaskId taskId : client.previousActiveTasks()) { + previousStandbyTasks.computeIfAbsent(taskId, k -> new HashSet<>()); + previousStandbyTasks.get(taskId).add(client.processId()); + } + } + return previousStandbyTasks; + } + + private static int computeStreamThreadCount(final Collection<KafkaStreamsState> clients) { + int count = 0; + for (final KafkaStreamsState client : clients) { + count += client.numProcessingThreads(); + } + return count; + } + + private static class AssignmentState { + private final Map<ProcessId, KafkaStreamsState> clients; + private final Map<TaskId, ProcessId> previousActiveAssignment; + private final Map<TaskId, Set<ProcessId>> previousStandbyAssignment; + + private final TaskPairs taskPairs; + + private Map<TaskId, Set<ProcessId>> newTaskLocations; + private Map<ProcessId, Set<AssignedTask>> newAssignments; + + private AssignmentState(final ApplicationState applicationState, + final Map<ProcessId, KafkaStreamsState> clients, + final Map<TaskId, ProcessId> previousActiveAssignment, + final Map<TaskId, Set<ProcessId>> previousStandbyAssignment) { + this.clients = clients; + this.previousActiveAssignment = unmodifiableMap(previousActiveAssignment); + this.previousStandbyAssignment = unmodifiableMap(previousStandbyAssignment); + + final int taskCount = applicationState.allTasks().size(); + final int maxPairs = taskCount * (taskCount - 1) / 2; + this.taskPairs = new TaskPairs(maxPairs); + + this.newTaskLocations = new HashMap<>(); + this.newAssignments = new HashMap<>(); + } + + public void finalizeAssignment(final TaskId taskId, final ProcessId client, final AssignedTask.Type type) { + newAssignments.computeIfAbsent(client, k -> new HashSet<>()); + newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>()); + + final Set<TaskId> newAssignmentsForClient = newAssignments.get(client) + .stream().map(AssignedTask::id).collect(Collectors.toSet()); + + taskPairs.addPairs(taskId, newAssignmentsForClient); + newAssignments.get(client).add(new AssignedTask(taskId, type)); + newTaskLocations.get(taskId).add(client); + } + + public Map<ProcessId, KafkaStreamsAssignment> buildKafkaStreamsAssignments() { + final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments = new HashMap<>(); + for (final ProcessId processId : newAssignments.keySet()) { + final Set<AssignedTask> assignedTasks = newAssignments.get(processId); + final KafkaStreamsAssignment assignment = KafkaStreamsAssignment.of(processId, assignedTasks); + // TODO: Followup rebalance? + kafkaStreamsAssignments.put(processId, assignment); + } + return kafkaStreamsAssignments; + } + + public void processOptimizedAssignments(final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignments) { + final Map<TaskId, Set<ProcessId>> newTaskLocations = new HashMap<>(); + final Map<ProcessId, Set<AssignedTask>> newAssignments = new HashMap<>(); + + for (final ProcessId processId : optimizedAssignments.keySet()) { + final Set<AssignedTask> assignedTasks = optimizedAssignments.get(processId).assignment(); + newAssignments.put(processId, assignedTasks); + + for (final AssignedTask task : assignedTasks) { + newTaskLocations.computeIfAbsent(task.id(), k -> new HashSet<>()); + newTaskLocations.get(task.id()).add(processId); + } + } + + this.newTaskLocations = newTaskLocations; + this.newAssignments = newAssignments; + } + + public boolean hasRoomForActiveTask(final ProcessId processId, final int activeTasksPerThread) { + final int capacity = clients.get(processId).numProcessingThreads(); + final int newActiveTaskCount = newAssignments.computeIfAbsent(processId, k -> new HashSet<>()) + .stream().filter(assignedTask -> assignedTask.type() == AssignedTask.Type.ACTIVE) + .collect(Collectors.toSet()) + .size(); + return newActiveTaskCount < capacity * activeTasksPerThread; + } + + public ProcessId findBestClientForTask(final TaskId taskId, final Set<ProcessId> clientsWithin) { + if (clientsWithin.size() == 1) { + return clientsWithin.iterator().next(); + } + + final ProcessId previousClient = findLeastLoadedClientWithPreviousActiveOrStandbyTask( + taskId, clientsWithin); + if (previousClient == null) { + return findLeastLoadedClient(taskId, clientsWithin); + } + + if (shouldBalanceLoad(previousClient)) { + final ProcessId standby = findLeastLoadedClientWithPreviousStandbyTask(taskId, clientsWithin); + if (standby == null || shouldBalanceLoad(standby)) { + return findLeastLoadedClient(taskId, clientsWithin); + } + return standby; + } + return previousClient; + } + + public Set<ProcessId> findClientsWithoutAssignedTask(final TaskId taskId) { + final Set<ProcessId> unavailableClients = newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>()); + return clients.values().stream() + .map(KafkaStreamsState::processId) + .filter(o -> !unavailableClients.contains(o)) + .collect(Collectors.toSet()); + } + + public double clientLoad(final ProcessId processId) { + final int capacity = clients.get(processId).numProcessingThreads(); + final double totalTaskCount = newAssignments.getOrDefault(processId, new HashSet<>()).size(); + return totalTaskCount / capacity; + } + + public ProcessId findLeastLoadedClient(final TaskId taskId, final Set<ProcessId> clientIds) { + ProcessId leastLoaded = null; + for (final ProcessId processId : clientIds) { + final double thisClientLoad = clientLoad(processId); + if (thisClientLoad == 0) { + return processId; + } + + if (leastLoaded == null || thisClientLoad < clientLoad(leastLoaded)) { + final Set<TaskId> assignedTasks = newAssignments.getOrDefault(processId, new HashSet<>()) + .stream().map(AssignedTask::id).collect(Collectors.toSet()); + if (taskPairs.hasNewPair(taskId, assignedTasks)) { + leastLoaded = processId; + } + } + } + + if (leastLoaded != null) { + return leastLoaded; + } + + for (final ProcessId processId : clientIds) { + final double thisClientLoad = clientLoad(processId); + + if (leastLoaded == null || thisClientLoad < clientLoad(leastLoaded)) { + leastLoaded = processId; + } + } + + return leastLoaded; + } + + public ProcessId findLeastLoadedClientWithPreviousActiveOrStandbyTask(final TaskId taskId, + final Set<ProcessId> clientsWithin) { + final ProcessId previous = previousActiveAssignment.get(taskId); + if (previous != null && clientsWithin.contains(previous)) { + return previous; + } + return findLeastLoadedClientWithPreviousStandbyTask(taskId, clientsWithin); + } + + public ProcessId findLeastLoadedClientWithPreviousStandbyTask(final TaskId taskId, + final Set<ProcessId> clientsWithin) { + final Set<ProcessId> ids = previousStandbyAssignment.getOrDefault(taskId, new HashSet<>()); + final HashSet<ProcessId> constrainTo = new HashSet<>(ids); + constrainTo.retainAll(clientsWithin); + return findLeastLoadedClient(taskId, constrainTo); + } + + public boolean shouldBalanceLoad(final ProcessId client) { + if (clientLoad(client) < 1) { + return false; + } + + final double thisClientLoad = clientLoad(client); Review Comment: can we declare this above so we can use it in the `if (clientLoad(client) < 1) {` line as well? ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java: ########## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment.assignors; + +import static java.util.Collections.unmodifiableMap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.assignment.TaskInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StickyTaskAssignor implements TaskAssignor { + private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class); + + private Map<String, ?> configs = new HashMap<>(); + private final boolean mustPreserveActiveTaskAssignment; + + public StickyTaskAssignor() { + this(false); + } + + public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) { + this.mustPreserveActiveTaskAssignment = mustPreserveActiveTaskAssignment; + } + + @Override + public void configure(final Map<String, ?> configs) { + // TODO: The application state already has the assignment configs object, why should this + // assignor be configurable? + this.configs = configs; + } + + @Override + public TaskAssignment assign(final ApplicationState applicationState) { + final int taskCount = applicationState.allTasks().size(); + if (taskCount == 0) { + return new TaskAssignment(new ArrayList<>()); + } + + final Map<ProcessId, KafkaStreamsState> clients = applicationState.kafkaStreamsStates(false); + final Map<TaskId, ProcessId> previousActiveAssignment = mapPreviousActiveTasks(clients); + final Map<TaskId, Set<ProcessId>> previousStandbyAssignment = mapPreviousStandbyTasks(clients); + final AssignmentState assignmentState = new AssignmentState(applicationState, clients, + previousActiveAssignment, previousStandbyAssignment); + + assignActive(applicationState, clients.values(), assignmentState, this.mustPreserveActiveTaskAssignment); + optimizeActive(applicationState, assignmentState); + assignStandby(applicationState, assignmentState); + optimizeStandby(applicationState, assignmentState); + return null; + } + + private void optimizeActive(final ApplicationState applicationState, + final AssignmentState assignmentState) { + if (!TaskAssignmentUtils.hasValidRackInformation(applicationState)) { + return; + } + + final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = assignmentState.buildKafkaStreamsAssignments(); + + final Set<TaskId> statefulTasks = applicationState.allTasks().stream() Review Comment: We're computing the set of stateful tasks at least twice, so maybe it makes sense to save this info and pass it through (eg via the AssignmentState) ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java: ########## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment.assignors; + +import static java.util.Collections.unmodifiableMap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.assignment.TaskInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StickyTaskAssignor implements TaskAssignor { + private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class); + + private Map<String, ?> configs = new HashMap<>(); + private final boolean mustPreserveActiveTaskAssignment; + + public StickyTaskAssignor() { + this(false); + } + + public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) { + this.mustPreserveActiveTaskAssignment = mustPreserveActiveTaskAssignment; + } + + @Override + public void configure(final Map<String, ?> configs) { + // TODO: The application state already has the assignment configs object, why should this + // assignor be configurable? + this.configs = configs; + } + + @Override + public TaskAssignment assign(final ApplicationState applicationState) { + final int taskCount = applicationState.allTasks().size(); + if (taskCount == 0) { + return new TaskAssignment(new ArrayList<>()); + } + + final Map<ProcessId, KafkaStreamsState> clients = applicationState.kafkaStreamsStates(false); + final Map<TaskId, ProcessId> previousActiveAssignment = mapPreviousActiveTasks(clients); + final Map<TaskId, Set<ProcessId>> previousStandbyAssignment = mapPreviousStandbyTasks(clients); + final AssignmentState assignmentState = new AssignmentState(applicationState, clients, + previousActiveAssignment, previousStandbyAssignment); + + assignActive(applicationState, clients.values(), assignmentState, this.mustPreserveActiveTaskAssignment); + optimizeActive(applicationState, assignmentState); + assignStandby(applicationState, assignmentState); + optimizeStandby(applicationState, assignmentState); + return null; + } + + private void optimizeActive(final ApplicationState applicationState, + final AssignmentState assignmentState) { + if (!TaskAssignmentUtils.hasValidRackInformation(applicationState)) { + return; + } + + final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = assignmentState.buildKafkaStreamsAssignments(); + + final Set<TaskId> statefulTasks = applicationState.allTasks().stream() + .filter(TaskInfo::isStateful) + .map(TaskInfo::id) + .collect(Collectors.toSet()); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignmentsForStatefulTasks = TaskAssignmentUtils.optimizeRackAwareActiveTasks( + applicationState, currentAssignments, new TreeSet<>(statefulTasks)); + + final Set<TaskId> statelessTasks = applicationState.allTasks().stream() + .filter(task -> !task.isStateful()) + .map(TaskInfo::id) + .collect(Collectors.toSet()); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignmentsForAllTasks = TaskAssignmentUtils.optimizeRackAwareActiveTasks( + applicationState, optimizedAssignmentsForStatefulTasks, new TreeSet<>(statelessTasks)); + + assignmentState.processOptimizedAssignments(optimizedAssignmentsForStatefulTasks); + } + + private void optimizeStandby(final ApplicationState applicationState, final AssignmentState assignmentState) { + if (applicationState.assignmentConfigs().numStandbyReplicas() <= 0) { + return; + } + + if (!TaskAssignmentUtils.hasValidRackInformation(applicationState)) { + return; + } + + final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = assignmentState.buildKafkaStreamsAssignments(); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignments = TaskAssignmentUtils.optimizeRackAwareStandbyTasks( + applicationState, currentAssignments); + assignmentState.processOptimizedAssignments(optimizedAssignments); + } + + private static void assignActive(final ApplicationState applicationState, + final Collection<KafkaStreamsState> clients, + final AssignmentState assignmentState, + final boolean mustPreserveActiveTaskAssignment) { + final int totalCapacity = computeStreamThreadCount(clients); + if (totalCapacity == 0) { + throw new IllegalStateException("`totalCapacity` should never be zero."); + } + + final Set<TaskId> allTaskIds = applicationState.allTasks().stream() + .map(TaskInfo::id).collect(Collectors.toSet()); + final int taskCount = allTaskIds.size(); + final int activeTasksPerThread = taskCount / totalCapacity; + final Set<TaskId> unassigned = new HashSet<>(allTaskIds); + + // first try and re-assign existing active tasks to clients that previously had + // the same active task + for (final TaskId taskId : assignmentState.previousActiveAssignment.keySet()) { + final ProcessId previousClientForTask = assignmentState.previousActiveAssignment.get(taskId); + + // TODO: Remove this check, why is it even necessary? + if (allTaskIds.contains(taskId)) { + if (mustPreserveActiveTaskAssignment || assignmentState.hasRoomForActiveTask(previousClientForTask, activeTasksPerThread)) { + assignmentState.finalizeAssignment(taskId, previousClientForTask, AssignedTask.Type.ACTIVE); + unassigned.remove(taskId); + } + } + } + + // try and assign any remaining unassigned tasks to clients that previously + // have seen the task. + for (final Iterator<TaskId> iterator = unassigned.iterator(); iterator.hasNext(); ) { + final TaskId taskId = iterator.next(); + final Set<ProcessId> previousClientsForStandbyTask = assignmentState.previousStandbyAssignment.get(taskId); + if (previousClientsForStandbyTask == null || previousClientsForStandbyTask.isEmpty()) { + continue; + } Review Comment: nit: can we try and avoid this style of conditional early-exit, at least inside loops? I found the original version of this code a little clearer, it's easier to follow the logic if it's all in one block (and the previousClientsForStandbyTask.isEmpty() check does nothing since it won't enter the for loop anyways if the set is empty) ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java: ########## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment.assignors; + +import static java.util.Collections.unmodifiableMap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.assignment.TaskInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StickyTaskAssignor implements TaskAssignor { + private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class); + + private Map<String, ?> configs = new HashMap<>(); + private final boolean mustPreserveActiveTaskAssignment; + + public StickyTaskAssignor() { + this(false); + } + + public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) { + this.mustPreserveActiveTaskAssignment = mustPreserveActiveTaskAssignment; + } + + @Override + public void configure(final Map<String, ?> configs) { + // TODO: The application state already has the assignment configs object, why should this + // assignor be configurable? + this.configs = configs; + } + + @Override + public TaskAssignment assign(final ApplicationState applicationState) { + final int taskCount = applicationState.allTasks().size(); + if (taskCount == 0) { + return new TaskAssignment(new ArrayList<>()); + } + + final Map<ProcessId, KafkaStreamsState> clients = applicationState.kafkaStreamsStates(false); + final Map<TaskId, ProcessId> previousActiveAssignment = mapPreviousActiveTasks(clients); + final Map<TaskId, Set<ProcessId>> previousStandbyAssignment = mapPreviousStandbyTasks(clients); + final AssignmentState assignmentState = new AssignmentState(applicationState, clients, + previousActiveAssignment, previousStandbyAssignment); + + assignActive(applicationState, clients.values(), assignmentState, this.mustPreserveActiveTaskAssignment); + optimizeActive(applicationState, assignmentState); + assignStandby(applicationState, assignmentState); + optimizeStandby(applicationState, assignmentState); + return null; + } + + private void optimizeActive(final ApplicationState applicationState, + final AssignmentState assignmentState) { + if (!TaskAssignmentUtils.hasValidRackInformation(applicationState)) { + return; + } + + final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = assignmentState.buildKafkaStreamsAssignments(); + + final Set<TaskId> statefulTasks = applicationState.allTasks().stream() + .filter(TaskInfo::isStateful) + .map(TaskInfo::id) + .collect(Collectors.toSet()); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignmentsForStatefulTasks = TaskAssignmentUtils.optimizeRackAwareActiveTasks( + applicationState, currentAssignments, new TreeSet<>(statefulTasks)); + + final Set<TaskId> statelessTasks = applicationState.allTasks().stream() + .filter(task -> !task.isStateful()) + .map(TaskInfo::id) + .collect(Collectors.toSet()); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignmentsForAllTasks = TaskAssignmentUtils.optimizeRackAwareActiveTasks( + applicationState, optimizedAssignmentsForStatefulTasks, new TreeSet<>(statelessTasks)); + + assignmentState.processOptimizedAssignments(optimizedAssignmentsForStatefulTasks); Review Comment: ```suggestion assignmentState.processOptimizedAssignments(optimizedAssignmentsForAllTasks); ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java: ########## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment.assignors; + +import static java.util.Collections.unmodifiableMap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.assignment.TaskInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StickyTaskAssignor implements TaskAssignor { + private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class); + + private Map<String, ?> configs = new HashMap<>(); + private final boolean mustPreserveActiveTaskAssignment; + + public StickyTaskAssignor() { + this(false); + } + + public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) { + this.mustPreserveActiveTaskAssignment = mustPreserveActiveTaskAssignment; + } + + @Override + public void configure(final Map<String, ?> configs) { + // TODO: The application state already has the assignment configs object, why should this + // assignor be configurable? + this.configs = configs; + } + + @Override + public TaskAssignment assign(final ApplicationState applicationState) { + final int taskCount = applicationState.allTasks().size(); + if (taskCount == 0) { + return new TaskAssignment(new ArrayList<>()); + } + + final Map<ProcessId, KafkaStreamsState> clients = applicationState.kafkaStreamsStates(false); + final Map<TaskId, ProcessId> previousActiveAssignment = mapPreviousActiveTasks(clients); + final Map<TaskId, Set<ProcessId>> previousStandbyAssignment = mapPreviousStandbyTasks(clients); + final AssignmentState assignmentState = new AssignmentState(applicationState, clients, + previousActiveAssignment, previousStandbyAssignment); + + assignActive(applicationState, clients.values(), assignmentState, this.mustPreserveActiveTaskAssignment); + optimizeActive(applicationState, assignmentState); + assignStandby(applicationState, assignmentState); + optimizeStandby(applicationState, assignmentState); + return null; + } + + private void optimizeActive(final ApplicationState applicationState, + final AssignmentState assignmentState) { + if (!TaskAssignmentUtils.hasValidRackInformation(applicationState)) { Review Comment: Also, both `#optimizeActive` and `#optimizeStandby` should return/do nothing if `mustPreserveActiveTaskAssignment == true`. So this check (and the one for the standby optimization) should be something like ``` if (!applicationState#hasCompleteRackInfo() || mustPreserveActiveTaskAssignment) { return; } ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/assignment/assignors/StickyTaskAssignor.java: ########## @@ -0,0 +1,478 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.assignment.assignors; + +import static java.util.Collections.unmodifiableMap; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.assignment.ApplicationState; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment.AssignedTask; +import org.apache.kafka.streams.processor.assignment.KafkaStreamsState; +import org.apache.kafka.streams.processor.assignment.ProcessId; +import org.apache.kafka.streams.processor.assignment.TaskAssignmentUtils; +import org.apache.kafka.streams.processor.assignment.TaskAssignor; +import org.apache.kafka.streams.processor.assignment.TaskInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class StickyTaskAssignor implements TaskAssignor { + private static final Logger LOG = LoggerFactory.getLogger(StickyTaskAssignor.class); + + private Map<String, ?> configs = new HashMap<>(); + private final boolean mustPreserveActiveTaskAssignment; + + public StickyTaskAssignor() { + this(false); + } + + public StickyTaskAssignor(final boolean mustPreserveActiveTaskAssignment) { + this.mustPreserveActiveTaskAssignment = mustPreserveActiveTaskAssignment; + } + + @Override + public void configure(final Map<String, ?> configs) { + // TODO: The application state already has the assignment configs object, why should this + // assignor be configurable? + this.configs = configs; + } + + @Override + public TaskAssignment assign(final ApplicationState applicationState) { + final int taskCount = applicationState.allTasks().size(); + if (taskCount == 0) { + return new TaskAssignment(new ArrayList<>()); + } + + final Map<ProcessId, KafkaStreamsState> clients = applicationState.kafkaStreamsStates(false); + final Map<TaskId, ProcessId> previousActiveAssignment = mapPreviousActiveTasks(clients); + final Map<TaskId, Set<ProcessId>> previousStandbyAssignment = mapPreviousStandbyTasks(clients); + final AssignmentState assignmentState = new AssignmentState(applicationState, clients, + previousActiveAssignment, previousStandbyAssignment); + + assignActive(applicationState, clients.values(), assignmentState, this.mustPreserveActiveTaskAssignment); + optimizeActive(applicationState, assignmentState); + assignStandby(applicationState, assignmentState); + optimizeStandby(applicationState, assignmentState); + return null; + } + + private void optimizeActive(final ApplicationState applicationState, + final AssignmentState assignmentState) { + if (!TaskAssignmentUtils.hasValidRackInformation(applicationState)) { + return; + } + + final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = assignmentState.buildKafkaStreamsAssignments(); + + final Set<TaskId> statefulTasks = applicationState.allTasks().stream() + .filter(TaskInfo::isStateful) + .map(TaskInfo::id) + .collect(Collectors.toSet()); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignmentsForStatefulTasks = TaskAssignmentUtils.optimizeRackAwareActiveTasks( + applicationState, currentAssignments, new TreeSet<>(statefulTasks)); + + final Set<TaskId> statelessTasks = applicationState.allTasks().stream() + .filter(task -> !task.isStateful()) + .map(TaskInfo::id) + .collect(Collectors.toSet()); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignmentsForAllTasks = TaskAssignmentUtils.optimizeRackAwareActiveTasks( + applicationState, optimizedAssignmentsForStatefulTasks, new TreeSet<>(statelessTasks)); + + assignmentState.processOptimizedAssignments(optimizedAssignmentsForStatefulTasks); + } + + private void optimizeStandby(final ApplicationState applicationState, final AssignmentState assignmentState) { + if (applicationState.assignmentConfigs().numStandbyReplicas() <= 0) { + return; + } + + if (!TaskAssignmentUtils.hasValidRackInformation(applicationState)) { + return; + } + + final Map<ProcessId, KafkaStreamsAssignment> currentAssignments = assignmentState.buildKafkaStreamsAssignments(); + final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignments = TaskAssignmentUtils.optimizeRackAwareStandbyTasks( + applicationState, currentAssignments); + assignmentState.processOptimizedAssignments(optimizedAssignments); + } + + private static void assignActive(final ApplicationState applicationState, + final Collection<KafkaStreamsState> clients, + final AssignmentState assignmentState, + final boolean mustPreserveActiveTaskAssignment) { + final int totalCapacity = computeStreamThreadCount(clients); + if (totalCapacity == 0) { + throw new IllegalStateException("`totalCapacity` should never be zero."); + } + + final Set<TaskId> allTaskIds = applicationState.allTasks().stream() + .map(TaskInfo::id).collect(Collectors.toSet()); + final int taskCount = allTaskIds.size(); + final int activeTasksPerThread = taskCount / totalCapacity; + final Set<TaskId> unassigned = new HashSet<>(allTaskIds); + + // first try and re-assign existing active tasks to clients that previously had + // the same active task + for (final TaskId taskId : assignmentState.previousActiveAssignment.keySet()) { + final ProcessId previousClientForTask = assignmentState.previousActiveAssignment.get(taskId); + + // TODO: Remove this check, why is it even necessary? + if (allTaskIds.contains(taskId)) { + if (mustPreserveActiveTaskAssignment || assignmentState.hasRoomForActiveTask(previousClientForTask, activeTasksPerThread)) { + assignmentState.finalizeAssignment(taskId, previousClientForTask, AssignedTask.Type.ACTIVE); + unassigned.remove(taskId); + } + } + } + + // try and assign any remaining unassigned tasks to clients that previously + // have seen the task. + for (final Iterator<TaskId> iterator = unassigned.iterator(); iterator.hasNext(); ) { + final TaskId taskId = iterator.next(); + final Set<ProcessId> previousClientsForStandbyTask = assignmentState.previousStandbyAssignment.get(taskId); + if (previousClientsForStandbyTask == null || previousClientsForStandbyTask.isEmpty()) { + continue; + } + + for (final ProcessId client: previousClientsForStandbyTask) { + if (assignmentState.hasRoomForActiveTask(client, activeTasksPerThread)) { + assignmentState.finalizeAssignment(taskId, client, AssignedTask.Type.ACTIVE); + iterator.remove(); + break; + } + } + } + + // assign any remaining unassigned tasks + final List<TaskId> sortedTasks = new ArrayList<>(unassigned); + Collections.sort(sortedTasks); + for (final TaskId taskId : sortedTasks) { + final Set<ProcessId> candidateClients = clients.stream() + .map(KafkaStreamsState::processId) + .collect(Collectors.toSet()); + final ProcessId bestClient = assignmentState.findBestClientForTask(taskId, candidateClients); + assignmentState.finalizeAssignment(taskId, bestClient, AssignedTask.Type.ACTIVE); + } + } + + private static void assignStandby(final ApplicationState applicationState, + final AssignmentState assignmentState) { + final Set<TaskInfo> statefulTasks = applicationState.allTasks().stream() + .filter(TaskInfo::isStateful) + .collect(Collectors.toSet()); + final int numStandbyReplicas = applicationState.assignmentConfigs().numStandbyReplicas(); + for (final TaskInfo task : statefulTasks) { + for (int i = 0; i < numStandbyReplicas; i++) { + final Set<ProcessId> candidateClients = assignmentState.findClientsWithoutAssignedTask(task.id()); + if (candidateClients.isEmpty()) { + LOG.warn("Unable to assign {} of {} standby tasks for task [{}]. " + + "There is not enough available capacity. You should " + + "increase the number of threads and/or application instances " + + "to maintain the requested number of standby replicas.", + numStandbyReplicas - i, + numStandbyReplicas, task.id()); + break; + } + + final ProcessId bestClient = assignmentState.findBestClientForTask(task.id(), candidateClients); + assignmentState.finalizeAssignment(task.id(), bestClient, AssignedTask.Type.STANDBY); + } + } + } + + private static Map<TaskId, ProcessId> mapPreviousActiveTasks(final Map<ProcessId, KafkaStreamsState> clients) { + final Map<TaskId, ProcessId> previousActiveTasks = new HashMap<>(); + for (final KafkaStreamsState client : clients.values()) { + for (final TaskId taskId : client.previousActiveTasks()) { + previousActiveTasks.put(taskId, client.processId()); + } + } + return previousActiveTasks; + } + + private static Map<TaskId, Set<ProcessId>> mapPreviousStandbyTasks(final Map<ProcessId, KafkaStreamsState> clients) { + final Map<TaskId, Set<ProcessId>> previousStandbyTasks = new HashMap<>(); + for (final KafkaStreamsState client : clients.values()) { + for (final TaskId taskId : client.previousActiveTasks()) { + previousStandbyTasks.computeIfAbsent(taskId, k -> new HashSet<>()); + previousStandbyTasks.get(taskId).add(client.processId()); + } + } + return previousStandbyTasks; + } + + private static int computeStreamThreadCount(final Collection<KafkaStreamsState> clients) { + int count = 0; + for (final KafkaStreamsState client : clients) { + count += client.numProcessingThreads(); + } + return count; + } + + private static class AssignmentState { + private final Map<ProcessId, KafkaStreamsState> clients; + private final Map<TaskId, ProcessId> previousActiveAssignment; + private final Map<TaskId, Set<ProcessId>> previousStandbyAssignment; + + private final TaskPairs taskPairs; + + private Map<TaskId, Set<ProcessId>> newTaskLocations; + private Map<ProcessId, Set<AssignedTask>> newAssignments; + + private AssignmentState(final ApplicationState applicationState, + final Map<ProcessId, KafkaStreamsState> clients, + final Map<TaskId, ProcessId> previousActiveAssignment, + final Map<TaskId, Set<ProcessId>> previousStandbyAssignment) { + this.clients = clients; + this.previousActiveAssignment = unmodifiableMap(previousActiveAssignment); + this.previousStandbyAssignment = unmodifiableMap(previousStandbyAssignment); + + final int taskCount = applicationState.allTasks().size(); + final int maxPairs = taskCount * (taskCount - 1) / 2; + this.taskPairs = new TaskPairs(maxPairs); + + this.newTaskLocations = new HashMap<>(); + this.newAssignments = new HashMap<>(); + } + + public void finalizeAssignment(final TaskId taskId, final ProcessId client, final AssignedTask.Type type) { + newAssignments.computeIfAbsent(client, k -> new HashSet<>()); + newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>()); + + final Set<TaskId> newAssignmentsForClient = newAssignments.get(client) + .stream().map(AssignedTask::id).collect(Collectors.toSet()); + + taskPairs.addPairs(taskId, newAssignmentsForClient); + newAssignments.get(client).add(new AssignedTask(taskId, type)); + newTaskLocations.get(taskId).add(client); + } + + public Map<ProcessId, KafkaStreamsAssignment> buildKafkaStreamsAssignments() { + final Map<ProcessId, KafkaStreamsAssignment> kafkaStreamsAssignments = new HashMap<>(); + for (final ProcessId processId : newAssignments.keySet()) { + final Set<AssignedTask> assignedTasks = newAssignments.get(processId); + final KafkaStreamsAssignment assignment = KafkaStreamsAssignment.of(processId, assignedTasks); + // TODO: Followup rebalance? + kafkaStreamsAssignments.put(processId, assignment); + } + return kafkaStreamsAssignments; + } + + public void processOptimizedAssignments(final Map<ProcessId, KafkaStreamsAssignment> optimizedAssignments) { + final Map<TaskId, Set<ProcessId>> newTaskLocations = new HashMap<>(); + final Map<ProcessId, Set<AssignedTask>> newAssignments = new HashMap<>(); + + for (final ProcessId processId : optimizedAssignments.keySet()) { + final Set<AssignedTask> assignedTasks = optimizedAssignments.get(processId).assignment(); + newAssignments.put(processId, assignedTasks); + + for (final AssignedTask task : assignedTasks) { + newTaskLocations.computeIfAbsent(task.id(), k -> new HashSet<>()); + newTaskLocations.get(task.id()).add(processId); + } + } + + this.newTaskLocations = newTaskLocations; + this.newAssignments = newAssignments; + } + + public boolean hasRoomForActiveTask(final ProcessId processId, final int activeTasksPerThread) { + final int capacity = clients.get(processId).numProcessingThreads(); + final int newActiveTaskCount = newAssignments.computeIfAbsent(processId, k -> new HashSet<>()) + .stream().filter(assignedTask -> assignedTask.type() == AssignedTask.Type.ACTIVE) + .collect(Collectors.toSet()) + .size(); + return newActiveTaskCount < capacity * activeTasksPerThread; + } + + public ProcessId findBestClientForTask(final TaskId taskId, final Set<ProcessId> clientsWithin) { + if (clientsWithin.size() == 1) { + return clientsWithin.iterator().next(); + } + + final ProcessId previousClient = findLeastLoadedClientWithPreviousActiveOrStandbyTask( + taskId, clientsWithin); + if (previousClient == null) { + return findLeastLoadedClient(taskId, clientsWithin); + } + + if (shouldBalanceLoad(previousClient)) { + final ProcessId standby = findLeastLoadedClientWithPreviousStandbyTask(taskId, clientsWithin); + if (standby == null || shouldBalanceLoad(standby)) { + return findLeastLoadedClient(taskId, clientsWithin); + } + return standby; + } + return previousClient; + } + + public Set<ProcessId> findClientsWithoutAssignedTask(final TaskId taskId) { + final Set<ProcessId> unavailableClients = newTaskLocations.computeIfAbsent(taskId, k -> new HashSet<>()); + return clients.values().stream() + .map(KafkaStreamsState::processId) + .filter(o -> !unavailableClients.contains(o)) + .collect(Collectors.toSet()); + } + + public double clientLoad(final ProcessId processId) { + final int capacity = clients.get(processId).numProcessingThreads(); + final double totalTaskCount = newAssignments.getOrDefault(processId, new HashSet<>()).size(); + return totalTaskCount / capacity; + } + + public ProcessId findLeastLoadedClient(final TaskId taskId, final Set<ProcessId> clientIds) { + ProcessId leastLoaded = null; + for (final ProcessId processId : clientIds) { + final double thisClientLoad = clientLoad(processId); + if (thisClientLoad == 0) { + return processId; + } + + if (leastLoaded == null || thisClientLoad < clientLoad(leastLoaded)) { + final Set<TaskId> assignedTasks = newAssignments.getOrDefault(processId, new HashSet<>()) + .stream().map(AssignedTask::id).collect(Collectors.toSet()); + if (taskPairs.hasNewPair(taskId, assignedTasks)) { + leastLoaded = processId; + } + } + } + + if (leastLoaded != null) { + return leastLoaded; + } + + for (final ProcessId processId : clientIds) { + final double thisClientLoad = clientLoad(processId); + + if (leastLoaded == null || thisClientLoad < clientLoad(leastLoaded)) { + leastLoaded = processId; + } + } + + return leastLoaded; + } + + public ProcessId findLeastLoadedClientWithPreviousActiveOrStandbyTask(final TaskId taskId, + final Set<ProcessId> clientsWithin) { Review Comment: nit: formatting/indentation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org