cadonna commented on a change in pull request #11760:
URL: https://github.com/apache/kafka/pull/11760#discussion_r811184710



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -102,17 +109,26 @@ static int assignActiveTaskMovements(final Map<TaskId, 
SortedSet<UUID>> tasksToC
         final int movementsNeeded = taskMovements.size();
 
         for (final TaskMovement movement : taskMovements) {
-            final UUID standbySourceClient = caughtUpClientsByTaskLoad.poll(
+            // Attempt to find a caught up standby, otherwise find any caught 
up client, failing that use the most
+            // caught up client.
+            UUID sourceClient = caughtUpClientsByTaskLoad.poll(
                 movement.task,
                 c -> clientStates.get(c).hasStandbyTask(movement.task)
             );
-            if (standbySourceClient == null) {
-                // there's not a caught-up standby available to take over the 
task, so we'll schedule a warmup instead
-                final UUID sourceClient = requireNonNull(
-                    caughtUpClientsByTaskLoad.poll(movement.task),
-                    "Tried to move task to caught-up client but none exist"
+
+            if (sourceClient == null) {
+                sourceClient = caughtUpClientsByTaskLoad.poll(movement.task);
+            }
+
+            if (sourceClient == null) {
+                sourceClient = requireNonNull(
+                        mostCaughtUpEligibleClient(tasksToClientByLag, 
movement.task, movement.destination),
+                        "Tried to move task to more caught-up client but none 
exist"
                 );
+            }
 
+            if (!clientStates.get(sourceClient).hasStandbyTask(movement.task)) 
{
+                // there's not a standby available to take over the task, so 
we'll schedule a warmup instead

Review comment:
       I think it is fine as you did. However, the method is already over 100 
lines long. Maybe the code would benefit from a bit of refactoring. The method 
was already quite long before you added your part. WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to