ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r411734468



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -89,95 +88,72 @@ public boolean assign() {
             return false;
         }
 
-        final Map<UUID, List<TaskId>> warmupTaskAssignment = 
initializeEmptyTaskAssignmentMap(sortedClients);
-        final Map<UUID, List<TaskId>> standbyTaskAssignment = 
initializeEmptyTaskAssignmentMap(sortedClients);
-        final Map<UUID, List<TaskId>> statelessActiveTaskAssignment = 
initializeEmptyTaskAssignmentMap(sortedClients);
+        final Map<TaskId, Integer> tasksToRemainingStandbys =
+            statefulTasks.stream().collect(Collectors.toMap(task -> task, t -> 
configs.numStandbyReplicas));
 
-        // ---------------- Stateful Active Tasks ---------------- //
+        final boolean followupRebalanceNeeded = 
assignStatefulActiveTasks(tasksToRemainingStandbys);
 
-        final Map<UUID, List<TaskId>> statefulActiveTaskAssignment =
-            new DefaultStateConstrainedBalancedAssignor().assign(
-                statefulTasksToRankedCandidates,
-                configs.balanceFactor,
-                sortedClients,
-                clientsToNumberOfThreads,
-                tasksToCaughtUpClients
-            );
+        assignStandbyReplicaTasks(tasksToRemainingStandbys);
+
+        assignStatelessActiveTasks();
 
-        // ---------------- Warmup Replica Tasks ---------------- //
+        return followupRebalanceNeeded;
+    }
 
-        final Map<UUID, List<TaskId>> balancedStatefulActiveTaskAssignment =
+    private boolean assignStatefulActiveTasks(final Map<TaskId, Integer> 
tasksToRemainingStandbys) {
+        final Map<UUID, List<TaskId>> statefulActiveTaskAssignment =
             new DefaultBalancedAssignor().assign(
                 sortedClients,
                 statefulTasks,
                 clientsToNumberOfThreads,
                 configs.balanceFactor);
 
-        final Map<TaskId, Integer> tasksToRemainingStandbys =
-            statefulTasks.stream().collect(Collectors.toMap(task -> task, t -> 
configs.numStandbyReplicas));
-
-        final List<TaskMovement> movements = getMovements(
+        return assignTaskMovements(
             statefulActiveTaskAssignment,
-            balancedStatefulActiveTaskAssignment,
             tasksToCaughtUpClients,
             clientStates,
             tasksToRemainingStandbys,
-            configs.maxWarmupReplicas);
-
-        for (final TaskMovement movement : movements) {
-            warmupTaskAssignment.get(movement.destination).add(movement.task);
-        }
-
-        // ---------------- Standby Replica Tasks ---------------- //
-
-        final List<Map<UUID, List<TaskId>>> allTaskAssignmentMaps = asList(
-            statefulActiveTaskAssignment,
-            warmupTaskAssignment,
-            standbyTaskAssignment,
-            statelessActiveTaskAssignment
+            configs.maxWarmupReplicas
         );
+    }
 
-        final ValidClientsByTaskLoadQueue<UUID> clientsByStandbyTaskLoad =
-            new ValidClientsByTaskLoadQueue<>(
-                getClientPriorityQueueByTaskLoad(allTaskAssignmentMaps),
-                allTaskAssignmentMaps
+    private void assignStandbyReplicaTasks(final Map<TaskId, Integer> 
tasksToRemainingStandbys) {
+        final ValidClientsByTaskLoadQueue standbyTaskClientsByTaskLoad =
+            new ValidClientsByTaskLoadQueue(
+                clientStates,
+                (client, task) -> 
!clientStates.get(client).assignedTasks().contains(task)
             );
+        standbyTaskClientsByTaskLoad.offerAll(clientStates.keySet());
 
         for (final TaskId task : statefulTasksToRankedCandidates.keySet()) {
             final int numRemainingStandbys = 
tasksToRemainingStandbys.get(task);
-            final List<UUID> clients = clientsByStandbyTaskLoad.poll(task, 
numRemainingStandbys);
+            final List<UUID> clients = standbyTaskClientsByTaskLoad.poll(task, 
numRemainingStandbys);
             for (final UUID client : clients) {
-                standbyTaskAssignment.get(client).add(task);
+                clientStates.get(client).assignStandby(task);
             }
-            clientsByStandbyTaskLoad.offer(clients);
+            standbyTaskClientsByTaskLoad.offerAll(clients);
+
             final int numStandbysAssigned = clients.size();
-            if (numStandbysAssigned < configs.numStandbyReplicas) {
+            if (numStandbysAssigned < numRemainingStandbys) {
                 log.warn("Unable to assign {} of {} standby tasks for task 
[{}]. " +
                              "There is not enough available capacity. You 
should " +
                              "increase the number of threads and/or 
application instances " +
                              "to maintain the requested number of standby 
replicas.",
-                    configs.numStandbyReplicas - numStandbysAssigned, 
configs.numStandbyReplicas, task);
+                         numRemainingStandbys - numStandbysAssigned, 
configs.numStandbyReplicas, task);
             }
         }
+    }
 
-        // ---------------- Stateless Active Tasks ---------------- //
-
-        final PriorityQueue<UUID> statelessActiveTaskClientsQueue = 
getClientPriorityQueueByTaskLoad(allTaskAssignmentMaps);
+    private void assignStatelessActiveTasks() {
+        final PriorityQueue<UUID> statelessActiveTaskClientsQueue = 
getClientPriorityQueueByTaskLoad(clientStates);

Review comment:
       Definitely




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to