lucasbru commented on code in PR #20458: URL: https://github.com/apache/kafka/pull/20458#discussion_r2319150858
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java: ##########
@@ -53,24 +55,19 @@ public GroupAssignment assign(final GroupSpec groupSpec, final TopologyDescriber
     }
     private GroupAssignment doAssign(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) {
-        //active
-        final Set<TaskId> activeTasks = taskIds(topologyDescriber, true);
+        final Deque<TaskId> activeTasks = taskIds(topologyDescriber, true);
         assignActive(activeTasks);
-        //standby
-        final int numStandbyReplicas =
-            groupSpec.assignmentConfigs().isEmpty() ? 0
-                : Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));
-        if (numStandbyReplicas > 0) {
-            final Set<TaskId> statefulTasks = taskIds(topologyDescriber, false);
-            assignStandby(statefulTasks, numStandbyReplicas);
+        if (localState.numStandbyReplicas > 0) {
+            final Deque<TaskId> statefulTasks = taskIds(topologyDescriber, false);
+            assignStandby(statefulTasks);
         }
         return buildGroupAssignment(groupSpec.members().keySet());
     }
-    private Set<TaskId> taskIds(final TopologyDescriber topologyDescriber, final boolean isActive) {
-        final Set<TaskId> ret = new HashSet<>();
+    private Deque<TaskId> taskIds(final TopologyDescriber topologyDescriber, final boolean isActive) {

Review Comment:
We want to assign standby tasks in reverse order, so I'm using a Deque here, which provides descendingIterator(). The reason we want to traverse standby tasks in reverse is the example that I added to the unit tests of both LegacyStickyTaskAssignor and the new StickyTaskAssignor. Assume we have:

Node 1: Active tasks 0,1; Standby tasks 2,3
Node 2: Active tasks 2,3; Standby tasks 0,1
Node 3: - (new)

Then we don't want to assign active tasks and standby tasks in the same order.
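As a minimal sketch of the ordering idea (not the assignor's actual code), the snippet builds the task ids once, in ascending order, in a `java.util.ArrayDeque`, then walks it front to back for active tasks and with `Deque.descendingIterator()` for standby tasks. `TaskId` here is a hypothetical record standing in for the assignor's own type, and `DequeOrderDemo`, `taskIds`, `activeOrder`, and `standbyOrder` are illustrative names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class DequeOrderDemo {
    // Hypothetical stand-in for the assignor's TaskId (not Kafka's class).
    record TaskId(String subtopologyId, int partition) {}

    // Builds the task ids once, in ascending partition order.
    static Deque<TaskId> taskIds(int numPartitions) {
        Deque<TaskId> tasks = new ArrayDeque<>();
        for (int p = 0; p < numPartitions; p++) {
            tasks.addLast(new TaskId("0", p));
        }
        return tasks;
    }

    // Active tasks: traverse the deque front to back.
    static List<Integer> activeOrder(Deque<TaskId> tasks) {
        List<Integer> order = new ArrayList<>();
        for (TaskId t : tasks) {
            order.add(t.partition());
        }
        return order;
    }

    // Standby tasks: traverse the same deque back to front, without copying it.
    static List<Integer> standbyOrder(Deque<TaskId> tasks) {
        List<Integer> order = new ArrayList<>();
        Iterator<TaskId> it = tasks.descendingIterator();
        while (it.hasNext()) {
            order.add(it.next().partition());
        }
        return order;
    }

    public static void main(String[] args) {
        Deque<TaskId> tasks = taskIds(4);
        System.out.println("active order:  " + activeOrder(tasks));  // [0, 1, 2, 3]
        System.out.println("standby order: " + standbyOrder(tasks)); // [3, 2, 1, 0]
    }
}
```

Iterating does not consume the deque, so one collection serves both passes.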
Suppose we assign active tasks in increasing order. We get:

Node 1: Active tasks 0,1
Node 2: Active task 2
Node 3: Active task 3

Task 3 is the last task we assign, and at that point the quota for active tasks is 1, which Node 2 has already reached, so task 3 can only be assigned to Node 3. If we now assign standby tasks in the same order, we get:

Node 1: Active tasks 0,1; Standby tasks 2,3
Node 2: Active task 2; Standby tasks 0,1
Node 3: Active task 3

The reason is that we first assign standby tasks 0, 1, and 2, each of which can go back to the member that previously owned it. Finally, we want to assign standby task 3, but it cannot go to Node 3, which already hosts active task 3. So we have to assign it to Node 1 or Node 2, and Node 3 ends up without any standby task.

Traversing standby tasks in reverse order means that new nodes receive the numerically last active tasks and the numerically first standby tasks, which should avoid this problem.
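The walkthrough can be reproduced with a small simulation. This is a hypothetical toy sticky assignor, not Kafka's StickyTaskAssignor: each task goes to its previous owner if that node is still under a balanced quota (`numTasks / numNodes` per node, with `numTasks % numNodes` nodes allowed one extra), otherwise to the least-loaded node under quota, otherwise to the least-loaded eligible node. A standby is never placed on the node that hosts the same task as active. All class, method, and field names are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class StandbyOrderSimulation {
    static final List<Integer> NODES = List.of(1, 2, 3);
    static final List<Integer> TASKS = List.of(0, 1, 2, 3);
    // Previous owners (task -> node): Node 1 active 0,1 / standby 2,3; Node 2 active 2,3 / standby 0,1.
    static final Map<Integer, Integer> PREV_ACTIVE = Map.of(0, 1, 1, 1, 2, 2, 3, 2);
    static final Map<Integer, Integer> PREV_STANDBY = Map.of(0, 2, 1, 2, 2, 1, 3, 1);

    // Toy balanced quota: `base` tasks per node, and `extra` nodes may hold one more.
    static final class Quota {
        final int base;
        int extra;
        Quota(int numTasks, int numNodes) {
            base = numTasks / numNodes;
            extra = numTasks % numNodes;
        }
        boolean canTake(int count) {
            return count < base || (count == base && extra > 0);
        }
        void take(int count) {
            if (count == base && extra > 0) extra--; // consume one "extra" slot
        }
    }

    // Assigns tasks in the given order; `excluded` maps task -> node that must not receive it.
    static Map<Integer, List<Integer>> assign(List<Integer> order, Map<Integer, Integer> prevOwner,
                                              Map<Integer, Integer> excluded) {
        Map<Integer, List<Integer>> result = new TreeMap<>();
        for (int n : NODES) result.put(n, new ArrayList<>());
        Quota quota = new Quota(order.size(), NODES.size());
        for (int task : order) {
            Integer banned = excluded.get(task);
            Integer prev = prevOwner.get(task);
            Integer chosen = null;
            if (prev != null && !prev.equals(banned) && quota.canTake(result.get(prev).size())) {
                chosen = prev; // sticky: go back to the previous owner
            }
            if (chosen == null) chosen = leastLoaded(result, banned, quota, true);
            if (chosen == null) chosen = leastLoaded(result, banned, quota, false); // over quota
            quota.take(result.get(chosen).size());
            result.get(chosen).add(task);
        }
        return result;
    }

    // Least-loaded eligible node; ties go to the lower node id.
    static Integer leastLoaded(Map<Integer, List<Integer>> result, Integer banned,
                               Quota quota, boolean respectQuota) {
        Integer best = null;
        for (int n : NODES) {
            if (Integer.valueOf(n).equals(banned)) continue;
            int count = result.get(n).size();
            if (respectQuota && !quota.canTake(count)) continue;
            if (best == null || count < result.get(best).size()) best = n;
        }
        return best;
    }

    static Map<Integer, List<Integer>> actives() {
        return assign(TASKS, PREV_ACTIVE, Map.of());
    }

    static Map<Integer, List<Integer>> standbys(boolean reverse) {
        Map<Integer, Integer> activeOwner = new HashMap<>();
        actives().forEach((node, ts) -> ts.forEach(t -> activeOwner.put(t, node)));
        List<Integer> order = new ArrayList<>(TASKS);
        if (reverse) Collections.reverse(order);
        return assign(order, PREV_STANDBY, activeOwner);
    }

    public static void main(String[] args) {
        System.out.println("active:            " + actives());
        System.out.println("standby (forward): " + standbys(false));
        System.out.println("standby (reverse): " + standbys(true));
    }
}
```

Under these assumptions, `main` prints `active: {1=[0, 1], 2=[2], 3=[3]}`, then `standby (forward): {1=[2, 3], 2=[0, 1], 3=[]}`, matching the problematic layout above, and `standby (reverse): {1=[3, 2], 2=[1], 3=[0]}`, where the new Node 3 picks up standby task 0.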