cadonna commented on a change in pull request #10851: URL: https://github.com/apache/kafka/pull/10851#discussion_r817629787
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; + +import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys; +import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.createLeastLoadedPrioritySetConstrainedByAssignedTask; +import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks; + +/** + * Distributes standby tasks over different tag dimensions. 
Standby task distribution is on a best-effort basis. + * If rack aware standby task assignment is not possible, implementation fall backs to distributing standby tasks on least-loaded clients. + * + * @see DefaultStandbyTaskAssignor + */ +class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor { + private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class); + + /** + * The algorithm distributes standby tasks for the {@param statefulTaskIds} over different tag dimensions. + * For each stateful task, the number of standby tasks will be assigned based on configured {@link AssignmentConfigs#numStandbyReplicas}. + * Rack aware standby tasks distribution only takes into account tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags}. + * Ideally, all standby tasks for any given stateful task will be located on different tag dimensions to have the best possible distribution. + * However, if the ideal (or partially ideal) distribution is impossible, the algorithm will fall back to the least-loaded clients without taking rack awareness constraints into consideration. + * The least-loaded clients are determined based on the total number of tasks (active and standby tasks) assigned to the client. 
+ */ + @Override + public boolean assign(final Map<UUID, ClientState> clients, + final Set<TaskId> allTaskIds, + final Set<TaskId> statefulTaskIds, + final AssignorConfiguration.AssignmentConfigs configs) { + final int numStandbyReplicas = configs.numStandbyReplicas; + final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags); + + final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys( + numStandbyReplicas, + allTaskIds + ); + + final Map<String, Set<String>> tagKeyToValues = new HashMap<>(); + final Map<TagEntry, Set<UUID>> tagEntryToClients = new HashMap<>(); + + fillClientsTagStatistics(clients, tagEntryToClients, tagKeyToValues); + + final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = createLeastLoadedPrioritySetConstrainedByAssignedTask(clients); + + final Map<TaskId, UUID> pendingStandbyTasksToClientId = new HashMap<>(); + + for (final TaskId statefulTaskId : statefulTaskIds) { + for (final Map.Entry<UUID, ClientState> entry : clients.entrySet()) { + final UUID clientId = entry.getKey(); + final ClientState clientState = entry.getValue(); + + if (clientState.activeTasks().contains(statefulTaskId)) { + assignStandbyTasksToClientsWithDifferentTags( + standbyTaskClientsByTaskLoad, + statefulTaskId, + clientId, + rackAwareAssignmentTags, + clients, + tasksToRemainingStandbys, + tagKeyToValues, + tagEntryToClients, + pendingStandbyTasksToClientId + ); + } + } + } + + if (!tasksToRemainingStandbys.isEmpty()) { + log.debug("Rack aware standby task assignment was not able to assign all standby tasks. " + Review comment: I think info log would also be OK here. I imagine users that are wondering why their standbys are not distributed as they would expect. With this information they can at least try to fix it on the config level. This log message should only happen at rebalance time, which should usually be rather seldom. 
If we decide to put the log message on info level, you should also change the wording a bit and not use variable names in it. Maybe some hints about what users can do to fix this would also be nice. Is it possible to separate the concerns of this log message and the one on line 135? Something along these lines: here, the rack-aware standby assignment did not work due to the tag config, and on line 135 the assignment did not work due to too few instances. We can then put both on warn or info (do not forget to also check the related log message in the default standby assignor). ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java ########## @@ -115,55 +116,46 @@ private static void assignActiveStatefulTasks(final SortedMap<UUID, ClientState> clientStates, ClientState::activeTasks, ClientState::unassignActive, - ClientState::assignActive + ClientState::assignActive, + (source, destination) -> true ); } - private static void assignStandbyReplicaTasks(final TreeMap<UUID, ClientState> clientStates, - final Set<TaskId> statefulTasks, - final int numStandbyReplicas) { - final Map<TaskId, Integer> tasksToRemainingStandbys = - statefulTasks.stream().collect(Collectors.toMap(task -> task, t -> numStandbyReplicas)); + private void assignStandbyReplicaTasks(final TreeMap<UUID, ClientState> clientStates, + final Set<TaskId> allTaskIds, + final Set<TaskId> statefulTasks, + final AssignmentConfigs configs) { + if (configs.numStandbyReplicas == 0) { + return; + } - final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = new ConstrainedPrioritySet( - (client, task) -> !clientStates.get(client).hasAssignedTask(task), - client -> clientStates.get(client).assignedTaskLoad() - ); - standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet()); + final StandbyTaskAssignor standbyTaskAssignor = createStandbyTaskAssignor(configs); - for (final TaskId task : statefulTasks) { - int numRemainingStandbys = 
tasksToRemainingStandbys.get(task); - while (numRemainingStandbys > 0) { - final UUID client = standbyTaskClientsByTaskLoad.poll(task); - if (client == null) { - break; - } - clientStates.get(client).assignStandby(task); - numRemainingStandbys--; - standbyTaskClientsByTaskLoad.offer(client); - } - - if (numRemainingStandbys > 0) { - log.warn("Unable to assign {} of {} standby tasks for task [{}]. " + - "There is not enough available capacity. You should " + - "increase the number of application instances " + - "to maintain the requested number of standby replicas.", - numRemainingStandbys, numStandbyReplicas, task); - } - } + standbyTaskAssignor.assign(clientStates, allTaskIds, statefulTasks, configs); balanceTasksOverThreads( clientStates, ClientState::standbyTasks, ClientState::unassignStandby, - ClientState::assignStandby + ClientState::assignStandby, + standbyTaskAssignor::isAllowedTaskMovement ); } + // Visible for testing + static StandbyTaskAssignor createStandbyTaskAssignor(final AssignmentConfigs configs) { + if (!configs.rackAwareAssignmentTags.isEmpty()) { + return new ClientTagAwareStandbyTaskAssignor(); + } else { + return new DefaultStandbyTaskAssignor(); + } + } + Review comment: This can be done in a follow-up PR: I am not a big fan of `// Visible for testing` because it often means that we missed the chance to extract code into separate classes. Here I would definitely extract this code to a factory class. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; + +import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys; +import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.createLeastLoadedPrioritySetConstrainedByAssignedTask; +import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks; + +/** + * Distributes standby tasks over different tag dimensions. Standby task distribution is on a best-effort basis. + * If rack aware standby task assignment is not possible, implementation fall backs to distributing standby tasks on least-loaded clients. + * + * @see DefaultStandbyTaskAssignor + */ +class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor { + private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class); + + /** + * The algorithm distributes standby tasks for the {@param statefulTaskIds} over different tag dimensions. 
+ * For each stateful task, the number of standby tasks will be assigned based on configured {@link AssignmentConfigs#numStandbyReplicas}. + * Rack aware standby tasks distribution only takes into account tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags}. + * Ideally, all standby tasks for any given stateful task will be located on different tag dimensions to have the best possible distribution. + * However, if the ideal (or partially ideal) distribution is impossible, the algorithm will fall back to the least-loaded clients without taking rack awareness constraints into consideration. + * The least-loaded clients are determined based on the total number of tasks (active and standby tasks) assigned to the client. + */ + @Override + public boolean assign(final Map<UUID, ClientState> clients, + final Set<TaskId> allTaskIds, + final Set<TaskId> statefulTaskIds, + final AssignorConfiguration.AssignmentConfigs configs) { + final int numStandbyReplicas = configs.numStandbyReplicas; + final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags); + + final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys( + numStandbyReplicas, + allTaskIds + ); + + final Map<String, Set<String>> tagKeyToValues = new HashMap<>(); + final Map<TagEntry, Set<UUID>> tagEntryToClients = new HashMap<>(); + + fillClientsTagStatistics(clients, tagEntryToClients, tagKeyToValues); + + final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = createLeastLoadedPrioritySetConstrainedByAssignedTask(clients); + + final Map<TaskId, UUID> pendingStandbyTasksToClientId = new HashMap<>(); + + for (final TaskId statefulTaskId : statefulTaskIds) { + for (final Map.Entry<UUID, ClientState> entry : clients.entrySet()) { + final UUID clientId = entry.getKey(); + final ClientState clientState = entry.getValue(); + + if (clientState.activeTasks().contains(statefulTaskId)) { + assignStandbyTasksToClientsWithDifferentTags( + 
standbyTaskClientsByTaskLoad, + statefulTaskId, + clientId, + rackAwareAssignmentTags, + clients, + tasksToRemainingStandbys, + tagKeyToValues, + tagEntryToClients, + pendingStandbyTasksToClientId + ); + } + } + } + + if (!tasksToRemainingStandbys.isEmpty()) { + log.debug("Rack aware standby task assignment was not able to assign all standby tasks. " + + "tasksToRemainingStandbys=[{}], pendingStandbyTasksToClientId=[{}]. " + + "Will distribute the remaining standby tasks to least loaded clients.", + tasksToRemainingStandbys, pendingStandbyTasksToClientId); + + assignPendingStandbyTasksToLeastLoadedClients(clients, + numStandbyReplicas, + rackAwareAssignmentTags, + standbyTaskClientsByTaskLoad, + tasksToRemainingStandbys, + pendingStandbyTasksToClientId); + } + + // returning false, because standby task assignment will never require a follow-up probing rebalance. + return false; + } + + private static void assignPendingStandbyTasksToLeastLoadedClients(final Map<UUID, ClientState> clients, + final int numStandbyReplicas, + final Set<String> rackAwareAssignmentTags, + final ConstrainedPrioritySet standbyTaskClientsByTaskLoad, + final Map<TaskId, Integer> pendingStandbyTaskToNumberRemainingStandbys, + final Map<TaskId, UUID> pendingStandbyTaskToClientId) { + // We need to re offer all the clients to find the least loaded ones + standbyTaskClientsByTaskLoad.offerAll(clients.keySet()); + + for (final Entry<TaskId, Integer> pendingStandbyTaskAssignmentEntry : pendingStandbyTaskToNumberRemainingStandbys.entrySet()) { + final TaskId activeTaskId = pendingStandbyTaskAssignmentEntry.getKey(); + final UUID clientId = pendingStandbyTaskToClientId.get(activeTaskId); + + final int numberOfRemainingStandbys = pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks( + clients, + pendingStandbyTaskToNumberRemainingStandbys, + standbyTaskClientsByTaskLoad, + activeTaskId + ); + + if (numberOfRemainingStandbys > 0) { + log.warn("Unable to assign {} of {} standby tasks for task [{}] 
with client tags [{}]. " + + "There is not enough available capacity. You should " + + "increase the number of application instances " + + "on different client tag dimensions " + + "to maintain the requested number of standby replicas. " + + "Rack awareness is configured with [{}] tags.", + numberOfRemainingStandbys, numStandbyReplicas, activeTaskId, + clients.get(clientId).clientTags(), rackAwareAssignmentTags); + } + } + } + + @Override + public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) { + final Map<String, String> sourceClientTags = source.clientTags(); + final Map<String, String> destinationClientTags = destination.clientTags(); + + for (final Entry<String, String> sourceClientTagEntry : sourceClientTags.entrySet()) { + if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) { + return false; + } + } + + return true; + } + + // Visible for testing + static void fillClientsTagStatistics(final Map<UUID, ClientState> clientStates, + final Map<TagEntry, Set<UUID>> tagEntryToClients, + final Map<String, Set<String>> tagKeyToValues) { + for (final Entry<UUID, ClientState> clientStateEntry : clientStates.entrySet()) { + final UUID clientId = clientStateEntry.getKey(); + final ClientState clientState = clientStateEntry.getValue(); + + clientState.clientTags().forEach((tagKey, tagValue) -> { + tagKeyToValues.computeIfAbsent(tagKey, ignored -> new HashSet<>()).add(tagValue); + tagEntryToClients.computeIfAbsent(new TagEntry(tagKey, tagValue), ignored -> new HashSet<>()).add(clientId); + }); + } + } + + // Visible for testing + static void assignStandbyTasksToClientsWithDifferentTags(final ConstrainedPrioritySet standbyTaskClientsByTaskLoad, + final TaskId activeTaskId, + final UUID activeTaskClient, + final Set<String> rackAwareAssignmentTags, + final Map<UUID, ClientState> clientStates, + final Map<TaskId, Integer> tasksToRemainingStandbys, + final Map<String, Set<String>> 
tagKeyToValues, + final Map<TagEntry, Set<UUID>> tagEntryToClients, + final Map<TaskId, UUID> pendingStandbyTasksToClientId) { + standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet()); + + // We set countOfUsedClients as 1 because client where active task is located has to be considered as used. + int countOfUsedClients = 1; + int numRemainingStandbys = tasksToRemainingStandbys.get(activeTaskId); + + final Map<TagEntry, Set<UUID>> tagEntryToUsedClients = new HashMap<>(); + + UUID lastUsedClient = activeTaskClient; + do { + updateClientsOnAlreadyUsedTagEntries( + lastUsedClient, + countOfUsedClients, + rackAwareAssignmentTags, + clientStates, + tagEntryToClients, + tagKeyToValues, + tagEntryToUsedClients + ); + + final UUID clientOnUnusedTagDimensions = standbyTaskClientsByTaskLoad.poll( + activeTaskId, uuid -> !isClientUsedOnAnyOfTheTagEntries(uuid, tagEntryToUsedClients) + ); + + if (clientOnUnusedTagDimensions == null) { + break; + } + + clientStates.get(clientOnUnusedTagDimensions).assignStandby(activeTaskId); + + countOfUsedClients++; + numRemainingStandbys--; + + lastUsedClient = clientOnUnusedTagDimensions; + } while (numRemainingStandbys > 0); + + if (numRemainingStandbys > 0) { + pendingStandbyTasksToClientId.put(activeTaskId, activeTaskClient); + tasksToRemainingStandbys.put(activeTaskId, numRemainingStandbys); + } else { + tasksToRemainingStandbys.remove(activeTaskId); + } + } + + private static boolean isClientUsedOnAnyOfTheTagEntries(final UUID client, + final Map<TagEntry, Set<UUID>> tagEntryToUsedClients) { + return tagEntryToUsedClients.values().stream().anyMatch(usedClients -> usedClients.contains(client)); + } + + private static void updateClientsOnAlreadyUsedTagEntries(final UUID usedClient, + final int countOfUsedClients, + final Set<String> rackAwareAssignmentTags, + final Map<UUID, ClientState> clientStates, + final Map<TagEntry, Set<UUID>> tagEntryToClients, + final Map<String, Set<String>> tagKeyToValues, + final Map<TagEntry, 
Set<UUID>> tagEntryToUsedClients) { + final Map<String, String> usedClientTags = clientStates.get(usedClient).clientTags(); + + for (final Entry<String, String> usedClientTagEntry : usedClientTags.entrySet()) { + final String tagKey = usedClientTagEntry.getKey(); + + if (!rackAwareAssignmentTags.contains(tagKey)) { + log.warn("Client tag with key [{}] will be ignored when computing rack aware standby " + + "task assignment because it is not part of the configured rack awareness [{}].", + tagKey, rackAwareAssignmentTags); + continue; + } + + final Set<String> allTagValues = tagKeyToValues.get(tagKey); + + // Consider the following client setup where we need to distribute 2 standby tasks for each stateful task. + // + // # Kafka Streams Client 1 + // client.tag.zone: eu-central-1a + // client.tag.cluster: k8s-cluster1 + // rack.aware.assignment.tags: zone,cluster + // + // # Kafka Streams Client 2 + // client.tag.zone: eu-central-1b + // client.tag.cluster: k8s-cluster1 + // rack.aware.assignment.tags: zone,cluster + // + // # Kafka Streams Client 3 + // client.tag.zone: eu-central-1c + // client.tag.cluster: k8s-cluster1 + // rack.aware.assignment.tags: zone,cluster + // + // # Kafka Streams Client 4 + // client.tag.zone: eu-central-1a + // client.tag.cluster: k8s-cluster2 + // rack.aware.assignment.tags: zone,cluster + // + // # Kafka Streams Client 5 + // client.tag.zone: eu-central-1b + // client.tag.cluster: k8s-cluster2 + // rack.aware.assignment.tags: zone,cluster + // + // # Kafka Streams Client 6 + // client.tag.zone: eu-central-1c + // client.tag.cluster: k8s-cluster2 + // rack.aware.assignment.tags: zone,cluster + // + // Since we have only two unique `cluster` tag values, + // we can only achieve "ideal" distribution on the 1st standby task assignment. + // Ideal distribution for the 1st standby task can be achieved because we can assign standby task + // to the client located on different cluster and zone compared to an active task. 
+ // We can't consider the `cluster` tag for the 2nd standby task assignment because the 1st standby + // task would already be assigned on different cluster compared to the active one, which means + // we have already used all the available cluster tag values. Taking the `cluster` tag into consideration + // for the 2nd standby task assignment would affectively mean excluding all the clients. Review comment: ```suggestion // for the 2nd standby task assignment would effectively mean excluding all the clients. ``` ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals.assignment; + +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; + +import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys; +import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.createLeastLoadedPrioritySetConstrainedByAssignedTask; +import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks; + +/** + * Distributes standby tasks over different tag dimensions. Standby task distribution is on a best-effort basis. + * If rack aware standby task assignment is not possible, implementation fall backs to distributing standby tasks on least-loaded clients. + * + * @see DefaultStandbyTaskAssignor + */ +class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor { + private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class); + + /** + * The algorithm distributes standby tasks for the {@param statefulTaskIds} over different tag dimensions. + * For each stateful task, the number of standby tasks will be assigned based on configured {@link AssignmentConfigs#numStandbyReplicas}. + * Rack aware standby tasks distribution only takes into account tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags}. + * Ideally, all standby tasks for any given stateful task will be located on different tag dimensions to have the best possible distribution. 
+ * However, if the ideal (or partially ideal) distribution is impossible, the algorithm will fall back to the least-loaded clients without taking rack awareness constraints into consideration. + * The least-loaded clients are determined based on the total number of tasks (active and standby tasks) assigned to the client. + */ + @Override + public boolean assign(final Map<UUID, ClientState> clients, + final Set<TaskId> allTaskIds, + final Set<TaskId> statefulTaskIds, + final AssignorConfiguration.AssignmentConfigs configs) { + final int numStandbyReplicas = configs.numStandbyReplicas; + final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags); + + final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys( + numStandbyReplicas, + allTaskIds + ); + + final Map<String, Set<String>> tagKeyToValues = new HashMap<>(); + final Map<TagEntry, Set<UUID>> tagEntryToClients = new HashMap<>(); + + fillClientsTagStatistics(clients, tagEntryToClients, tagKeyToValues); + + final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = createLeastLoadedPrioritySetConstrainedByAssignedTask(clients); + + final Map<TaskId, UUID> pendingStandbyTasksToClientId = new HashMap<>(); + + for (final TaskId statefulTaskId : statefulTaskIds) { + for (final Map.Entry<UUID, ClientState> entry : clients.entrySet()) { + final UUID clientId = entry.getKey(); + final ClientState clientState = entry.getValue(); + + if (clientState.activeTasks().contains(statefulTaskId)) { + assignStandbyTasksToClientsWithDifferentTags( + standbyTaskClientsByTaskLoad, + statefulTaskId, + clientId, + rackAwareAssignmentTags, + clients, + tasksToRemainingStandbys, + tagKeyToValues, + tagEntryToClients, + pendingStandbyTasksToClientId + ); + } + } + } + + if (!tasksToRemainingStandbys.isEmpty()) { + log.debug("Rack aware standby task assignment was not able to assign all standby tasks. " + + "tasksToRemainingStandbys=[{}], pendingStandbyTasksToClientId=[{}]. 
" + + "Will distribute the remaining standby tasks to least loaded clients.", + tasksToRemainingStandbys, pendingStandbyTasksToClientId); + + assignPendingStandbyTasksToLeastLoadedClients(clients, + numStandbyReplicas, + rackAwareAssignmentTags, + standbyTaskClientsByTaskLoad, + tasksToRemainingStandbys, + pendingStandbyTasksToClientId); + } + + // returning false, because standby task assignment will never require a follow-up probing rebalance. + return false; + } + + private static void assignPendingStandbyTasksToLeastLoadedClients(final Map<UUID, ClientState> clients, + final int numStandbyReplicas, + final Set<String> rackAwareAssignmentTags, + final ConstrainedPrioritySet standbyTaskClientsByTaskLoad, + final Map<TaskId, Integer> pendingStandbyTaskToNumberRemainingStandbys, + final Map<TaskId, UUID> pendingStandbyTaskToClientId) { + // We need to re offer all the clients to find the least loaded ones + standbyTaskClientsByTaskLoad.offerAll(clients.keySet()); + + for (final Entry<TaskId, Integer> pendingStandbyTaskAssignmentEntry : pendingStandbyTaskToNumberRemainingStandbys.entrySet()) { + final TaskId activeTaskId = pendingStandbyTaskAssignmentEntry.getKey(); + final UUID clientId = pendingStandbyTaskToClientId.get(activeTaskId); + + final int numberOfRemainingStandbys = pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks( + clients, + pendingStandbyTaskToNumberRemainingStandbys, + standbyTaskClientsByTaskLoad, + activeTaskId + ); + + if (numberOfRemainingStandbys > 0) { + log.warn("Unable to assign {} of {} standby tasks for task [{}] with client tags [{}]. " + + "There is not enough available capacity. You should " + + "increase the number of application instances " + + "on different client tag dimensions " + + "to maintain the requested number of standby replicas. 
" + + "Rack awareness is configured with [{}] tags.", + numberOfRemainingStandbys, numStandbyReplicas, activeTaskId, + clients.get(clientId).clientTags(), rackAwareAssignmentTags); + } + } + } + + @Override + public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) { + final Map<String, String> sourceClientTags = source.clientTags(); + final Map<String, String> destinationClientTags = destination.clientTags(); + + for (final Entry<String, String> sourceClientTagEntry : sourceClientTags.entrySet()) { + if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) { + return false; + } + } + + return true; + } + + // Visible for testing + static void fillClientsTagStatistics(final Map<UUID, ClientState> clientStates, + final Map<TagEntry, Set<UUID>> tagEntryToClients, + final Map<String, Set<String>> tagKeyToValues) { + for (final Entry<UUID, ClientState> clientStateEntry : clientStates.entrySet()) { + final UUID clientId = clientStateEntry.getKey(); + final ClientState clientState = clientStateEntry.getValue(); + + clientState.clientTags().forEach((tagKey, tagValue) -> { + tagKeyToValues.computeIfAbsent(tagKey, ignored -> new HashSet<>()).add(tagValue); + tagEntryToClients.computeIfAbsent(new TagEntry(tagKey, tagValue), ignored -> new HashSet<>()).add(clientId); + }); + } + } + + // Visible for testing + static void assignStandbyTasksToClientsWithDifferentTags(final ConstrainedPrioritySet standbyTaskClientsByTaskLoad, + final TaskId activeTaskId, + final UUID activeTaskClient, + final Set<String> rackAwareAssignmentTags, + final Map<UUID, ClientState> clientStates, + final Map<TaskId, Integer> tasksToRemainingStandbys, + final Map<String, Set<String>> tagKeyToValues, + final Map<TagEntry, Set<UUID>> tagEntryToClients, + final Map<TaskId, UUID> pendingStandbyTasksToClientId) { + standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet()); + + // We set countOfUsedClients as 1 because 
client where active task is located has to be considered as used. + int countOfUsedClients = 1; + int numRemainingStandbys = tasksToRemainingStandbys.get(activeTaskId); + + final Map<TagEntry, Set<UUID>> tagEntryToUsedClients = new HashMap<>(); + + UUID lastUsedClient = activeTaskClient; + do { + updateClientsOnAlreadyUsedTagEntries( + lastUsedClient, + countOfUsedClients, + rackAwareAssignmentTags, + clientStates, + tagEntryToClients, + tagKeyToValues, + tagEntryToUsedClients + ); + + final UUID clientOnUnusedTagDimensions = standbyTaskClientsByTaskLoad.poll( + activeTaskId, uuid -> !isClientUsedOnAnyOfTheTagEntries(uuid, tagEntryToUsedClients) + ); + + if (clientOnUnusedTagDimensions == null) { + break; + } + + clientStates.get(clientOnUnusedTagDimensions).assignStandby(activeTaskId); + + countOfUsedClients++; + numRemainingStandbys--; + + lastUsedClient = clientOnUnusedTagDimensions; + } while (numRemainingStandbys > 0); + + if (numRemainingStandbys > 0) { + pendingStandbyTasksToClientId.put(activeTaskId, activeTaskClient); + tasksToRemainingStandbys.put(activeTaskId, numRemainingStandbys); + } else { + tasksToRemainingStandbys.remove(activeTaskId); + } + } + + private static boolean isClientUsedOnAnyOfTheTagEntries(final UUID client, + final Map<TagEntry, Set<UUID>> tagEntryToUsedClients) { + return tagEntryToUsedClients.values().stream().anyMatch(usedClients -> usedClients.contains(client)); + } + + private static void updateClientsOnAlreadyUsedTagEntries(final UUID usedClient, + final int countOfUsedClients, + final Set<String> rackAwareAssignmentTags, + final Map<UUID, ClientState> clientStates, + final Map<TagEntry, Set<UUID>> tagEntryToClients, + final Map<String, Set<String>> tagKeyToValues, + final Map<TagEntry, Set<UUID>> tagEntryToUsedClients) { + final Map<String, String> usedClientTags = clientStates.get(usedClient).clientTags(); + + for (final Entry<String, String> usedClientTagEntry : usedClientTags.entrySet()) { + final String tagKey = 
usedClientTagEntry.getKey(); + + if (!rackAwareAssignmentTags.contains(tagKey)) { + log.warn("Client tag with key [{}] will be ignored when computing rack aware standby " + + "task assignment because it is not part of the configured rack awareness [{}].", + tagKey, rackAwareAssignmentTags); + continue; + } + + final Set<String> allTagValues = tagKeyToValues.get(tagKey); + + // Consider the following client setup where we need to distribute 2 standby tasks for each stateful task. + // + // # Kafka Streams Client 1 + // client.tag.zone: eu-central-1a + // client.tag.cluster: k8s-cluster1 + // rack.aware.assignment.tags: zone,cluster + // + // # Kafka Streams Client 2 + // client.tag.zone: eu-central-1b + // client.tag.cluster: k8s-cluster1 + // rack.aware.assignment.tags: zone,cluster + // + // # Kafka Streams Client 3 + // client.tag.zone: eu-central-1c + // client.tag.cluster: k8s-cluster1 + // rack.aware.assignment.tags: zone,cluster + // + // # Kafka Streams Client 4 + // client.tag.zone: eu-central-1a + // client.tag.cluster: k8s-cluster2 + // rack.aware.assignment.tags: zone,cluster + // + // # Kafka Streams Client 5 + // client.tag.zone: eu-central-1b + // client.tag.cluster: k8s-cluster2 + // rack.aware.assignment.tags: zone,cluster + // + // # Kafka Streams Client 6 + // client.tag.zone: eu-central-1c + // client.tag.cluster: k8s-cluster2 + // rack.aware.assignment.tags: zone,cluster + // + // Since we have only two unique `cluster` tag values, + // we can only achieve "ideal" distribution on the 1st standby task assignment. + // Ideal distribution for the 1st standby task can be achieved because we can assign standby task + // to the client located on different cluster and zone compared to an active task. 
+ // We can't consider the `cluster` tag for the 2nd standby task assignment because the 1st standby + // task would already be assigned on different cluster compared to the active one, which means + // we have already used all the available cluster tag values. Taking the `cluster` tag into consideration + // for the 2nd standby task assignment would affectively mean excluding all the clients. + // Instead, for the 2nd standby task, we can only achieve partial rack awareness based on the `zone` tag. + // As we don't consider the `cluster` tag for the 2nd standby task assignment, partial rack awareness + // can be satisfied by placing the 2nd standby client on a different `zone` tag compared to active and corresponding standby tasks. + // The `zone` on either `cluster` tags are valid candidates for the partial rack awareness, as our goal is to distribute clients on the different `zone` tags. + + // This statement checks if we have used more clients than the all the unique values for the given tag, Review comment: ```suggestion // This statement checks if we have used more clients than the number of unique values for the given tag, ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
