rreddy-22 commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1619917677
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##########
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws
PartitionAssignorException {
}
}
- // The minimum required quota that each member needs to meet for a
balanced assignment.
- // This is the same for all members.
- final int numberOfMembers = groupSpec.members().size();
- final int minQuota = totalPartitionsCount / numberOfMembers;
+ // Compute the minimum required quota per member and the number of
members
+ // who should receive an extra partition.
+ int numberOfMembers = groupSpec.members().size();
+ minimumMemberQuota = totalPartitionsCount / numberOfMembers;
remainingMembersToGetAnExtraPartition = totalPartitionsCount %
numberOfMembers;
- groupSpec.members().keySet().forEach(memberId ->
- targetAssignment.put(memberId, new MemberAssignment(new
HashMap<>())
- ));
-
- potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
- unassignedPartitionsRoundRobinAssignment();
+ // Revoke the partitions which are either not part of the
subscriptions or above
+ // the maximum quota.
+ maybeRevokePartitions();
- if (!unassignedPartitions.isEmpty()) {
- throw new PartitionAssignorException("Partitions were left
unassigned");
- }
+ // Assign the unassigned partitions to the members with space.
+ assignRemainingPartitions();
return new GroupAssignment(targetAssignment);
}
- /**
- * Retains a set of partitions from the existing assignment and includes
them in the target assignment.
- * Only relevant partitions that exist in the current topic metadata and
subscriptions are considered.
- *
- * <p> For each member:
- * <ol>
- * <li> Find the valid current assignment considering topic
subscriptions and metadata</li>
- * <li> If the current assignment exists, retain partitions up to the
minimum quota.</li>
- * <li> If the current assignment size is greater than the minimum
quota and
- * there are members that could get an extra partition, assign
the next partition as well.</li>
- * <li> Finally, if the member's current assignment size is less than
the minimum quota,
- * add them to the potentially unfilled members map and track the
number of remaining
- * partitions required to meet the quota.</li>
- * </ol>
- * </p>
- *
- * @return Members mapped to the remaining number of partitions needed to
meet the minimum quota,
- * including members that are eligible to receive an extra
partition.
- */
- private Map<String, Integer> assignStickyPartitions(int minQuota) {
- Map<String, Integer> potentiallyUnfilledMembers = new HashMap<>();
-
- groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
- List<TopicIdPartition> validCurrentMemberAssignment =
validCurrentMemberAssignment(
- assignmentMemberSpec.assignedPartitions()
- );
-
- int currentAssignmentSize = validCurrentMemberAssignment.size();
- // Number of partitions required to meet the minimum quota.
- int remaining = minQuota - currentAssignmentSize;
-
- if (currentAssignmentSize > 0) {
- int retainedPartitionsCount = min(currentAssignmentSize,
minQuota);
- IntStream.range(0, retainedPartitionsCount).forEach(i -> {
- TopicIdPartition topicIdPartition =
validCurrentMemberAssignment.get(i);
- addPartitionToAssignment(
- targetAssignment,
- memberId,
- topicIdPartition.topicId(),
- topicIdPartition.partitionId()
- );
- });
-
- if (remaining < 0) {
- // The extra partition is located at the last index from
the previous step.
- if (remainingMembersToGetAnExtraPartition > 0) {
- TopicIdPartition topicIdPartition =
validCurrentMemberAssignment.get(retainedPartitionsCount++);
- addPartitionToAssignment(
- targetAssignment,
- memberId,
- topicIdPartition.topicId(),
- topicIdPartition.partitionId()
- );
- remainingMembersToGetAnExtraPartition--;
+ private void maybeRevokePartitions() {
+ for (Map.Entry<String, AssignmentMemberSpec> entry :
groupSpec.members().entrySet()) {
+ String memberId = entry.getKey();
+ AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+ Map<Uuid, Set<Integer>> oldAssignment =
assignmentMemberSpec.assignedPartitions();
+ Map<Uuid, Set<Integer>> newAssignment = null;
+
+ // The assignor expects to receive the assignment as an immutable
map. It leverages
+ // this knowledge in order to avoid having to copy all assignments.
+ if (!isImmutableMap(oldAssignment)) {
+ throw new IllegalStateException("The assignor expect an
immutable map.");
+ }
+
+ int quota = minimumMemberQuota;
+ if (remainingMembersToGetAnExtraPartition > 0) {
+ quota++;
+ remainingMembersToGetAnExtraPartition--;
+ }
+
+ for (Map.Entry<Uuid, Set<Integer>> topicPartitions :
oldAssignment.entrySet()) {
+ Uuid topicId = topicPartitions.getKey();
+ Set<Integer> partitions = topicPartitions.getValue();
+
+ if (subscribedTopicIds.contains(topicId)) {
+ if (partitions.size() <= quota) {
+ quota -= partitions.size();
+ } else {
+ for (Integer partition : partitions) {
Review Comment:
Yep, it might make the results unpredictable, in which case we probably
can't/shouldn't do a 1:1 mapping in the tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]