lucasbru commented on code in PR #20458:
URL: https://github.com/apache/kafka/pull/20458#discussion_r2319150858


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -53,24 +55,19 @@ public GroupAssignment assign(final GroupSpec groupSpec, final TopologyDescriber
     }
 
     private GroupAssignment doAssign(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) {
-        //active
-        final Set<TaskId> activeTasks = taskIds(topologyDescriber, true);
+        final Deque<TaskId> activeTasks = taskIds(topologyDescriber, true);
         assignActive(activeTasks);
 
-        //standby
-        final int numStandbyReplicas =
-            groupSpec.assignmentConfigs().isEmpty() ? 0
-                : Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));
-        if (numStandbyReplicas > 0) {
-            final Set<TaskId> statefulTasks = taskIds(topologyDescriber, false);
-            assignStandby(statefulTasks, numStandbyReplicas);
+        if (localState.numStandbyReplicas > 0) {
+            final Deque<TaskId> statefulTasks = taskIds(topologyDescriber, false);
+            assignStandby(statefulTasks);
         }
 
         return buildGroupAssignment(groupSpec.members().keySet());
     }
 
-    private Set<TaskId> taskIds(final TopologyDescriber topologyDescriber, final boolean isActive) {
-        final Set<TaskId> ret = new HashSet<>();
+    private Deque<TaskId> taskIds(final TopologyDescriber topologyDescriber, final boolean isActive) {

Review Comment:
   We want to assign standby tasks in reverse order, so I'm using a Deque here, 
which supports reverse iteration via descendingIterator().
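
   As a minimal sketch of what the Deque buys us (not code from this PR), the 
snippet below walks the same deque front to back and back to front. The class 
name DequeOrderSketch and the plain String task names are placeholders standing 
in for TaskId; only Deque, ArrayDeque, and descendingIterator() are real JDK API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class DequeOrderSketch {
    // Front-to-back order, as one would use for active tasks.
    static List<String> forward(final Deque<String> tasks) {
        return new ArrayList<>(tasks);
    }

    // Back-to-front order via Deque#descendingIterator(), as one would use for standby tasks.
    static List<String> backward(final Deque<String> tasks) {
        final List<String> ret = new ArrayList<>();
        final Iterator<String> it = tasks.descendingIterator();
        while (it.hasNext()) {
            ret.add(it.next());
        }
        return ret;
    }

    public static void main(final String[] args) {
        // Strings are hypothetical stand-ins for TaskId instances.
        final Deque<String> tasks = new ArrayDeque<>(List.of("0_0", "0_1", "0_2", "0_3"));
        System.out.println(forward(tasks));  // [0_0, 0_1, 0_2, 0_3]
        System.out.println(backward(tasks)); // [0_3, 0_2, 0_1, 0_0]
    }
}
```

   A plain Set gives no ordered reverse traversal, which is why the return type 
of taskIds changes in this diff.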
   
   The reason we want to traverse standby tasks in reverse is illustrated by 
the example that I added to the unit tests of both LegacyStickyTaskAssignor and 
the new StickyTaskAssignor.
   
   Assume we have:
   Node 1: Active task 0,1, Standby task 2,3
   Node 2: Active task 2,3, Standby task 0,1
   Node 3: - (new)
   
   Then we don't want to assign active tasks and standby tasks in the same 
order.
   Suppose we assign active tasks in increasing order. We will get:
   
   Node 1: Active task 0,1
   Node 2: Active task 2
   Node 3: Active task 3
   
   Task 3 is the last task we assign, and at that point the quota for active 
tasks is 1, so it can only be assigned to Node 3.
   
   Suppose we now assign standby tasks in the same order. We will get:
   
   Node 1: Active task 0,1, Standby task 2,3
   Node 2: Active task 2, Standby task 0,1
   Node 3: Active task 3
   
   The reason is that we first assign standby tasks 0, 1, and 2, each of which 
can go back to the member that previously owned it. Finally, we want to assign 
standby task 3, but it cannot be assigned to Node 3, which already hosts active 
task 3, so we have to assign it to Node 1 or Node 2, and Node 3 ends up with no 
standby tasks. Using reverse order means that new nodes get the numerically 
last few active tasks and the numerically first standby tasks, which should 
avoid this problem.
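
   To make the order dependence concrete, here is a toy model of the example 
above. It is NOT the StickyTaskAssignor logic: the class StandbyOrderSketch, 
the per-node QUOTA of 3 (ceil of 8 tasks over 3 nodes), and the "previous 
owner first, else least-loaded eligible node" rule are all assumptions made for 
illustration. It only shows that a greedy, sticky placement can leave the new 
node without standbys in one traversal order and not in the other; the exact 
tasks the real assignor hands to the new node may differ.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class StandbyOrderSketch {
    // Hypothetical cap on active + standby tasks per node: ceil(8 tasks / 3 nodes).
    static final int QUOTA = 3;

    // Greedily places one standby replica per task, visiting tasks in the given order.
    static Map<String, Set<Integer>> assignStandbys(final List<Integer> order) {
        // Active assignment and previous standby owners, copied from the example.
        final Map<String, Set<Integer>> active = new LinkedHashMap<>();
        active.put("node1", Set.of(0, 1));
        active.put("node2", Set.of(2));
        active.put("node3", Set.of(3));
        final Map<Integer, String> previousOwner = Map.of(0, "node2", 1, "node2", 2, "node1", 3, "node1");

        final Map<String, Set<Integer>> standby = new LinkedHashMap<>();
        for (final String node : active.keySet()) {
            standby.put(node, new TreeSet<>());
        }

        for (final int task : order) {
            final String prev = previousOwner.get(task);
            String target = null;
            if (load(active, standby, prev) < QUOTA && !active.get(prev).contains(task)) {
                // Sticky case: the previous standby owner still has room.
                target = prev;
            } else {
                // Fallback: least-loaded node that does not host the active copy.
                for (final String node : active.keySet()) {
                    if (active.get(node).contains(task)) {
                        continue;
                    }
                    if (target == null || load(active, standby, node) < load(active, standby, target)) {
                        target = node;
                    }
                }
            }
            standby.get(target).add(task);
        }
        return standby;
    }

    static int load(final Map<String, Set<Integer>> active,
                    final Map<String, Set<Integer>> standby,
                    final String node) {
        return active.get(node).size() + standby.get(node).size();
    }

    public static void main(final String[] args) {
        System.out.println(assignStandbys(List.of(0, 1, 2, 3))); // {node1=[2, 3], node2=[0, 1], node3=[]}
        System.out.println(assignStandbys(List.of(3, 2, 1, 0))); // {node1=[3], node2=[0, 1], node3=[2]}
    }
}
```

   In this toy model, increasing order reproduces the unbalanced outcome from 
the example, while decreasing order places a standby on Node 3 because Node 1 
reaches the assumed quota before standby task 2 is considered.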


