ableegoldman commented on a change in pull request #10509: URL: https://github.com/apache/kafka/pull/10509#discussion_r627020382
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ########## @@ -149,141 +149,210 @@ private boolean allSubscriptionsEqual(Set<String> allTopics, * This constrainedAssign optimizes the assignment algorithm when all consumers were subscribed to same set of topics. * The method includes the following steps: * - * 1. Reassign as many previously owned partitions as possible, up to the maxQuota - * 2. Fill remaining members up to minQuota - * 3. If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions - * from the over-full consumers at max capacity - * 4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we - * should just distribute one partition each to all consumers at min capacity + * 1. Reassign previously owned partitions: + * a. if owned less than minQuota partitions, just assign all owned partitions, and put the member into unfilled member list + * b. if owned maxQuota or more, and we're still under the number of expected max capacity members, assign maxQuota partitions + * c. if owned at least "minQuota" of partitions, assign minQuota partitions, and put the member into unfilled member list if + * we're still under the number of expected max capacity members + * 2. Fill remaining members up to the expected numbers of maxQuota partitions, otherwise, to minQuota partitions * * @param partitionsPerTopic The number of partitions for each subscribed topic * @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions * - * @return Map from each member to the list of partitions assigned to them. + * @return Map from each member to the list of partitions assigned to them. */ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic, Map<String, List<TopicPartition>> consumerToOwnedPartitions) { - SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic); + if (log.isDebugEnabled()) { + log.debug("performing constrained assign. partitionsPerTopic: {}, consumerToOwnedPartitions: {}", + partitionsPerTopic, consumerToOwnedPartitions); + } Set<TopicPartition> allRevokedPartitions = new HashSet<>(); - // Each consumer should end up in exactly one of the below - // the consumers not yet at capacity + // the consumers not yet at expected capacity List<String> unfilledMembers = new LinkedList<>(); - // the members with exactly maxQuota partitions assigned - Queue<String> maxCapacityMembers = new LinkedList<>(); - // the members with exactly minQuota partitions assigned - Queue<String> minCapacityMembers = new LinkedList<>(); int numberOfConsumers = consumerToOwnedPartitions.size(); - int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers); - int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers); + int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum); + + int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers); + int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers); + // the expected number of members with maxQuota assignment + int expectedNumMembersHavingMorePartitions = totalPartitionsCount % numberOfConsumers; + // the number of members with exactly maxQuota partitions assigned + int numMembersHavingMorePartitions = 0; - // initialize the assignment map with an empty array of size minQuota for all members + // initialize the assignment map with an empty array of size maxQuota for all members Map<String, List<TopicPartition>> assignment = new HashMap<>( - consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota)))); + consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota)))); + List<TopicPartition> assignedPartitions = new ArrayList<>(); // Reassign as many previously owned partitions as possible for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) { String consumer = consumerEntry.getKey(); List<TopicPartition> ownedPartitions = consumerEntry.getValue(); List<TopicPartition> consumerAssignment = assignment.get(consumer); - int i = 0; - // assign the first N partitions up to the max quota, and mark the remaining as being revoked - for (TopicPartition tp : ownedPartitions) { - if (i < maxQuota) { - consumerAssignment.add(tp); - unassignedPartitions.remove(tp); - } else { - allRevokedPartitions.add(tp); - } - ++i; - } if (ownedPartitions.size() < minQuota) { + // the expected assignment size is more than consumer have now, so keep all the owned partitions + // and put this member into unfilled member list + if (ownedPartitions.size() > 0) { + consumerAssignment.addAll(ownedPartitions); + assignedPartitions.addAll(ownedPartitions); + } unfilledMembers.add(consumer); + } else if (ownedPartitions.size() >= maxQuota && numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions) { + // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected max capacity members + // so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions + numMembersHavingMorePartitions++; + List<TopicPartition> maxQuotaPartitions = ownedPartitions.subList(0, maxQuota); + consumerAssignment.addAll(maxQuotaPartitions); + assignedPartitions.addAll(maxQuotaPartitions); + allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size())); } else { - // It's possible for a consumer to be at both min and max capacity if minQuota == maxQuota - if (consumerAssignment.size() == minQuota) - minCapacityMembers.add(consumer); - if (consumerAssignment.size() == maxQuota) - maxCapacityMembers.add(consumer); + // consumer owned at least "minQuota" of partitions + // so keep "minQuota" of the owned partitions, and revoke the rest of the partitions + List<TopicPartition> minQuotaPartitions = ownedPartitions.subList(0, minQuota); + consumerAssignment.addAll(minQuotaPartitions); + assignedPartitions.addAll(minQuotaPartitions); + allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size())); + // this consumer is potential maxQuota candidate since we're still under the number of expected max capacity members + if (numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions) { + unfilledMembers.add(consumer); + } } } + List<TopicPartition> unassignedPartitions = getUnassignedPartitions(totalPartitionsCount, partitionsPerTopic, assignedPartitions); + assignedPartitions = null; + + if (log.isDebugEnabled()) { + log.debug("After reassigning previously owned partitions, unfilled members: {}, unassigned partitions: {}, " + + "current assignment: {}", unfilledMembers, unassignedPartitions, assignment); + } + Collections.sort(unfilledMembers); - Iterator<TopicPartition> unassignedPartitionsIter = unassignedPartitions.iterator(); - - // Fill remaining members up to minQuota - while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) { - Iterator<String> unfilledConsumerIter = unfilledMembers.iterator(); - - while (unfilledConsumerIter.hasNext()) { - String consumer = unfilledConsumerIter.next(); - List<TopicPartition> consumerAssignment = assignment.get(consumer); - - if (unassignedPartitionsIter.hasNext()) { - TopicPartition tp = unassignedPartitionsIter.next(); - consumerAssignment.add(tp); - unassignedPartitionsIter.remove(); - // We already assigned all possible ownedPartitions, so we know this must be newly to this consumer - if (allRevokedPartitions.contains(tp)) - partitionsTransferringOwnership.put(tp, consumer); - } else { - break; - } - if (consumerAssignment.size() == minQuota) { - minCapacityMembers.add(consumer); - unfilledConsumerIter.remove(); + Iterator<String> unfilledConsumerIter = unfilledMembers.iterator(); + // Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota + for (TopicPartition unassignedPartition : unassignedPartitions) { + if (!unfilledConsumerIter.hasNext()) { + if (unfilledMembers.isEmpty()) { + // Should not enter here since we have calculated the exact number to assign to each consumer + // There might be issues in the assigning algorithm, or maybe assigning the same partition to two owners. + throw new IllegalStateException("No more unfilled consumers to be assigned."); } + unfilledConsumerIter = unfilledMembers.iterator(); } - } - - // If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions - // from the over-full consumers at max capacity - for (String consumer : unfilledMembers) { + String consumer = unfilledConsumerIter.next(); List<TopicPartition> consumerAssignment = assignment.get(consumer); - int remainingCapacity = minQuota - consumerAssignment.size(); - while (remainingCapacity > 0) { - String overloadedConsumer = maxCapacityMembers.poll(); - if (overloadedConsumer == null) { - throw new IllegalStateException("Some consumers are under capacity but all partitions have been assigned"); + consumerAssignment.add(unassignedPartition); + + // We already assigned all possible ownedPartitions, so we know this must be newly assigned to this consumer + if (allRevokedPartitions.contains(unassignedPartition)) + partitionsTransferringOwnership.put(unassignedPartition, consumer); + + int currentAssignedCount = consumerAssignment.size(); + int expectedAssignedCount = numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions ? maxQuota : minQuota; + if (currentAssignedCount == expectedAssignedCount) { + if (currentAssignedCount == maxQuota) { + numMembersHavingMorePartitions++; } - TopicPartition swappedPartition = assignment.get(overloadedConsumer).remove(0); - consumerAssignment.add(swappedPartition); - --remainingCapacity; - // This partition is by definition transferring ownership, the swapped partition must have come from - // the max capacity member's owned partitions since it can only reach max capacity with owned partitions - partitionsTransferringOwnership.put(swappedPartition, consumer); + unfilledConsumerIter.remove(); } - minCapacityMembers.add(consumer); } - // Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we - // should just distribute one partition each to all consumers at min capacity - for (TopicPartition unassignedPartition : unassignedPartitions) { - String underCapacityConsumer = minCapacityMembers.poll(); - if (underCapacityConsumer == null) { - throw new IllegalStateException("Some partitions are unassigned but all consumers are at maximum capacity"); + if (!unfilledMembers.isEmpty()) { + // we expected all the remaining unfilled members have minQuota partitions and we're already at the allowed number + // of max capacity members. Otherwise, there must be error here. + if (numMembersHavingMorePartitions != expectedNumMembersHavingMorePartitions) { + throw new IllegalStateException(String.format("We haven't reached the allowed number of max capacity members, " + Review comment: nit: same here, can you log an error with the remaining `unfilledMembers`? I know you already do that in the exception message, but imo it would be better to print in a log message instead of an exception, as it may be long -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org