lucasbru commented on code in PR #20730:
URL: https://github.com/apache/kafka/pull/20730#discussion_r2494215919


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java:
##########
@@ -304,6 +304,101 @@ private static boolean computeAssignmentDifferenceForOneSubtopology(final String
         return hasUnreleasedTasks;
     }
 
+    /**
+     * Takes the current currentAssignment and the targetAssignment, and generates three
+     * collections:
+     *
+     * - the resultAssignedTasks: the tasks that are assigned in both the current and target
+     * assignments.
+     * - the resultTasksPendingRevocation: the tasks that are assigned in the current
+     * assignment but not in the target assignment.
+     * - the resultTasksPendingAssignment: the tasks that are assigned in the target assignment but
+     * not in the current assignment, and can be assigned currently (i.e., they are not owned by
+     * another member, as defined by the `isUnreleasedTask` predicate).
+     *
+     * Epoch Handling:
+     * - For tasks in resultAssignedTasks and resultTasksPendingRevocation, the epoch from currentAssignment is preserved.
+     * - For tasks in resultTasksPendingAssignment, the targetAssignmentEpoch is used.
+     */
+    private boolean computeAssignmentDifferenceWithEpoch(Map<String, Map<Integer, Integer>> currentAssignment,
+                                                         Map<String, Set<Integer>> targetAssignment,
+                                                         int targetAssignmentEpoch,
+                                                         Map<String, Map<Integer, Integer>> resultAssignedTasks,
+                                                         Map<String, Map<Integer, Integer>> resultTasksPendingRevocation,
+                                                         Map<String, Map<Integer, Integer>> resultTasksPendingAssignment,
+                                                         BiPredicate<String, Integer> isUnreleasedTask) {
+        boolean hasUnreleasedTasks = false;
+
+        Set<String> allSubtopologyIds = new HashSet<>(targetAssignment.keySet());
+        allSubtopologyIds.addAll(currentAssignment.keySet());
+
+        for (String subtopologyId : allSubtopologyIds) {
+            hasUnreleasedTasks |= computeAssignmentDifferenceForOneSubtopologyWithEpoch(
+                subtopologyId,
+                currentAssignment.getOrDefault(subtopologyId, Map.of()),
+                targetAssignment.getOrDefault(subtopologyId, Set.of()),
+                targetAssignmentEpoch,
+                resultAssignedTasks,
+                resultTasksPendingRevocation,
+                resultTasksPendingAssignment,
+                isUnreleasedTask
+            );
+        }
+        return hasUnreleasedTasks;
+    }
+
+    private static boolean computeAssignmentDifferenceForOneSubtopologyWithEpoch(final String subtopologyId,
+                                                                                 final Map<Integer, Integer> currentTasksForThisSubtopology,
+                                                                                 final Set<Integer> targetTasksForThisSubtopology,
+                                                                                 final int targetAssignmentEpoch,
+                                                                                 final Map<String, Map<Integer, Integer>> resultAssignedTasks,
+                                                                                 final Map<String, Map<Integer, Integer>> resultTasksPendingRevocation,
+                                                                                 final Map<String, Map<Integer, Integer>> resultTasksPendingAssignment,
+                                                                                 final BiPredicate<String, Integer> isUnreleasedTask) {
+        // Result Assigned Tasks = Current Tasks ∩ Target Tasks
+        // i.e. we remove all tasks from the current assignment that are not in the target
+        //         assignment
+        Map<Integer, Integer> resultAssignedTasksForThisSubtopology = new HashMap<>();
+        for (Map.Entry<Integer, Integer> entry : currentTasksForThisSubtopology.entrySet()) {
+            if (targetTasksForThisSubtopology.contains(entry.getKey())) {
+                resultAssignedTasksForThisSubtopology.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        // Result Tasks Pending Revocation = Current Tasks - Result Assigned Tasks
+        // i.e. we will ask the member to revoke all tasks in its current assignment that
+        //      are not in the target assignment
+        Map<Integer, Integer> resultTasksPendingRevocationForThisSubtopology = new HashMap<>(currentTasksForThisSubtopology);
+        resultTasksPendingRevocationForThisSubtopology.keySet().removeAll(resultAssignedTasksForThisSubtopology.keySet());
+
+        // Result Tasks Pending Assignment = Target Tasks - Result Assigned Tasks - Unreleased Tasks
+        // i.e. we will ask the member to assign all tasks in its target assignment,
+        //      except those that are already assigned, and those that are unreleased
+        Map<Integer, Integer> resultTasksPendingAssignmentForThisSubtopology = new HashMap<>();
+        for (Integer taskId : targetTasksForThisSubtopology) {
+            if (!resultAssignedTasksForThisSubtopology.containsKey(taskId)) {
+                resultTasksPendingAssignmentForThisSubtopology.put(taskId, targetAssignmentEpoch);
+            }
+        }
+        boolean hasUnreleasedTasks = resultTasksPendingAssignmentForThisSubtopology.keySet().removeIf(taskId ->
+            isUnreleasedTask.test(subtopologyId, taskId)

Review Comment:
   Pending assignment here means "ready to be assigned": these tasks will actually be assigned to the member once the current assignment is built. To compute it, we take all tasks that are assigned to us in the target assignment, minus the tasks that are already assigned to us. The resulting set can contain released tasks, for example tasks that were never assigned to a different member before, or tasks that another member has already revoked. It can also contain unreleased tasks, i.e. tasks that another member has not revoked yet. After the unreleased tasks are removed, the remaining tasks are "ready to be assigned".
   
   Maybe "ready to be assigned" would be a better name, but "pending assignment" is consistent with consumer groups.


