squah-confluent commented on code in PR #17385:
URL: https://github.com/apache/kafka/pull/17385#discussion_r1799116772


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java:
##########
@@ -75,79 +84,153 @@ public class UniformHeterogeneousAssignmentBuilder {
     private final Set<Uuid> subscribedTopicIds;
 
     /**
-     * List of subscribed members for each topic.
+     * The list of members in the consumer group, sorted by member id.
      */
-    private final Map<Uuid, List<String>> membersPerTopic;
+    private final List<String> memberIds;
 
     /**
-     * The new assignment that will be returned.
+     * Maps member ids to their indices in the memberIds list.
      */
-    private final Map<String, MemberAssignment> targetAssignment;
+    private final Map<String, Integer> memberIndices;
 
     /**
-     * The partitions that still need to be assigned.
+     * List of subscribed members for each topic, in ascending order.
+     * <p/>
+     * Members are stored as integer indices into the memberIds array.
      */
-    private final Set<TopicIdPartition> unassignedPartitions;
+    private final Map<Uuid, List<Integer>> topicSubscribers;
 
     /**
-     * All the partitions that have been retained from the existing assignment.
+     * The new assignment that will be returned.
+     */
+    private final Map<String, MemberAssignment> targetAssignment;
+
+    /**
+     * The number of partitions each member is assigned in the new assignment.
+     * <p/>
+     * The indices are the same as the memberIds array. 
memberTargetAssignmentSizes[i] contains the
+     * number of partitions assigned to memberIds[i].
+     * <p/>
+     * We use a plain int[] array here for performance reasons. Looking up the 
number of partitions
+     * assigned to a given member is a very common operation. By using a plain 
int[] array, we can
+     * avoid:
+     * <ul>
+     *   <li>the overhead of boxing and unboxing integers.</li>
+     *   <li>the cost of HashMap lookups.</li>
+     *   <li>the cost of checking member ids for equality when there are 
HashMap collisions.</li>
+     * </ul>
      */
-    private final Set<TopicIdPartition> assignedStickyPartitions;
+    private final int[] memberTargetAssignmentSizes;
 
     /**
-     * Manages assignments to members based on their current assignment size 
and maximum allowed assignment size.
+     * The partitions that still need to be assigned.
      */
-    private final AssignmentManager assignmentManager;
+    private final Map<Uuid, List<Integer>> unassignedPartitions;
 
     /**
-     * List of all the members sorted by their respective assignment sizes.
+     * Orders members by their number of assigned partitions, ascending.
+     * <p/>
+     * Ties are broken by member index, ascending.
      */
-    private final TreeSet<String> sortedMembersByAssignmentSize;
+    private final Comparator<Integer> memberComparator;
 
     /**
-     * Tracks the owner of each partition in the target assignment.
+     * Tracks the owner of each partition in the previous assignment.
+     * <p/>
+     * Only assignments which would continue to be valid for the current set 
of members and
+     * subscriptions are present.
+     * <p/>
+     * Owners are represented as indices into the memberIds array.
+     * -1 indicates an unassigned partition, a previous owner that is no 
longer subscribed to the
+     * topic or a previous owner that is no longer part of the consumer group.
+     * <p/>
+     * Used for maximizing stickiness.
      */
-    private final Map<TopicIdPartition, String> 
partitionOwnerInTargetAssignment;
+    private final Map<Uuid, int[]> previousAssignmentPartitionOwners;
 
     /**
-     * Handles all operations related to partition movements during a 
reassignment for balancing the target assignment.
+     * Tracks the owner of each partition in the target assignment.
+     * <p/>
+     * Owners are represented as indices into the memberIds array.
+     * -1 indicates an unassigned partition.
      */
-    private final PartitionMovements partitionMovements;
+    private final Map<Uuid, int[]> targetAssignmentPartitionOwners;
 
     public UniformHeterogeneousAssignmentBuilder(GroupSpec groupSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
         this.groupSpec = groupSpec;
         this.subscribedTopicDescriber = subscribedTopicDescriber;
         this.subscribedTopicIds = new HashSet<>();
-        this.membersPerTopic = new HashMap<>();
+
+        // Number the members 0 to M - 1.
+        this.memberIds = new ArrayList<>(groupSpec.memberIds());
+        this.memberIds.sort(null);

Review Comment:
   Sorting the member ids is purely for determinism.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java:
##########
@@ -75,79 +84,153 @@ public class UniformHeterogeneousAssignmentBuilder {
     private final Set<Uuid> subscribedTopicIds;
 
     /**
-     * List of subscribed members for each topic.
+     * The list of members in the consumer group, sorted by member id.
      */
-    private final Map<Uuid, List<String>> membersPerTopic;
+    private final List<String> memberIds;
 
     /**
-     * The new assignment that will be returned.
+     * Maps member ids to their indices in the memberIds list.
      */
-    private final Map<String, MemberAssignment> targetAssignment;
+    private final Map<String, Integer> memberIndices;
 
     /**
-     * The partitions that still need to be assigned.
+     * List of subscribed members for each topic, in ascending order.
+     * <p/>
+     * Members are stored as integer indices into the memberIds array.
      */
-    private final Set<TopicIdPartition> unassignedPartitions;
+    private final Map<Uuid, List<Integer>> topicSubscribers;
 
     /**
-     * All the partitions that have been retained from the existing assignment.
+     * The new assignment that will be returned.
+     */
+    private final Map<String, MemberAssignment> targetAssignment;
+
+    /**
+     * The number of partitions each member is assigned in the new assignment.
+     * <p/>
+     * The indices are the same as the memberIds array. 
memberTargetAssignmentSizes[i] contains the
+     * number of partitions assigned to memberIds[i].
+     * <p/>
+     * We use a plain int[] array here for performance reasons. Looking up the 
number of partitions
+     * assigned to a given member is a very common operation. By using a plain 
int[] array, we can
+     * avoid:
+     * <ul>
+     *   <li>the overhead of boxing and unboxing integers.</li>
+     *   <li>the cost of HashMap lookups.</li>
+     *   <li>the cost of checking member ids for equality when there are 
HashMap collisions.</li>
+     * </ul>
      */
-    private final Set<TopicIdPartition> assignedStickyPartitions;
+    private final int[] memberTargetAssignmentSizes;
 
     /**
-     * Manages assignments to members based on their current assignment size 
and maximum allowed assignment size.
+     * The partitions that still need to be assigned.
      */
-    private final AssignmentManager assignmentManager;
+    private final Map<Uuid, List<Integer>> unassignedPartitions;
 
     /**
-     * List of all the members sorted by their respective assignment sizes.
+     * Orders members by their number of assigned partitions, ascending.
+     * <p/>
+     * Ties are broken by member index, ascending.
      */
-    private final TreeSet<String> sortedMembersByAssignmentSize;
+    private final Comparator<Integer> memberComparator;
 
     /**
-     * Tracks the owner of each partition in the target assignment.
+     * Tracks the owner of each partition in the previous assignment.
+     * <p/>
+     * Only assignments which would continue to be valid for the current set 
of members and
+     * subscriptions are present.
+     * <p/>
+     * Owners are represented as indices into the memberIds array.
+     * -1 indicates an unassigned partition, a previous owner that is no 
longer subscribed to the
+     * topic or a previous owner that is no longer part of the consumer group.
+     * <p/>
+     * Used for maximizing stickiness.
      */
-    private final Map<TopicIdPartition, String> 
partitionOwnerInTargetAssignment;
+    private final Map<Uuid, int[]> previousAssignmentPartitionOwners;
 
     /**
-     * Handles all operations related to partition movements during a 
reassignment for balancing the target assignment.
+     * Tracks the owner of each partition in the target assignment.
+     * <p/>
+     * Owners are represented as indices into the memberIds array.
+     * -1 indicates an unassigned partition.
      */
-    private final PartitionMovements partitionMovements;
+    private final Map<Uuid, int[]> targetAssignmentPartitionOwners;
 
     public UniformHeterogeneousAssignmentBuilder(GroupSpec groupSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
         this.groupSpec = groupSpec;
         this.subscribedTopicDescriber = subscribedTopicDescriber;
         this.subscribedTopicIds = new HashSet<>();
-        this.membersPerTopic = new HashMap<>();
+
+        // Number the members 0 to M - 1.
+        this.memberIds = new ArrayList<>(groupSpec.memberIds());
+        this.memberIds.sort(null);
+        this.memberIndices = new HashMap<>();
+        for (int memberIndex = 0; memberIndex < this.memberIds.size(); 
memberIndex++) {
+            memberIndices.put(memberIds.get(memberIndex), memberIndex);
+        }
+
+        this.topicSubscribers = new HashMap<>();
+
         this.targetAssignment = new HashMap<>();
-        groupSpec.memberIds().forEach(memberId -> {
-            
groupSpec.memberSubscription(memberId).subscribedTopicIds().forEach(topicId -> {
+        this.memberTargetAssignmentSizes = new int[this.memberIds.size()];
+
+        // Build set of all subscribed topics and sets of subscribers per 
topic.
+        for (int memberIndex = 0; memberIndex < this.memberIds.size(); 
memberIndex++) {
+            String memberId = this.memberIds.get(memberIndex);
+            for (Uuid topicId : 
groupSpec.memberSubscription(memberId).subscribedTopicIds()) {
                 // Check if the subscribed topic exists.
-                int partitionCount = 
subscribedTopicDescriber.numPartitions(topicId);
-                if (partitionCount == -1) {
+                int numPartitions = 
subscribedTopicDescriber.numPartitions(topicId);
+                if (numPartitions == -1) {
                     throw new PartitionAssignorException(
                         "Members are subscribed to topic " + topicId + " which 
doesn't exist in the topic metadata."
                     );
                 }
                 subscribedTopicIds.add(topicId);
-                membersPerTopic.computeIfAbsent(topicId, k -> new 
ArrayList<>()).add(memberId);
-            });
-            targetAssignment.put(memberId, new MemberAssignmentImpl(new 
HashMap<>()));
-        });
-        this.unassignedPartitions = topicIdPartitions(subscribedTopicIds, 
subscribedTopicDescriber);
-        this.assignedStickyPartitions = new HashSet<>();
-        this.assignmentManager = new 
AssignmentManager(this.subscribedTopicDescriber);
-        this.sortedMembersByAssignmentSize = 
assignmentManager.sortMembersByAssignmentSize(groupSpec.memberIds());
-        this.partitionOwnerInTargetAssignment = new HashMap<>();
-        this.partitionMovements = new PartitionMovements();
+                topicSubscribers.computeIfAbsent(topicId, k -> new 
ArrayList<>()).add(memberIndex);
+            }
+        }
+        this.unassignedPartitions = new HashMap<>();
+
+        this.memberComparator = new Comparator<Integer>() {
+            @Override
+            public int compare(final Integer memberIndex1, final Integer 
memberIndex2) {
+                // Order by number of assigned partitions, ascending.
+                int order = Integer.compare(
+                    memberTargetAssignmentSizes[memberIndex1],
+                    memberTargetAssignmentSizes[memberIndex2]
+                );
+
+                // Then order by member index, ascending. This is equivalent 
to ordering by
+                // member id, ascending, since memberIds is sorted.
+                if (order == 0) {
+                    order = memberIndex1.compareTo(memberIndex2);
+                }
+
+                return order;
+            }
+        };
+
+        // Initialize partition owners for the previous and target assignments.
+        this.previousAssignmentPartitionOwners = new HashMap<>((int) 
((this.subscribedTopicIds.size() / 0.75f) + 1));

Review Comment:
   `previousAssignmentPartitionOwners` is only used during the iterative 
rebalance phase, to restore partitions to their previous owners when doing so 
helps balance. To use `GroupSpec` instead, we'd need to update an existing 
`GroupSpec` method, or add a new one, to return the assigned memberId for a 
partition. Compared to the current approach, each query would then cost an extra 
Uuid map lookup and an extra memberId map lookup. I expect that to have a 
performance cost, but I have not measured it.
   
   I'm actually tempted to remove this field entirely. See my comments on 
`balanceTopicRestoringStickiness`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java:
##########
@@ -157,716 +240,779 @@ public GroupAssignment build() {
 
         assignStickyPartitions();
 
-        unassignedPartitionsAssignment();
+        computeUnassignedPartitions(unassignedPartitions);
+        unassignedPartitionsAssignment(unassignedPartitions);
 
         balance();
 
         return new GroupAssignment(targetAssignment);
     }
 
     /**
-     * <li> TopicIdPartitions are sorted in descending order based on the 
value:
-     *       totalPartitions/number of subscribed members. </li>
-     * <li> If the above value is the same then topicIdPartitions are sorted in
-     *      ascending order of number of subscribers. </li>
-     * <li> If both criteria are the same, sort in ascending order of the 
partition Id.
-     *      This last criteria is for predictability of the assignments. </li>
+     * Sorts topics based on the following criteria:
+     * <ol>
+     *   <li>Topics are sorted in descending order of totalPartitions / number 
of subscribers
+     *       (partitions per subscriber).</li>
+     *   <li>Ties are broken by sorting in ascending order of number of 
subscribers.</li>
+     *   <li>Any remaining ties are broken by sorting in ascending order of 
topic id.</li>
+     * </ol>
+     *
+     * The last criteria is for predictability of assignments.
      *
-     * @param topicIdPartitions       The topic partitions that need to be 
sorted.
-     * @return A list of sorted topic partitions.
+     * @param topicIds          The topic ids that need to be sorted.
+     * @return A list of sorted topic ids.
      */
-    private List<TopicIdPartition> 
sortTopicIdPartitions(Collection<TopicIdPartition> topicIdPartitions) {
-        Comparator<TopicIdPartition> comparator = Comparator
-            .comparingDouble((TopicIdPartition topicIdPartition) -> {
-                int totalPartitions = 
subscribedTopicDescriber.numPartitions(topicIdPartition.topicId());
-                int totalSubscribers = 
membersPerTopic.get(topicIdPartition.topicId()).size();
-                return (double) totalPartitions / totalSubscribers;
-            })
-            .reversed()
-            .thenComparingInt(topicIdPartition -> 
membersPerTopic.get(topicIdPartition.topicId()).size())
-            .thenComparingInt(TopicIdPartition::partitionId);
+    private List<Uuid> sortTopicIds(Collection<Uuid> topicIds) {
+        Comparator<Uuid> comparator = new Comparator<Uuid>() {
+            @Override
+            public int compare(final Uuid topic1Id, final Uuid topic2Id) {
+                int topic1PartitionCount = 
subscribedTopicDescriber.numPartitions(topic1Id);
+                int topic2PartitionCount = 
subscribedTopicDescriber.numPartitions(topic2Id);
+                int topic1SubscriberCount = 
topicSubscribers.get(topic1Id).size();
+                int topic2SubscriberCount = 
topicSubscribers.get(topic2Id).size();
+
+                // Order by partitions per subscriber, descending.
+                int order = Double.compare(
+                    (double) topic2PartitionCount / topic2SubscriberCount,
+                    (double) topic1PartitionCount / topic1SubscriberCount
+                );
+
+                // Then order by subscriber count, ascending.
+                if (order == 0) {
+                    order = Integer.compare(topic1SubscriberCount, 
topic2SubscriberCount);
+                }
 
-        return topicIdPartitions.stream()
-            .sorted(comparator)
-            .collect(Collectors.toList());
+                // Then order by topic id, ascending.
+                if (order == 0) {
+                    order = topic1Id.compareTo(topic2Id);
+                }
+
+                return order;
+            }
+        };
+
+        List<Uuid> sortedTopicIds = new ArrayList<>(topicIds);
+        sortedTopicIds.sort(comparator);
+        return sortedTopicIds;
     }
 
     /**
-     * Gets a set of partitions that are to be retained from the existing 
assignment. This includes:
-     * <li> Partitions from topics that are still present in both the new 
subscriptions and the topic metadata. </li>
+     * Imports the existing assignment into the target assignment, taking into 
account topic
+     * unsubscriptions.
      */
     private void assignStickyPartitions() {
-        groupSpec.memberIds().forEach(memberId ->
-            
groupSpec.memberAssignment(memberId).partitions().forEach((topicId, 
currentAssignment) -> {
+        for (String memberId : groupSpec.memberIds()) {
+            int memberIndex = memberIndices.get(memberId);
+
+            Map<Uuid, Set<Integer>> oldAssignment = 
groupSpec.memberAssignment(memberId).partitions();
+            Map<Uuid, Set<Integer>> newAssignment = null;
+
+            for (Map.Entry<Uuid, Set<Integer>> entry : 
oldAssignment.entrySet()) {
+                Uuid topicId = entry.getKey();
+                Set<Integer> partitions = entry.getValue();
+
                 if 
(groupSpec.memberSubscription(memberId).subscribedTopicIds().contains(topicId)) 
{
-                    currentAssignment.forEach(partition -> {
-                        TopicIdPartition topicIdPartition = new 
TopicIdPartition(topicId, partition);
-                        
assignmentManager.addPartitionToTargetAssignment(topicIdPartition, memberId);
-                        assignedStickyPartitions.add(topicIdPartition);
-                    });
+                    memberTargetAssignmentSizes[memberIndex] += 
partitions.size();
+
+                    int numPartitions = 
subscribedTopicDescriber.numPartitions(topicId);
+                    for (int partition : partitions) {
+                        if (partition >= numPartitions) {

Review Comment:
   I think we should remove this check, for consistency with the homogeneous case.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformHeterogeneousAssignmentBuilder.java:
##########
@@ -157,716 +240,779 @@ public GroupAssignment build() {
 
         assignStickyPartitions();
 
-        unassignedPartitionsAssignment();
+        computeUnassignedPartitions(unassignedPartitions);
+        unassignedPartitionsAssignment(unassignedPartitions);
 
         balance();
 
         return new GroupAssignment(targetAssignment);
     }
 
     /**
-     * <li> TopicIdPartitions are sorted in descending order based on the 
value:
-     *       totalPartitions/number of subscribed members. </li>
-     * <li> If the above value is the same then topicIdPartitions are sorted in
-     *      ascending order of number of subscribers. </li>
-     * <li> If both criteria are the same, sort in ascending order of the 
partition Id.
-     *      This last criteria is for predictability of the assignments. </li>
+     * Sorts topics based on the following criteria:
+     * <ol>
+     *   <li>Topics are sorted in descending order of totalPartitions / number 
of subscribers
+     *       (partitions per subscriber).</li>
+     *   <li>Ties are broken by sorting in ascending order of number of 
subscribers.</li>
+     *   <li>Any remaining ties are broken by sorting in ascending order of 
topic id.</li>
+     * </ol>
+     *
+     * The last criteria is for predictability of assignments.
      *
-     * @param topicIdPartitions       The topic partitions that need to be 
sorted.
-     * @return A list of sorted topic partitions.
+     * @param topicIds          The topic ids that need to be sorted.
+     * @return A list of sorted topic ids.
      */
-    private List<TopicIdPartition> 
sortTopicIdPartitions(Collection<TopicIdPartition> topicIdPartitions) {
-        Comparator<TopicIdPartition> comparator = Comparator
-            .comparingDouble((TopicIdPartition topicIdPartition) -> {
-                int totalPartitions = 
subscribedTopicDescriber.numPartitions(topicIdPartition.topicId());
-                int totalSubscribers = 
membersPerTopic.get(topicIdPartition.topicId()).size();
-                return (double) totalPartitions / totalSubscribers;
-            })
-            .reversed()
-            .thenComparingInt(topicIdPartition -> 
membersPerTopic.get(topicIdPartition.topicId()).size())
-            .thenComparingInt(TopicIdPartition::partitionId);
+    private List<Uuid> sortTopicIds(Collection<Uuid> topicIds) {
+        Comparator<Uuid> comparator = new Comparator<Uuid>() {
+            @Override
+            public int compare(final Uuid topic1Id, final Uuid topic2Id) {
+                int topic1PartitionCount = 
subscribedTopicDescriber.numPartitions(topic1Id);
+                int topic2PartitionCount = 
subscribedTopicDescriber.numPartitions(topic2Id);
+                int topic1SubscriberCount = 
topicSubscribers.get(topic1Id).size();
+                int topic2SubscriberCount = 
topicSubscribers.get(topic2Id).size();
+
+                // Order by partitions per subscriber, descending.
+                int order = Double.compare(
+                    (double) topic2PartitionCount / topic2SubscriberCount,
+                    (double) topic1PartitionCount / topic1SubscriberCount
+                );
+
+                // Then order by subscriber count, ascending.
+                if (order == 0) {
+                    order = Integer.compare(topic1SubscriberCount, 
topic2SubscriberCount);
+                }
 
-        return topicIdPartitions.stream()
-            .sorted(comparator)
-            .collect(Collectors.toList());
+                // Then order by topic id, ascending.
+                if (order == 0) {
+                    order = topic1Id.compareTo(topic2Id);
+                }
+
+                return order;
+            }
+        };
+
+        List<Uuid> sortedTopicIds = new ArrayList<>(topicIds);
+        sortedTopicIds.sort(comparator);
+        return sortedTopicIds;
     }
 
     /**
-     * Gets a set of partitions that are to be retained from the existing 
assignment. This includes:
-     * <li> Partitions from topics that are still present in both the new 
subscriptions and the topic metadata. </li>
+     * Imports the existing assignment into the target assignment, taking into 
account topic
+     * unsubscriptions.
      */
     private void assignStickyPartitions() {
-        groupSpec.memberIds().forEach(memberId ->
-            
groupSpec.memberAssignment(memberId).partitions().forEach((topicId, 
currentAssignment) -> {
+        for (String memberId : groupSpec.memberIds()) {
+            int memberIndex = memberIndices.get(memberId);
+
+            Map<Uuid, Set<Integer>> oldAssignment = 
groupSpec.memberAssignment(memberId).partitions();
+            Map<Uuid, Set<Integer>> newAssignment = null;
+
+            for (Map.Entry<Uuid, Set<Integer>> entry : 
oldAssignment.entrySet()) {
+                Uuid topicId = entry.getKey();
+                Set<Integer> partitions = entry.getValue();
+
                 if 
(groupSpec.memberSubscription(memberId).subscribedTopicIds().contains(topicId)) 
{
-                    currentAssignment.forEach(partition -> {
-                        TopicIdPartition topicIdPartition = new 
TopicIdPartition(topicId, partition);
-                        
assignmentManager.addPartitionToTargetAssignment(topicIdPartition, memberId);
-                        assignedStickyPartitions.add(topicIdPartition);
-                    });
+                    memberTargetAssignmentSizes[memberIndex] += 
partitions.size();
+
+                    int numPartitions = 
subscribedTopicDescriber.numPartitions(topicId);
+                    for (int partition : partitions) {
+                        if (partition >= numPartitions) {
+                            LOG.warn(
+                                "Previous assignment for {} contains {}:{}, 
but topic only has {} partitions",
+                                memberId,
+                                topicId,
+                                partition,
+                                numPartitions
+                            );
+
+                            if (newAssignment == null) {
+                                // If the new assignment is null, we create a 
deep copy of the
+                                // original assignment so that we can alter it.
+                                newAssignment = deepCopy(oldAssignment);
+                            }
+                            Set<Integer> newPartitions = 
newAssignment.get(topicId);
+                            newPartitions.remove(partition);
+                            if (newPartitions.isEmpty()) {
+                                newAssignment.remove(topicId);
+                            }
+                            memberTargetAssignmentSizes[memberIndex]--;
+
+                            continue;

Review Comment:
   Now that the check for reduced partition counts is gone, this code has been 
removed as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to