dajac commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1662897722
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##########
@@ -65,191 +94,233 @@ public String name() {
}
/**
- * Pair of memberId and remaining partitions to meet the quota.
+ * Metadata for a topic including partition and subscription details.
*/
- private static class MemberWithRemainingAssignments {
+ private static class TopicMetadata {
+ public final Uuid topicId;
+ public final int numPartitions;
+ public int numMembers;
+
+ public int minQuota = -1;
+ public int extraPartitions = -1;
+ public int nextRange = 0;
+
+ /**
+ * Constructs a new TopicMetadata instance.
+ *
+ * @param topicId The topic Id.
+ * @param numPartitions The number of partitions.
+ * @param numMembers The number of subscribed members.
+ */
+ private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) {
+ this.topicId = topicId;
+ this.numPartitions = numPartitions;
+ this.numMembers = numMembers;
+ }
+
/**
- * Member Id.
+ * Factory method to create a TopicMetadata instance.
+ *
+ * @param topicId The topic Id.
+ * @param numPartitions The number of partitions.
+ * @param numMembers The number of subscribed members.
+ * @return A new TopicMetadata instance.
*/
- private final String memberId;
+ public static TopicMetadata create(Uuid topicId, int numPartitions, int numMembers) {
+ return new TopicMetadata(topicId, numPartitions, numMembers);
+ }
/**
- * Number of partitions required to meet the assignment quota.
+ * Computes the minimum partition quota per member and the extra partitions, if not already computed.
*/
- private final int remaining;
+ void maybeComputeQuota() {
+ // The minimum number of partitions each member should receive for a balanced assignment.
+ if (minQuota != -1) return;
+ minQuota = numPartitions / numMembers;
+
+ // Extra partitions to be distributed one to each member.
+ extraPartitions = numPartitions % numMembers;
+ }
- public MemberWithRemainingAssignments(String memberId, int remaining) {
- this.memberId = memberId;
- this.remaining = remaining;
+ @Override
+ public String toString() {
+ return "TopicMetadata{" +
+ "topicId=" + topicId +
+ ", numPartitions=" + numPartitions +
+ ", numMembers=" + numMembers +
+ ", minQuota=" + minQuota +
+ ", extraPartitions=" + extraPartitions +
+ ", nextRange=" + nextRange +
+ '}';
}
}
/**
- * Returns a map of topic Ids to a list of members subscribed to them,
- * based on the given assignment specification and metadata.
- *
- * @param groupSpec The specification required for
group assignments.
- * @param subscribedTopicDescriber The metadata describer for
subscribed topics and clusters.
- * @return A map of topic Ids to a list of member Ids subscribed to them.
- *
- * @throws PartitionAssignorException If a member is subscribed to a
non-existent topic.
+ * Assigns partitions to members of a homogeneous group. All members are subscribed to the same set of topics.
+ * Assignment will be co-partitioned when all the topics have an equal number of partitions.
*/
- private Map<Uuid, Collection<String>> membersPerTopic(
- final GroupSpec groupSpec,
- final SubscribedTopicDescriber subscribedTopicDescriber
- ) {
- Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>();
-
- if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
- Collection<String> allMembers = groupSpec.memberIds();
- Collection<Uuid> topics =
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
- .subscribedTopicIds();
-
- for (Uuid topicId : topics) {
- if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
- throw new PartitionAssignorException("Member is subscribed
to a non-existent topic");
- }
- membersPerTopic.put(topicId, allMembers);
+ private GroupAssignment assignHomogeneousGroup(
+ GroupSpec groupSpec,
+ SubscribedTopicDescriber subscribedTopicDescriber
+ ) throws PartitionAssignorException {
+ List<String> memberIds = sortMemberIds(groupSpec);
+
+ MemberSubscription subs = groupSpec.memberSubscription(memberIds.get(0));
+ Set<Uuid> subscribedTopics = new HashSet<>(subs.subscribedTopicIds());
+ List<TopicMetadata> topics = new ArrayList<>(subscribedTopics.size());
+ int numMembers = groupSpec.memberIds().size();
+
+ for (Uuid topicId : subscribedTopics) {
+ int numPartitions = subscribedTopicDescriber.numPartitions(topicId);
+ if (numPartitions == -1) {
+ throw new PartitionAssignorException("Member is subscribed to a non-existent topic");
}
- } else {
- groupSpec.memberIds().forEach(memberId -> {
- Collection<Uuid> topics =
groupSpec.memberSubscription(memberId).subscribedTopicIds();
- for (Uuid topicId : topics) {
- if (subscribedTopicDescriber.numPartitions(topicId) == -1)
{
- throw new PartitionAssignorException("Member is
subscribed to a non-existent topic");
- }
- membersPerTopic
- .computeIfAbsent(topicId, k -> new ArrayList<>())
- .add(memberId);
- }
- });
+ TopicMetadata m = TopicMetadata.create(
+ topicId,
+ numPartitions,
+ numMembers
+ );
+ topics.add(m);
}
- return membersPerTopic;
+ Map<String, MemberAssignment> assignments = new HashMap<>((int) ((groupSpec.memberIds().size() / 0.75f) + 1));
+
+ for (String memberId : memberIds) {
+ Map<Uuid, Set<Integer>> assignment = new HashMap<>((int) ((subscribedTopics.size() / 0.75f) + 1));
+ for (TopicMetadata topicMetadata : topics) {
+ topicMetadata.maybeComputeQuota();
+ addPartitionsToAssignment(topicMetadata, assignment);
+ }
+ assignments.put(memberId, new MemberAssignmentImpl(assignment));
+ }
+
+ return new GroupAssignment(assignments);
}
/**
- * The algorithm includes the following steps:
- * <ol>
- * <li> Generate a map of members per topic using the given member
subscriptions. </li>
- * <li> Generate a list of members called potentially unfilled
members, which consists of members that have not
- * met the minimum required quota of partitions for the
assignment AND get a list called assigned sticky
- * partitions for topic, which has the partitions that will be
retained in the new assignment. </li>
- * <li> Generate a list of unassigned partitions by calculating the
difference between the total partitions
- * for the topic and the assigned (sticky) partitions. </li>
- * <li> Find members from the potentially unfilled members list that
haven't met the total required quota
- * i.e. minRequiredQuota + 1, if the member is designated to
receive one of the excess partitions OR
- * minRequiredQuota otherwise. </li>
- * <li> Assign partitions to them in ranges from the unassigned
partitions per topic
- * based on the remaining partitions value. </li>
- * </ol>
+ * Assigns partitions to members of a heterogeneous group. Not all members are subscribed to the same topics.
*/
- @Override
- public GroupAssignment assign(
- final GroupSpec groupSpec,
- final SubscribedTopicDescriber subscribedTopicDescriber
+ private GroupAssignment assignHeterogeneousGroup(
+ GroupSpec groupSpec,
+ SubscribedTopicDescriber subscribedTopicDescriber
) throws PartitionAssignorException {
- Map<String, MemberAssignment> newTargetAssignment = new HashMap<>();
-
- // Step 1
- Map<Uuid, Collection<String>> membersPerTopic = membersPerTopic(
- groupSpec,
- subscribedTopicDescriber
- );
-
- membersPerTopic.forEach((topicId, membersForTopic) -> {
- int numPartitionsForTopic =
subscribedTopicDescriber.numPartitions(topicId);
- int minRequiredQuota = numPartitionsForTopic /
membersForTopic.size();
- // Each member can get only ONE extra partition per topic after
receiving the minimum quota.
- int numMembersWithExtraPartition = numPartitionsForTopic %
membersForTopic.size();
-
- // Step 2
- Set<Integer> assignedStickyPartitionsForTopic = new HashSet<>();
- List<MemberWithRemainingAssignments> potentiallyUnfilledMembers =
new ArrayList<>();
-
- for (String memberId : membersForTopic) {
- Set<Integer> assignedPartitionsForTopic = groupSpec
- .memberAssignment(memberId)
- .partitions()
- .getOrDefault(topicId, Collections.emptySet());
-
- int currentAssignmentSize = assignedPartitionsForTopic.size();
- List<Integer> currentAssignmentListForTopic = new
ArrayList<>(assignedPartitionsForTopic);
-
- // If there were partitions from this topic that were
previously assigned to this member, retain as many as possible.
- // Sort the current assignment in ascending order since we
want the same partition numbers from each topic
- // to go to the same member, in order to facilitate joins in
case of co-partitioned topics.
- if (currentAssignmentSize > 0) {
- int retainedPartitionsCount = min(currentAssignmentSize,
minRequiredQuota);
- Collections.sort(currentAssignmentListForTopic);
- for (int i = 0; i < retainedPartitionsCount; i++) {
- assignedStickyPartitionsForTopic
- .add(currentAssignmentListForTopic.get(i));
- newTargetAssignment.computeIfAbsent(memberId, k -> new
MemberAssignmentImpl(new HashMap<>()))
- .partitions()
- .computeIfAbsent(topicId, k -> new HashSet<>())
- .add(currentAssignmentListForTopic.get(i));
+ List<String> memberIds = sortMemberIds(groupSpec);
+
+ Map<Uuid, TopicMetadata> topics = new HashMap<>();
+
+ for (String memberId : memberIds) {
+ MemberSubscription subs = groupSpec.memberSubscription(memberId);
+ for (Uuid topicId : subs.subscribedTopicIds()) {
+ TopicMetadata topicMetadata = topics.computeIfAbsent(topicId, __ -> {
+ int numPartitions = subscribedTopicDescriber.numPartitions(topicId);
+ if (numPartitions == -1) {
+ throw new PartitionAssignorException("Member is subscribed to a non-existent topic");
}
- }
-
- // Number of partitions required to meet the minRequiredQuota.
- // There are 3 cases w.r.t the value of remaining:
- // 1) remaining < 0: this means that the member has more than
the min required amount.
- // 2) If remaining = 0: member has the minimum required
partitions, but it may get an extra partition, so it is a potentially unfilled
member.
- // 3) If remaining > 0: member doesn't have the minimum
required partitions, so it should be added to potentiallyUnfilledMembers.
- int remaining = minRequiredQuota - currentAssignmentSize;
-
- // Retain extra partitions as well when applicable.
- if (remaining < 0 && numMembersWithExtraPartition > 0) {
- numMembersWithExtraPartition--;
- // Since we already added the minimumRequiredQuota of
partitions in the previous step (until minReq - 1), we just need to
- // add the extra partition that will be present at the
index right after min quota was satisfied.
- assignedStickyPartitionsForTopic
-
.add(currentAssignmentListForTopic.get(minRequiredQuota));
- newTargetAssignment.computeIfAbsent(memberId, k -> new
MemberAssignmentImpl(new HashMap<>()))
- .partitions()
- .computeIfAbsent(topicId, k -> new HashSet<>())
-
.add(currentAssignmentListForTopic.get(minRequiredQuota));
- } else {
- MemberWithRemainingAssignments newPair = new
MemberWithRemainingAssignments(memberId, remaining);
- potentiallyUnfilledMembers.add(newPair);
- }
+
+ return TopicMetadata.create(
+ topicId,
+ numPartitions,
+ 0
+ );
+ });
+ topicMetadata.numMembers++;
}
+ }
- // Step 3
- // Find the difference between the total partitions per topic and
the already assigned sticky partitions for the topic to get the unassigned
partitions.
- // List of unassigned partitions for topic contains the partitions
in ascending order.
- List<Integer> unassignedPartitionsForTopic = new ArrayList<>();
- for (int i = 0; i < numPartitionsForTopic; i++) {
- if (!assignedStickyPartitionsForTopic.contains(i)) {
- unassignedPartitionsForTopic.add(i);
- }
+ Map<String, MemberAssignment> assignments = new HashMap<>((int) ((groupSpec.memberIds().size() / 0.75f) + 1));
+
+ for (String memberId : memberIds) {
+ MemberSubscription subs = groupSpec.memberSubscription(memberId);
+ Map<Uuid, Set<Integer>> assignment = new HashMap<>(subs.subscribedTopicIds().size());
+ for (Uuid topicId : subs.subscribedTopicIds()) {
+ TopicMetadata metadata = topics.get(topicId);
+ metadata.maybeComputeQuota();
+ addPartitionsToAssignment(metadata, assignment);
}
+ assignments.put(memberId, new MemberAssignmentImpl(assignment));
+ }
+
+ return new GroupAssignment(assignments);
+ }
- // Step 4 and Step 5
- // Account for the extra partitions if necessary and increase the
required quota by 1.
- // If remaining > 0 after increasing the required quota, assign
the remaining number of partitions from the unassigned partitions list.
- int unassignedPartitionsListStartPointer = 0;
- for (MemberWithRemainingAssignments pair :
potentiallyUnfilledMembers) {
- String memberId = pair.memberId;
- int remaining = pair.remaining;
- if (numMembersWithExtraPartition > 0) {
- remaining++;
- numMembersWithExtraPartition--;
- }
- if (remaining > 0) {
- List<Integer> partitionsToAssign =
unassignedPartitionsForTopic
- .subList(unassignedPartitionsListStartPointer,
unassignedPartitionsListStartPointer + remaining);
- unassignedPartitionsListStartPointer += remaining;
- newTargetAssignment.computeIfAbsent(memberId, k -> new
MemberAssignmentImpl(new HashMap<>()))
- .partitions()
- .computeIfAbsent(topicId, k -> new HashSet<>())
- .addAll(partitionsToAssign);
- }
+ /**
+ * Sorts the member Ids in the group based on their instance Id if present, otherwise by member Id.
+ * This is done to ensure that the relative ordering of members doesn't change with static members
+ * thus resulting in a sticky assignment.
+ *
+ * @param groupSpec The group specification containing the member information.
+ * @return a sorted list of member Ids.
+ */
+ private List<String> sortMemberIds(
+ GroupSpec groupSpec
+ ) {
+ List<String> sortedMemberIds = new ArrayList<>(groupSpec.memberIds());
+ Map<String, Optional<String>> instanceIdCache = new HashMap<>();
+
+ for (String memberId : sortedMemberIds) {
+ instanceIdCache.put(memberId, groupSpec.memberSubscription(memberId).instanceId());
+ }
Review Comment:
OK. Let’s add a short comment explaining the reason, then, because it is
not obvious.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]