mjsax commented on code in PR #20730:
URL: https://github.com/apache/kafka/pull/20730#discussion_r2492603352


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java:
##########
@@ -304,6 +304,101 @@ private static boolean computeAssignmentDifferenceForOneSubtopology(final String
         return hasUnreleasedTasks;
     }
 
+    /**
+     * Takes the current currentAssignment and the targetAssignment, and generates three
+     * collections:
+     *
+     * - the resultAssignedTasks: the tasks that are assigned in both the current and target
+     * assignments.
+     * - the resultTasksPendingRevocation: the tasks that are assigned in the current
+     * assignment but not in the target assignment.
+     * - the resultTasksPendingAssignment: the tasks that are assigned in the target assignment but
+     * not in the current assignment, and can be assigned currently (i.e., they are not owned by
+     * another member, as defined by the `isUnreleasedTask` predicate).
+     *
+     * Epoch Handling:
+     * - For tasks in resultAssignedTasks and resultTasksPendingRevocation, the epoch from currentAssignment is preserved.
+     * - For tasks in resultTasksPendingAssignment, the targetAssignmentEpoch is used.
+     */
+    private boolean computeAssignmentDifferenceWithEpoch(Map<String, Map<Integer, Integer>> currentAssignment,
+                                                         Map<String, Set<Integer>> targetAssignment,
+                                                         int targetAssignmentEpoch,
+                                                         Map<String, Map<Integer, Integer>> resultAssignedTasks,
+                                                         Map<String, Map<Integer, Integer>> resultTasksPendingRevocation,
+                                                         Map<String, Map<Integer, Integer>> resultTasksPendingAssignment,
+                                                         BiPredicate<String, Integer> isUnreleasedTask) {
+        boolean hasUnreleasedTasks = false;
+
+        Set<String> allSubtopologyIds = new HashSet<>(targetAssignment.keySet());
+        allSubtopologyIds.addAll(currentAssignment.keySet());
+
+        for (String subtopologyId : allSubtopologyIds) {
+            hasUnreleasedTasks |= computeAssignmentDifferenceForOneSubtopologyWithEpoch(
+                subtopologyId,
+                currentAssignment.getOrDefault(subtopologyId, Map.of()),
+                targetAssignment.getOrDefault(subtopologyId, Set.of()),
+                targetAssignmentEpoch,
+                resultAssignedTasks,
+                resultTasksPendingRevocation,
+                resultTasksPendingAssignment,
+                isUnreleasedTask
+            );

Review Comment:
   Should we break out of the loop early, as soon as `hasUnreleasedTasks` becomes `true`?
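
   One thing to check before doing that: going by the Javadoc, the per-subtopology helper also seems to fill the result maps as a side effect, so an early `break` might leave later subtopologies unprocessed. A minimal sketch of that concern follows. `EarlyBreakSketch`, `processOne`, and `run` are hypothetical stand-ins, not the PR's code, and the maps are simplified to `Set<Integer>` values (no epochs).

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.BiPredicate;

// Hypothetical illustration only; names and types are simplified assumptions.
final class EarlyBreakSketch {

    // Stand-in for the per-subtopology helper: records assignable tasks in the
    // result map (side effect) and reports whether any task is still unreleased.
    static boolean processOne(String subtopologyId,
                              Set<Integer> targetPartitions,
                              Map<String, Set<Integer>> resultTasksPendingAssignment,
                              BiPredicate<String, Integer> isUnreleasedTask) {
        boolean hasUnreleasedTasks = false;
        for (Integer partition : targetPartitions) {
            if (isUnreleasedTask.test(subtopologyId, partition)) {
                hasUnreleasedTasks = true;
            } else {
                resultTasksPendingAssignment
                    .computeIfAbsent(subtopologyId, k -> new HashSet<>())
                    .add(partition);
            }
        }
        return hasUnreleasedTasks;
    }

    // Runs the loop with or without the proposed early break and returns the
    // populated result map so both variants can be compared.
    static Map<String, Set<Integer>> run(Map<String, Set<Integer>> targetAssignment,
                                         BiPredicate<String, Integer> isUnreleasedTask,
                                         boolean breakEarly) {
        Map<String, Set<Integer>> result = new TreeMap<>();
        boolean hasUnreleasedTasks = false;
        // Sorted iteration keeps the sketch deterministic.
        for (String subtopologyId : new TreeSet<>(targetAssignment.keySet())) {
            hasUnreleasedTasks |= processOne(
                subtopologyId,
                targetAssignment.get(subtopologyId),
                result,
                isUnreleasedTask
            );
            if (breakEarly && hasUnreleasedTasks) {
                break; // later subtopologies are never visited
            }
        }
        return result;
    }
}
```

   If the real helper behaves like this stand-in, breaking early would only be safe when the caller discards the result maps whenever `hasUnreleasedTasks` is `true`.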



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
