lucasbru commented on code in PR #20730:
URL: https://github.com/apache/kafka/pull/20730#discussion_r2447299013


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java:
##########
@@ -368,31 +374,103 @@ private StreamsGroupMember computeNextAssignment(int 
memberEpoch,
                         .contains(member.processId())
         );
 
-        return buildNewMember(
-            memberEpoch,
-            new TasksTuple(
+        // Add epochs to the computed task tuples
+        // Preserve previous epochs for tasks that were already assigned or 
pending revocation,
+        // and use the target assignment epoch for newly assigned tasks.
+        TasksTupleWithEpochs newTasksPendingRevocationWithEpochs = new 
TasksTupleWithEpochs(
+            addEpochsToTasks(
                 newActiveTasksPendingRevocation,

Review Comment:
   If the old assignment was {A, B}, and the member owns {A, B, C}, the old 
pending tasks to revoke was {C}.
   
   If the new assignment is now {B, C}, and the member owns {A, B, C}, the new 
pending tasks to revoke is {A}.
   
   So in the new assignment, we may have a different set of tasks to revoke, so 
this is not a new task at all, but a new task pending revocation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to