tedyu commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1666957266
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##########
@@ -65,191 +92,225 @@ public String name() {
}
/**
- * Pair of memberId and remaining partitions to meet the quota.
+ * Metadata for a topic including partition and subscription details.
*/
- private static class MemberWithRemainingAssignments {
+ private static class TopicMetadata {
+ private final Uuid topicId;
+ private final int numPartitions;
+ private int numMembers;
+ private int minQuota = -1;
+ private int extraPartitions = -1;
+ private int nextRange = 0;
+
/**
- * Member Id.
+ * Constructs a new TopicMetadata instance.
+ *
+ * @param topicId The topic Id.
+ * @param numPartitions The number of partitions.
+ * @param numMembers The number of subscribed members.
*/
- private final String memberId;
+ private TopicMetadata(Uuid topicId, int numPartitions, int numMembers)
{
+ this.topicId = topicId;
+ this.numPartitions = numPartitions;
+ this.numMembers = numMembers;
+ }
/**
- * Number of partitions required to meet the assignment quota.
+ * Computes the minimum partition quota per member and the extra
partitions, if not already computed.
*/
- private final int remaining;
+ private void maybeComputeQuota() {
+ if (minQuota != -1) return;
- public MemberWithRemainingAssignments(String memberId, int remaining) {
- this.memberId = memberId;
- this.remaining = remaining;
+ // The minimum number of partitions each member should receive for
a balanced assignment.
+ minQuota = numPartitions / numMembers;
+
+ // Extra partitions to be distributed one to each member.
+ extraPartitions = numPartitions % numMembers;
+ }
+
+ @Override
+ public String toString() {
+ return "TopicMetadata(topicId=" + topicId +
+ ", numPartitions=" + numPartitions +
+ ", numMembers=" + numMembers +
+ ", minQuota=" + minQuota +
+ ", extraPartitions=" + extraPartitions +
+ ", nextRange=" + nextRange +
+ ')';
}
}
/**
- * Returns a map of topic Ids to a list of members subscribed to them,
- * based on the given assignment specification and metadata.
- *
- * @param groupSpec The specification required for
group assignments.
- * @param subscribedTopicDescriber The metadata describer for
subscribed topics and clusters.
- * @return A map of topic Ids to a list of member Ids subscribed to them.
- *
- * @throws PartitionAssignorException If a member is subscribed to a
non-existent topic.
+ * Assigns partitions to members of a homogeneous group. All members are
subscribed to the same set of topics.
+ * Assignment will be co-partitioned when all the topics have an equal
number of partitions.
*/
- private Map<Uuid, Collection<String>> membersPerTopic(
- final GroupSpec groupSpec,
- final SubscribedTopicDescriber subscribedTopicDescriber
- ) {
- Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>();
-
- if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
- Collection<String> allMembers = groupSpec.memberIds();
- Collection<Uuid> topics =
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
- .subscribedTopicIds();
-
- for (Uuid topicId : topics) {
- if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
- throw new PartitionAssignorException("Member is subscribed
to a non-existent topic");
- }
- membersPerTopic.put(topicId, allMembers);
+ private GroupAssignment assignHomogeneousGroup(
+ GroupSpec groupSpec,
+ SubscribedTopicDescriber subscribedTopicDescriber
+ ) throws PartitionAssignorException {
+ List<String> memberIds = sortMemberIds(groupSpec);
Review Comment:
It seems this sorting can be delayed. An exception may be thrown on line 159,
in which case the work spent on sorting would be wasted.
The sorting can be done after the loop starting on line 157 finishes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]