cadonna commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r817629787



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys;
+import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.createLeastLoadedPrioritySetConstrainedByAssignedTask;
+import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks;
+
+/**
+ * Distributes standby tasks over different tag dimensions. Standby task distribution is on a best-effort basis.
+ * If rack aware standby task assignment is not possible, the implementation falls back to distributing standby tasks on least-loaded clients.
+ *
+ * @see DefaultStandbyTaskAssignor
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+    /**
+     * The algorithm distributes standby tasks for the {@param statefulTaskIds} over different tag dimensions.
+     * For each stateful task, the number of standby tasks will be assigned based on configured {@link AssignmentConfigs#numStandbyReplicas}.
+     * Rack aware standby tasks distribution only takes into account tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags}.
+     * Ideally, all standby tasks for any given stateful task will be located on different tag dimensions to have the best possible distribution.
+     * However, if the ideal (or partially ideal) distribution is impossible, the algorithm will fall back to the least-loaded clients without taking rack awareness constraints into consideration.
+     * The least-loaded clients are determined based on the total number of tasks (active and standby tasks) assigned to the client.
+     */
+    @Override
+    public boolean assign(final Map<UUID, ClientState> clients,
+                          final Set<TaskId> allTaskIds,
+                          final Set<TaskId> statefulTaskIds,
+                          final AssignorConfiguration.AssignmentConfigs configs) {
+        final int numStandbyReplicas = configs.numStandbyReplicas;
+        final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags);
+
+        final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(
+            numStandbyReplicas,
+            allTaskIds
+        );
+
+        final Map<String, Set<String>> tagKeyToValues = new HashMap<>();
+        final Map<TagEntry, Set<UUID>> tagEntryToClients = new HashMap<>();
+
+        fillClientsTagStatistics(clients, tagEntryToClients, tagKeyToValues);
+
+        final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = createLeastLoadedPrioritySetConstrainedByAssignedTask(clients);
+
+        final Map<TaskId, UUID> pendingStandbyTasksToClientId = new HashMap<>();
+
+        for (final TaskId statefulTaskId : statefulTaskIds) {
+            for (final Map.Entry<UUID, ClientState> entry : clients.entrySet()) {
+                final UUID clientId = entry.getKey();
+                final ClientState clientState = entry.getValue();
+
+                if (clientState.activeTasks().contains(statefulTaskId)) {
+                    assignStandbyTasksToClientsWithDifferentTags(
+                        standbyTaskClientsByTaskLoad,
+                        statefulTaskId,
+                        clientId,
+                        rackAwareAssignmentTags,
+                        clients,
+                        tasksToRemainingStandbys,
+                        tagKeyToValues,
+                        tagEntryToClients,
+                        pendingStandbyTasksToClientId
+                    );
+                }
+            }
+        }
+
+        if (!tasksToRemainingStandbys.isEmpty()) {
+            log.debug("Rack aware standby task assignment was not able to assign all standby tasks. " +

Review comment:
       I think an info log would also be OK here. I imagine users wondering why their standbys are not distributed as they would expect. With this information they can at least try to fix it at the config level. This log message should only be emitted at rebalance time, which should usually be rather rare.
   If we decide to put the log message on info level, you should also reword it a bit and not use variable names in it. Some hints about what users can do to fix this would also be nice.
   
   Is it possible to separate the concerns of this log message and the one on line 135? Something along the lines of: here, the rack-aware standby assignment did not work due to the tag config, and on line 135 the assignment did not work because the number of instances is too low. We can then put both on warn or info (do not forget to also check the related log message in the default standby assignor).
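   For illustration, a rough sketch of how the two concerns might be split into separate messages (helper names, log levels, and wording are just placeholders, not part of this PR):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Rough sketch only; names and message wording are assumptions for the discussion above.
final class StandbyAssignmentLogging {
    private static final Logger log = LoggerFactory.getLogger(StandbyAssignmentLogging.class);

    // Concern 1: the tag configuration prevented a fully rack-aware distribution.
    static void logTagConstrainedFallback(final int numUnassignedStandbys) {
        log.info("Could not place {} standby task(s) on distinct tag dimensions. " +
                 "Consider spreading the configured rack awareness tag values more evenly " +
                 "over the Kafka Streams clients. Falling back to the least-loaded clients.",
                 numUnassignedStandbys);
    }

    // Concern 2: there are too few instances for the requested number of standby replicas.
    static void logInsufficientCapacity(final int numUnassignedStandbys, final int numStandbyReplicas) {
        log.warn("Unable to assign {} of {} standby replicas; there is not enough available capacity. " +
                 "Consider increasing the number of application instances.",
                 numUnassignedStandbys, numStandbyReplicas);
    }
}
```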
 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -115,55 +116,46 @@ private static void assignActiveStatefulTasks(final SortedMap<UUID, ClientState>
             clientStates,
             ClientState::activeTasks,
             ClientState::unassignActive,
-            ClientState::assignActive
+            ClientState::assignActive,
+            (source, destination) -> true
         );
     }
 
-    private static void assignStandbyReplicaTasks(final TreeMap<UUID, ClientState> clientStates,
-                                                  final Set<TaskId> statefulTasks,
-                                                  final int numStandbyReplicas) {
-        final Map<TaskId, Integer> tasksToRemainingStandbys =
-            statefulTasks.stream().collect(Collectors.toMap(task -> task, t -> numStandbyReplicas));
+    private void assignStandbyReplicaTasks(final TreeMap<UUID, ClientState> clientStates,
+                                           final Set<TaskId> allTaskIds,
+                                           final Set<TaskId> statefulTasks,
+                                           final AssignmentConfigs configs) {
+        if (configs.numStandbyReplicas == 0) {
+            return;
+        }
 
-        final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = new ConstrainedPrioritySet(
-            (client, task) -> !clientStates.get(client).hasAssignedTask(task),
-            client -> clientStates.get(client).assignedTaskLoad()
-        );
-        standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet());
+        final StandbyTaskAssignor standbyTaskAssignor = createStandbyTaskAssignor(configs);
 
-        for (final TaskId task : statefulTasks) {
-            int numRemainingStandbys = tasksToRemainingStandbys.get(task);
-            while (numRemainingStandbys > 0) {
-                final UUID client = standbyTaskClientsByTaskLoad.poll(task);
-                if (client == null) {
-                    break;
-                }
-                clientStates.get(client).assignStandby(task);
-                numRemainingStandbys--;
-                standbyTaskClientsByTaskLoad.offer(client);
-            }
-
-            if (numRemainingStandbys > 0) {
-                log.warn("Unable to assign {} of {} standby tasks for task [{}]. " +
-                             "There is not enough available capacity. You should " +
-                             "increase the number of application instances " +
-                             "to maintain the requested number of standby replicas.",
-                         numRemainingStandbys, numStandbyReplicas, task);
-            }
-        }
+        standbyTaskAssignor.assign(clientStates, allTaskIds, statefulTasks, configs);
 
         balanceTasksOverThreads(
             clientStates,
             ClientState::standbyTasks,
             ClientState::unassignStandby,
-            ClientState::assignStandby
+            ClientState::assignStandby,
+            standbyTaskAssignor::isAllowedTaskMovement
         );
     }
 
+    // Visible for testing
+    static StandbyTaskAssignor createStandbyTaskAssignor(final AssignmentConfigs configs) {
+        if (!configs.rackAwareAssignmentTags.isEmpty()) {
+            return new ClientTagAwareStandbyTaskAssignor();
+        } else {
+            return new DefaultStandbyTaskAssignor();
+        }
+    }
+

Review comment:
       This can be done in a follow-up PR:
   I am not a big fan of `// Visible for testing` because it often means that we failed to extract code into separate classes. Here I would definitely extract this code into a factory class.
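   For illustration, a minimal sketch of such a factory (the class name and its exact shape are assumptions for the follow-up, not part of this PR):

```java
package org.apache.kafka.streams.processor.internals.assignment;

import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;

// Minimal sketch; class name and placement are assumptions for a possible follow-up PR.
final class StandbyTaskAssignorFactory {
    private StandbyTaskAssignorFactory() {}

    // Mirrors the logic in the diff above: use the tag-aware assignor only when
    // rack awareness tags are configured, otherwise the default one.
    static StandbyTaskAssignor create(final AssignmentConfigs configs) {
        if (!configs.rackAwareAssignmentTags.isEmpty()) {
            return new ClientTagAwareStandbyTaskAssignor();
        }
        return new DefaultStandbyTaskAssignor();
    }
}
```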

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys;
+import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.createLeastLoadedPrioritySetConstrainedByAssignedTask;
+import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks;
+
+/**
+ * Distributes standby tasks over different tag dimensions. Standby task distribution is on a best-effort basis.
+ * If rack aware standby task assignment is not possible, the implementation falls back to distributing standby tasks on least-loaded clients.
+ *
+ * @see DefaultStandbyTaskAssignor
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+    /**
+     * The algorithm distributes standby tasks for the {@param statefulTaskIds} over different tag dimensions.
+     * For each stateful task, the number of standby tasks will be assigned based on configured {@link AssignmentConfigs#numStandbyReplicas}.
+     * Rack aware standby tasks distribution only takes into account tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags}.
+     * Ideally, all standby tasks for any given stateful task will be located on different tag dimensions to have the best possible distribution.
+     * However, if the ideal (or partially ideal) distribution is impossible, the algorithm will fall back to the least-loaded clients without taking rack awareness constraints into consideration.
+     * The least-loaded clients are determined based on the total number of tasks (active and standby tasks) assigned to the client.
+     */
+    @Override
+    public boolean assign(final Map<UUID, ClientState> clients,
+                          final Set<TaskId> allTaskIds,
+                          final Set<TaskId> statefulTaskIds,
+                          final AssignorConfiguration.AssignmentConfigs configs) {
+        final int numStandbyReplicas = configs.numStandbyReplicas;
+        final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags);
+
+        final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(
+            numStandbyReplicas,
+            allTaskIds
+        );
+
+        final Map<String, Set<String>> tagKeyToValues = new HashMap<>();
+        final Map<TagEntry, Set<UUID>> tagEntryToClients = new HashMap<>();
+
+        fillClientsTagStatistics(clients, tagEntryToClients, tagKeyToValues);
+
+        final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = createLeastLoadedPrioritySetConstrainedByAssignedTask(clients);
+
+        final Map<TaskId, UUID> pendingStandbyTasksToClientId = new HashMap<>();
+
+        for (final TaskId statefulTaskId : statefulTaskIds) {
+            for (final Map.Entry<UUID, ClientState> entry : clients.entrySet()) {
+                final UUID clientId = entry.getKey();
+                final ClientState clientState = entry.getValue();
+
+                if (clientState.activeTasks().contains(statefulTaskId)) {
+                    assignStandbyTasksToClientsWithDifferentTags(
+                        standbyTaskClientsByTaskLoad,
+                        statefulTaskId,
+                        clientId,
+                        rackAwareAssignmentTags,
+                        clients,
+                        tasksToRemainingStandbys,
+                        tagKeyToValues,
+                        tagEntryToClients,
+                        pendingStandbyTasksToClientId
+                    );
+                }
+            }
+        }
+
+        if (!tasksToRemainingStandbys.isEmpty()) {
+            log.debug("Rack aware standby task assignment was not able to assign all standby tasks. " +
+                      "tasksToRemainingStandbys=[{}], pendingStandbyTasksToClientId=[{}]. " +
+                      "Will distribute the remaining standby tasks to least loaded clients.",
+                      tasksToRemainingStandbys, pendingStandbyTasksToClientId);
+
+            assignPendingStandbyTasksToLeastLoadedClients(clients,
+                                                          numStandbyReplicas,
+                                                          rackAwareAssignmentTags,
+                                                          standbyTaskClientsByTaskLoad,
+                                                          tasksToRemainingStandbys,
+                                                          pendingStandbyTasksToClientId);
+        }
+
+        // returning false, because standby task assignment will never require a follow-up probing rebalance.
+        return false;
+    }
+
+    private static void assignPendingStandbyTasksToLeastLoadedClients(final Map<UUID, ClientState> clients,
+                                                                      final int numStandbyReplicas,
+                                                                      final Set<String> rackAwareAssignmentTags,
+                                                                      final ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
+                                                                      final Map<TaskId, Integer> pendingStandbyTaskToNumberRemainingStandbys,
+                                                                      final Map<TaskId, UUID> pendingStandbyTaskToClientId) {
+        // We need to re offer all the clients to find the least loaded ones
+        standbyTaskClientsByTaskLoad.offerAll(clients.keySet());
+
+        for (final Entry<TaskId, Integer> pendingStandbyTaskAssignmentEntry : pendingStandbyTaskToNumberRemainingStandbys.entrySet()) {
+            final TaskId activeTaskId = pendingStandbyTaskAssignmentEntry.getKey();
+            final UUID clientId = pendingStandbyTaskToClientId.get(activeTaskId);
+
+            final int numberOfRemainingStandbys = pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(
+                clients,
+                pendingStandbyTaskToNumberRemainingStandbys,
+                standbyTaskClientsByTaskLoad,
+                activeTaskId
+            );
+
+            if (numberOfRemainingStandbys > 0) {
+                log.warn("Unable to assign {} of {} standby tasks for task [{}] with client tags [{}]. " +
+                         "There is not enough available capacity. You should " +
+                         "increase the number of application instances " +
+                         "on different client tag dimensions " +
+                         "to maintain the requested number of standby replicas. " +
+                         "Rack awareness is configured with [{}] tags.",
+                         numberOfRemainingStandbys, numStandbyReplicas, activeTaskId,
+                         clients.get(clientId).clientTags(), rackAwareAssignmentTags);
+            }
+        }
+    }
+
+    @Override
+    public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) {
+        final Map<String, String> sourceClientTags = source.clientTags();
+        final Map<String, String> destinationClientTags = destination.clientTags();
+
+        for (final Entry<String, String> sourceClientTagEntry : sourceClientTags.entrySet()) {
+            if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    // Visible for testing
+    static void fillClientsTagStatistics(final Map<UUID, ClientState> clientStates,
+                                         final Map<TagEntry, Set<UUID>> tagEntryToClients,
+                                         final Map<String, Set<String>> tagKeyToValues) {
+        for (final Entry<UUID, ClientState> clientStateEntry : clientStates.entrySet()) {
+            final UUID clientId = clientStateEntry.getKey();
+            final ClientState clientState = clientStateEntry.getValue();
+
+            clientState.clientTags().forEach((tagKey, tagValue) -> {
+                tagKeyToValues.computeIfAbsent(tagKey, ignored -> new HashSet<>()).add(tagValue);
+                tagEntryToClients.computeIfAbsent(new TagEntry(tagKey, tagValue), ignored -> new HashSet<>()).add(clientId);
+            });
+        }
+    }
+
+    // Visible for testing
+    static void assignStandbyTasksToClientsWithDifferentTags(final ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
+                                                             final TaskId activeTaskId,
+                                                             final UUID activeTaskClient,
+                                                             final Set<String> rackAwareAssignmentTags,
+                                                             final Map<UUID, ClientState> clientStates,
+                                                             final Map<TaskId, Integer> tasksToRemainingStandbys,
+                                                             final Map<String, Set<String>> tagKeyToValues,
+                                                             final Map<TagEntry, Set<UUID>> tagEntryToClients,
+                                                             final Map<TaskId, UUID> pendingStandbyTasksToClientId) {
+        standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet());
+
+        // We set countOfUsedClients to 1 because the client where the active task is located has to be considered used.
+        int countOfUsedClients = 1;
+        int numRemainingStandbys = tasksToRemainingStandbys.get(activeTaskId);
+
+        final Map<TagEntry, Set<UUID>> tagEntryToUsedClients = new HashMap<>();
+
+        UUID lastUsedClient = activeTaskClient;
+        do {
+            updateClientsOnAlreadyUsedTagEntries(
+                lastUsedClient,
+                countOfUsedClients,
+                rackAwareAssignmentTags,
+                clientStates,
+                tagEntryToClients,
+                tagKeyToValues,
+                tagEntryToUsedClients
+            );
+
+            final UUID clientOnUnusedTagDimensions = standbyTaskClientsByTaskLoad.poll(
+                activeTaskId, uuid -> !isClientUsedOnAnyOfTheTagEntries(uuid, tagEntryToUsedClients)
+            );
+
+            if (clientOnUnusedTagDimensions == null) {
+                break;
+            }
+
+            clientStates.get(clientOnUnusedTagDimensions).assignStandby(activeTaskId);
+
+            countOfUsedClients++;
+            numRemainingStandbys--;
+
+            lastUsedClient = clientOnUnusedTagDimensions;
+        } while (numRemainingStandbys > 0);
+
+        if (numRemainingStandbys > 0) {
+            pendingStandbyTasksToClientId.put(activeTaskId, activeTaskClient);
+            tasksToRemainingStandbys.put(activeTaskId, numRemainingStandbys);
+        } else {
+            tasksToRemainingStandbys.remove(activeTaskId);
+        }
+    }
+
+    private static boolean isClientUsedOnAnyOfTheTagEntries(final UUID client,
+                                                            final Map<TagEntry, Set<UUID>> tagEntryToUsedClients) {
+        return tagEntryToUsedClients.values().stream().anyMatch(usedClients -> usedClients.contains(client));
+    }
+
+    private static void updateClientsOnAlreadyUsedTagEntries(final UUID usedClient,
+                                                             final int countOfUsedClients,
+                                                             final Set<String> rackAwareAssignmentTags,
+                                                             final Map<UUID, ClientState> clientStates,
+                                                             final Map<TagEntry, Set<UUID>> tagEntryToClients,
+                                                             final Map<String, Set<String>> tagKeyToValues,
+                                                             final Map<TagEntry, Set<UUID>> tagEntryToUsedClients) {
+        final Map<String, String> usedClientTags = clientStates.get(usedClient).clientTags();
+
+        for (final Entry<String, String> usedClientTagEntry : usedClientTags.entrySet()) {
+            final String tagKey = usedClientTagEntry.getKey();
+
+            if (!rackAwareAssignmentTags.contains(tagKey)) {
+                log.warn("Client tag with key [{}] will be ignored when computing rack aware standby " +
+                         "task assignment because it is not part of the configured rack awareness [{}].",
+                         tagKey, rackAwareAssignmentTags);
+                continue;
+            }
+
+            final Set<String> allTagValues = tagKeyToValues.get(tagKey);
+
+            // Consider the following client setup where we need to distribute 2 standby tasks for each stateful task.
+            //
+            // # Kafka Streams Client 1
+            // client.tag.zone: eu-central-1a
+            // client.tag.cluster: k8s-cluster1
+            // rack.aware.assignment.tags: zone,cluster
+            //
+            // # Kafka Streams Client 2
+            // client.tag.zone: eu-central-1b
+            // client.tag.cluster: k8s-cluster1
+            // rack.aware.assignment.tags: zone,cluster
+            //
+            // # Kafka Streams Client 3
+            // client.tag.zone: eu-central-1c
+            // client.tag.cluster: k8s-cluster1
+            // rack.aware.assignment.tags: zone,cluster
+            //
+            // # Kafka Streams Client 4
+            // client.tag.zone: eu-central-1a
+            // client.tag.cluster: k8s-cluster2
+            // rack.aware.assignment.tags: zone,cluster
+            //
+            // # Kafka Streams Client 5
+            // client.tag.zone: eu-central-1b
+            // client.tag.cluster: k8s-cluster2
+            // rack.aware.assignment.tags: zone,cluster
+            //
+            // # Kafka Streams Client 6
+            // client.tag.zone: eu-central-1c
+            // client.tag.cluster: k8s-cluster2
+            // rack.aware.assignment.tags: zone,cluster
+            //
+            // Since we have only two unique `cluster` tag values,
+            // we can only achieve "ideal" distribution on the 1st standby task assignment.
+            // Ideal distribution for the 1st standby task can be achieved because we can assign standby task
+            // to the client located on different cluster and zone compared to an active task.
+            // We can't consider the `cluster` tag for the 2nd standby task assignment because the 1st standby
+            // task would already be assigned on different cluster compared to the active one, which means
+            // we have already used all the available cluster tag values. Taking the `cluster` tag into consideration
+            // for the 2nd standby task assignment would affectively mean excluding all the clients.

Review comment:
       ```suggestion
               // for the 2nd standby task assignment would effectively mean excluding all the clients.
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys;
+import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.createLeastLoadedPrioritySetConstrainedByAssignedTask;
+import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks;
+
+/**
+ * Distributes standby tasks over different tag dimensions. Standby task distribution is on a best-effort basis.
+ * If rack aware standby task assignment is not possible, the implementation falls back to distributing standby tasks on least-loaded clients.
+ *
+ * @see DefaultStandbyTaskAssignor
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+    /**
+     * The algorithm distributes standby tasks for the {@param statefulTaskIds} over different tag dimensions.
+     * For each stateful task, the number of standby tasks will be assigned based on configured {@link AssignmentConfigs#numStandbyReplicas}.
+     * Rack aware standby tasks distribution only takes into account tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags}.
+     * Ideally, all standby tasks for any given stateful task will be located on different tag dimensions to have the best possible distribution.
+     * However, if the ideal (or partially ideal) distribution is impossible, the algorithm will fall back to the least-loaded clients without taking rack awareness constraints into consideration.
+     * The least-loaded clients are determined based on the total number of tasks (active and standby tasks) assigned to the client.
+     */
+    @Override
+    public boolean assign(final Map<UUID, ClientState> clients,
+                          final Set<TaskId> allTaskIds,
+                          final Set<TaskId> statefulTaskIds,
+                          final AssignorConfiguration.AssignmentConfigs configs) {
+        final int numStandbyReplicas = configs.numStandbyReplicas;
+        final Set<String> rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags);
+
+        final Map<TaskId, Integer> tasksToRemainingStandbys = computeTasksToRemainingStandbys(
+            numStandbyReplicas,
+            allTaskIds
+        );
+
+        final Map<String, Set<String>> tagKeyToValues = new HashMap<>();
+        final Map<TagEntry, Set<UUID>> tagEntryToClients = new HashMap<>();
+
+        fillClientsTagStatistics(clients, tagEntryToClients, tagKeyToValues);
+
+        final ConstrainedPrioritySet standbyTaskClientsByTaskLoad = createLeastLoadedPrioritySetConstrainedByAssignedTask(clients);
+
+        final Map<TaskId, UUID> pendingStandbyTasksToClientId = new HashMap<>();
+
+        for (final TaskId statefulTaskId : statefulTaskIds) {
+            for (final Map.Entry<UUID, ClientState> entry : clients.entrySet()) {
+                final UUID clientId = entry.getKey();
+                final ClientState clientState = entry.getValue();
+
+                if (clientState.activeTasks().contains(statefulTaskId)) {
+                    assignStandbyTasksToClientsWithDifferentTags(
+                        standbyTaskClientsByTaskLoad,
+                        statefulTaskId,
+                        clientId,
+                        rackAwareAssignmentTags,
+                        clients,
+                        tasksToRemainingStandbys,
+                        tagKeyToValues,
+                        tagEntryToClients,
+                        pendingStandbyTasksToClientId
+                    );
+                }
+            }
+        }
+
+        if (!tasksToRemainingStandbys.isEmpty()) {
+            log.debug("Rack aware standby task assignment was not able to assign all standby tasks. " +
+                      "tasksToRemainingStandbys=[{}], pendingStandbyTasksToClientId=[{}]. " +
+                      "Will distribute the remaining standby tasks to least loaded clients.",
+                      tasksToRemainingStandbys, pendingStandbyTasksToClientId);
+
+            assignPendingStandbyTasksToLeastLoadedClients(clients,
+                                                          numStandbyReplicas,
+                                                          rackAwareAssignmentTags,
+                                                          standbyTaskClientsByTaskLoad,
+                                                          tasksToRemainingStandbys,
+                                                          pendingStandbyTasksToClientId);
+        }
+
+        // returning false, because standby task assignment will never require a follow-up probing rebalance.
+        return false;
+    }
+
+    private static void assignPendingStandbyTasksToLeastLoadedClients(final Map<UUID, ClientState> clients,
+                                                                      final int numStandbyReplicas,
+                                                                      final Set<String> rackAwareAssignmentTags,
+                                                                      final ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
+                                                                      final Map<TaskId, Integer> pendingStandbyTaskToNumberRemainingStandbys,
+                                                                      final Map<TaskId, UUID> pendingStandbyTaskToClientId) {
+        // We need to re offer all the clients to find the least loaded ones
+        standbyTaskClientsByTaskLoad.offerAll(clients.keySet());
+
+        for (final Entry<TaskId, Integer> pendingStandbyTaskAssignmentEntry : pendingStandbyTaskToNumberRemainingStandbys.entrySet()) {
+            final TaskId activeTaskId = pendingStandbyTaskAssignmentEntry.getKey();
+            final UUID clientId = pendingStandbyTaskToClientId.get(activeTaskId);
+
+            final int numberOfRemainingStandbys = pollClientAndMaybeAssignAndUpdateRemainingStandbyTasks(
+                clients,
+                pendingStandbyTaskToNumberRemainingStandbys,
+                standbyTaskClientsByTaskLoad,
+                activeTaskId
+            );
+
+            if (numberOfRemainingStandbys > 0) {
+                log.warn("Unable to assign {} of {} standby tasks for task [{}] with client tags [{}]. " +
+                         "There is not enough available capacity. You should " +
+                         "increase the number of application instances " +
+                         "on different client tag dimensions " +
+                         "to maintain the requested number of standby replicas. " +
+                         "Rack awareness is configured with [{}] tags.",
+                         numberOfRemainingStandbys, numStandbyReplicas, activeTaskId,
+                         clients.get(clientId).clientTags(), rackAwareAssignmentTags);
+            }
+        }
+    }
+
+    @Override
+    public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) {
+        final Map<String, String> sourceClientTags = source.clientTags();
+        final Map<String, String> destinationClientTags = destination.clientTags();
+
+        for (final Entry<String, String> sourceClientTagEntry : sourceClientTags.entrySet()) {
+            if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    // Visible for testing
+    static void fillClientsTagStatistics(final Map<UUID, ClientState> clientStates,
+                                         final Map<TagEntry, Set<UUID>> tagEntryToClients,
+                                         final Map<String, Set<String>> tagKeyToValues) {
+        for (final Entry<UUID, ClientState> clientStateEntry : clientStates.entrySet()) {
+            final UUID clientId = clientStateEntry.getKey();
+            final ClientState clientState = clientStateEntry.getValue();
+
+            clientState.clientTags().forEach((tagKey, tagValue) -> {
+                tagKeyToValues.computeIfAbsent(tagKey, ignored -> new HashSet<>()).add(tagValue);
+                tagEntryToClients.computeIfAbsent(new TagEntry(tagKey, tagValue), ignored -> new HashSet<>()).add(clientId);
+            });
+        }
+    }
+
+    // Visible for testing
+    static void assignStandbyTasksToClientsWithDifferentTags(final ConstrainedPrioritySet standbyTaskClientsByTaskLoad,
+                                                             final TaskId activeTaskId,
+                                                             final UUID activeTaskClient,
+                                                             final Set<String> rackAwareAssignmentTags,
+                                                             final Map<UUID, ClientState> clientStates,
+                                                             final Map<TaskId, Integer> tasksToRemainingStandbys,
+                                                             final Map<String, Set<String>> tagKeyToValues,
+                                                             final Map<TagEntry, Set<UUID>> tagEntryToClients,
+                                                             final Map<TaskId, UUID> pendingStandbyTasksToClientId) {
+        standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet());
+
+        // We set countOfUsedClients to 1 because the client where the active task is located has to be considered used.
+        int countOfUsedClients = 1;
+        int numRemainingStandbys = tasksToRemainingStandbys.get(activeTaskId);
+
+        final Map<TagEntry, Set<UUID>> tagEntryToUsedClients = new HashMap<>();
+
+        UUID lastUsedClient = activeTaskClient;
+        do {
+            updateClientsOnAlreadyUsedTagEntries(
+                lastUsedClient,
+                countOfUsedClients,
+                rackAwareAssignmentTags,
+                clientStates,
+                tagEntryToClients,
+                tagKeyToValues,
+                tagEntryToUsedClients
+            );
+
+            final UUID clientOnUnusedTagDimensions = standbyTaskClientsByTaskLoad.poll(
+                activeTaskId, uuid -> !isClientUsedOnAnyOfTheTagEntries(uuid, tagEntryToUsedClients)
+            );
+
+            if (clientOnUnusedTagDimensions == null) {
+                break;
+            }
+
+            clientStates.get(clientOnUnusedTagDimensions).assignStandby(activeTaskId);
+
+            countOfUsedClients++;
+            numRemainingStandbys--;
+
+            lastUsedClient = clientOnUnusedTagDimensions;
+        } while (numRemainingStandbys > 0);
+
+        if (numRemainingStandbys > 0) {
+            pendingStandbyTasksToClientId.put(activeTaskId, activeTaskClient);
+            tasksToRemainingStandbys.put(activeTaskId, numRemainingStandbys);
+        } else {
+            tasksToRemainingStandbys.remove(activeTaskId);
+        }
+    }
+
+    private static boolean isClientUsedOnAnyOfTheTagEntries(final UUID client,
+                                                            final Map<TagEntry, Set<UUID>> tagEntryToUsedClients) {
+        return tagEntryToUsedClients.values().stream().anyMatch(usedClients -> usedClients.contains(client));
+    }
+
+    private static void updateClientsOnAlreadyUsedTagEntries(final UUID usedClient,
+                                                             final int countOfUsedClients,
+                                                             final Set<String> rackAwareAssignmentTags,
+                                                             final Map<UUID, ClientState> clientStates,
+                                                             final Map<TagEntry, Set<UUID>> tagEntryToClients,
+                                                             final Map<String, Set<String>> tagKeyToValues,
+                                                             final Map<TagEntry, Set<UUID>> tagEntryToUsedClients) {
+        final Map<String, String> usedClientTags = clientStates.get(usedClient).clientTags();
+
+        for (final Entry<String, String> usedClientTagEntry : usedClientTags.entrySet()) {
+            final String tagKey = usedClientTagEntry.getKey();
+
+            if (!rackAwareAssignmentTags.contains(tagKey)) {
+                log.warn("Client tag with key [{}] will be ignored when computing rack aware standby " +
+                         "task assignment because it is not part of the configured rack awareness [{}].",
+                         tagKey, rackAwareAssignmentTags);
+                continue;
+            }
+
+            final Set<String> allTagValues = tagKeyToValues.get(tagKey);
+
+            // Consider the following client setup where we need to distribute 2 standby tasks for each stateful task.
+            //
+            // # Kafka Streams Client 1
+            // client.tag.zone: eu-central-1a
+            // client.tag.cluster: k8s-cluster1
+            // rack.aware.assignment.tags: zone,cluster
+            //
+            // # Kafka Streams Client 2
+            // client.tag.zone: eu-central-1b
+            // client.tag.cluster: k8s-cluster1
+            // rack.aware.assignment.tags: zone,cluster
+            //
+            // # Kafka Streams Client 3
+            // client.tag.zone: eu-central-1c
+            // client.tag.cluster: k8s-cluster1
+            // rack.aware.assignment.tags: zone,cluster
+            //
+            // # Kafka Streams Client 4
+            // client.tag.zone: eu-central-1a
+            // client.tag.cluster: k8s-cluster2
+            // rack.aware.assignment.tags: zone,cluster
+            //
+            // # Kafka Streams Client 5
+            // client.tag.zone: eu-central-1b
+            // client.tag.cluster: k8s-cluster2
+            // rack.aware.assignment.tags: zone,cluster
+            //
+            // # Kafka Streams Client 6
+            // client.tag.zone: eu-central-1c
+            // client.tag.cluster: k8s-cluster2
+            // rack.aware.assignment.tags: zone,cluster
+            //
+            // Since we have only two unique `cluster` tag values,
+            // we can only achieve "ideal" distribution on the 1st standby task assignment.
+            // Ideal distribution for the 1st standby task can be achieved because we can assign standby task
+            // to the client located on different cluster and zone compared to an active task.
+            // We can't consider the `cluster` tag for the 2nd standby task assignment because the 1st standby
+            // task would already be assigned on different cluster compared to the active one, which means
+            // we have already used all the available cluster tag values. Taking the `cluster` tag into consideration
+            // for the 2nd standby task assignment would affectively mean excluding all the clients.
+            // Instead, for the 2nd standby task, we can only achieve partial rack awareness based on the `zone` tag.
+            // As we don't consider the `cluster` tag for the 2nd standby task assignment, partial rack awareness
+            // can be satisfied by placing the 2nd standby client on a different `zone` tag compared to active and corresponding standby tasks.
+            // The `zone` on either `cluster` tags are valid candidates for the partial rack awareness, as our goal is to distribute clients on the different `zone` tags.
+
+            // This statement checks if we have used more clients than the all the unique values for the given tag,

Review comment:
       ```suggestion
               // This statement checks if we have used more clients than the number of unique values for the given tag,
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

