Copilot commented on code in PR #20486:
URL: https://github.com/apache/kafka/pull/20486#discussion_r2324686649


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -25,6 +25,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;

Review Comment:
   The LinkedList import is added, but nothing documents why LinkedList is 
preferred over the original Set<TaskId> for maintaining insertion order in 
range-style assignments.
   ```suggestion
   
   ```
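
For context on the Set-versus-LinkedList question, here is a minimal sketch (not the PR's code) of the two capabilities the changed method relies on. `java.util.LinkedList` implements both `List`, which provides `sort(Comparator)`, and `Deque`, which provides `descendingIterator()`; the `Set` interface declares neither. The class and method names below are hypothetical, and plain integers stand in for task ids.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

public class LinkedListCapabilitiesDemo {

    // Hypothetical helper: copies a set into a LinkedList, sorts it in place,
    // then walks it from tail to head.
    public static List<Integer> sortThenWalkBackwards(final Set<Integer> partitions) {
        final LinkedList<Integer> ordered = new LinkedList<>(partitions);

        // List.sort is available on LinkedList; the Set interface has no sort method.
        ordered.sort(Comparator.naturalOrder());

        // Deque.descendingIterator is available on LinkedList; the Set interface has no such method.
        final List<Integer> visited = new ArrayList<>();
        for (final Iterator<Integer> it = ordered.descendingIterator(); it.hasNext(); ) {
            visited.add(it.next());
        }
        return visited;
    }

    public static void main(final String[] args) {
        System.out.println(sortThenWalkBackwards(Set.of(2, 0, 1)));
    }
}
```

A one-line comment on the parameter stating which of these two capabilities is needed would likely address the documentation gap.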



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -296,9 +302,14 @@ private boolean hasUnfulfilledQuota(final Member member) {
         return localState.processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId) < localState.tasksPerMember;
     }
 
-    private void assignStandby(final Set<TaskId> standbyTasks, final int numStandbyReplicas) {
+    private void assignStandby(final LinkedList<TaskId> standbyTasks, int numStandbyReplicas) {
         final ArrayList<StandbyToAssign> toLeastLoaded = new ArrayList<>(standbyTasks.size() * numStandbyReplicas);
-        for (final TaskId task : standbyTasks) {
+
+        // Assuming our current assignment is range-based, we want to sort by partition first.
+        standbyTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId).reversed());
+
+        for (Iterator<TaskId> it = standbyTasks.descendingIterator(); it.hasNext(); ) {

Review Comment:
   The logic sorts standbyTasks in reverse order and then iterates with 
descendingIterator(), which effectively processes the tasks in ascending order 
(by partition, then subtopology). This double reversal is confusing and could 
be simplified by dropping .reversed() and using a regular iterator.
   ```suggestion
           standbyTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId));
   
           for (Iterator<TaskId> it = standbyTasks.iterator(); it.hasNext(); ) {
   ```
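
To illustrate the double reversal described above, here is a minimal sketch comparing the two iteration strategies. The `TaskId` record is a hypothetical stand-in that models only the two accessors used by the comparator, and the sample data is invented. The equivalence assumes no two tasks compare as equal, since the stable sort would otherwise visit ties in opposite orders.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

public class DoubleReversalDemo {

    // Hypothetical stand-in for the assignor's TaskId.
    public record TaskId(String subtopologyId, int partition) { }

    // Ascending by partition, then by subtopology id.
    public static final Comparator<TaskId> ASCENDING =
        Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId);

    // Invented sample data in arbitrary order.
    public static LinkedList<TaskId> sample() {
        return new LinkedList<>(List.of(
            new TaskId("0", 1), new TaskId("1", 0), new TaskId("0", 0), new TaskId("1", 1)));
    }

    // Variant from the diff: sort descending, then walk from tail to head.
    public static List<TaskId> reversedSortThenDescendingIterator() {
        final LinkedList<TaskId> tasks = sample();
        tasks.sort(ASCENDING.reversed());
        final List<TaskId> visited = new ArrayList<>();
        for (final Iterator<TaskId> it = tasks.descendingIterator(); it.hasNext(); ) {
            visited.add(it.next());
        }
        return visited;
    }

    // Variant from the suggestion: sort ascending, then walk from head to tail.
    public static List<TaskId> ascendingSortThenIterator() {
        final LinkedList<TaskId> tasks = sample();
        tasks.sort(ASCENDING);
        final List<TaskId> visited = new ArrayList<>();
        for (final Iterator<TaskId> it = tasks.iterator(); it.hasNext(); ) {
            visited.add(it.next());
        }
        return visited;
    }

    public static void main(final String[] args) {
        // Both variants visit the tasks in the same order.
        System.out.println(reversedSortThenDescendingIterator().equals(ascendingSortThenIterator()));
    }
}
```

If the loop body relies on the list's physical order, for example by removing elements through the iterator and reading the remainder later, the two variants leave the list in different states, so that would need checking before applying the simplification.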



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java:
##########
@@ -329,6 +340,10 @@ private void assignStandby(final Set<TaskId> standbyTasks, final int numStandbyR
             }
         }
 
+        // To achieve a range-based assignment, sort by subtopology
+        toLeastLoaded.sort(Comparator.<StandbyToAssign, String>comparing(x -> x.taskId.subtopologyId())
+            .thenComparing(x -> x.taskId.partition()).reversed());

Review Comment:
   The sorting logic for standby assignments uses .reversed(), which 
contradicts the goal of range-style assignment. Range-style assignment should 
process subtopologies and partitions in ascending order, not descending order.
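
As a minimal sketch of what the comparator in this hunk does: calling `.reversed()` at the end of a `comparing(...).thenComparing(...)` chain reverses the whole chain, so the list ends up descending by subtopology and, within a subtopology, descending by partition. The records below are hypothetical stand-ins (using accessor methods rather than the field access in the diff), and the sample data is invented. How the sorted list is consumed afterwards is not visible in the hunk, so this only demonstrates the resulting list order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ReversedChainDemo {

    // Hypothetical stand-ins for the assignor's types.
    public record TaskId(String subtopologyId, int partition) { }
    public record StandbyToAssign(TaskId taskId, int remainingStandbys) { }

    // Same comparator shape as in the diff, without the trailing reversed().
    public static final Comparator<StandbyToAssign> ASCENDING =
        Comparator.<StandbyToAssign, String>comparing(x -> x.taskId().subtopologyId())
            .thenComparing(x -> x.taskId().partition());

    // Invented sample data in arbitrary order.
    public static List<StandbyToAssign> sample() {
        return new ArrayList<>(List.of(
            new StandbyToAssign(new TaskId("0", 1), 1),
            new StandbyToAssign(new TaskId("1", 0), 1),
            new StandbyToAssign(new TaskId("0", 0), 1),
            new StandbyToAssign(new TaskId("1", 1), 1)));
    }

    // Returns the task ids in list order after sorting with the given comparator.
    public static List<TaskId> sortedTaskIds(final Comparator<StandbyToAssign> comparator) {
        final List<StandbyToAssign> list = sample();
        list.sort(comparator);
        final List<TaskId> ids = new ArrayList<>();
        for (final StandbyToAssign entry : list) {
            ids.add(entry.taskId());
        }
        return ids;
    }

    public static void main(final String[] args) {
        System.out.println("ascending:  " + sortedTaskIds(ASCENDING));
        System.out.println("reversed(): " + sortedTaskIds(ASCENDING.reversed()));
    }
}
```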



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
