ableegoldman commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r621502594
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########

@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
+            } else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {

Review comment:
nit: don't use `++` inline like this, it makes the code harder to understand. Plus, we'll end up incrementing it past `numExpectedMaxCapacityMembers`, so its value won't represent the actual number of members at max capacity in the end. That could be confusing, e.g. if we want to add logging that includes this value later on. Let's increment it in the body of this `else if`.
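To illustrate why the inline `++` is a problem, here's a standalone toy sketch (not the assignor code; the helper methods and numbers are made up purely for illustration): with the inline post-increment the counter keeps getting bumped even after the branch stops being taken, so it no longer reflects how many members actually received a `maxQuota` assignment.

```java
import java.util.List;

public class InlineIncrementSketch {

    // With the inline post-increment, the counter is bumped for every member that owns >= maxQuota,
    // even when the branch is NOT taken, so it can end up larger than the expected count.
    static int countInline(List<Integer> ownedSizes, int maxQuota, int numExpectedMaxCapacityMembers) {
        int numMaxCapacityMembers = 0;
        for (int owned : ownedSizes) {
            if (owned >= maxQuota && numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
                // "keep maxQuota partitions" ...
            }
        }
        return numMaxCapacityMembers;
    }

    // Incrementing in the body keeps the counter equal to the number of members actually
    // treated as max-capacity members.
    static int countInBody(List<Integer> ownedSizes, int maxQuota, int numExpectedMaxCapacityMembers) {
        int numMaxCapacityMembers = 0;
        for (int owned : ownedSizes) {
            if (owned >= maxQuota && numMaxCapacityMembers < numExpectedMaxCapacityMembers) {
                numMaxCapacityMembers++;
                // "keep maxQuota partitions" ...
            }
        }
        return numMaxCapacityMembers;
    }

    public static void main(String[] args) {
        List<Integer> ownedSizes = List.of(5, 5, 5, 5); // four members, each owning 5 partitions
        System.out.println(countInline(ownedSizes, 3, 1)); // prints 4
        System.out.println(countInBody(ownedSizes, 3, 1)); // prints 1
    }
}
```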
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########

@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
+            } else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
+                // consumer owned the "maxQuota" of partitions or more, and we still under the number of expected max capacity members

Review comment:
```suggestion
                // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected max capacity members
```
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########

@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
+                // consumer owned the "minQuota" of partitions or more

Review comment:
```suggestion
                // consumer owned at least "minQuota" of partitions but we're already at the allowed number of max capacity members
```

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########

@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
+        if (log.isDebugEnabled()) {
+            log.debug("final assignment: " + assignment);

Review comment:
```suggestion
            log.debug("Final assignment of partitions to consumers: \n{}", assignment);
```
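Side note on why the `{}` placeholder form is preferable: SLF4J only renders the message when DEBUG is actually enabled, so the `isDebugEnabled()` guard plus `String.format` isn't needed when the arguments are cheap to pass. A minimal sketch (illustrative class and logger names, not the assignor code):

```java
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParameterizedLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(ParameterizedLoggingSketch.class);

    static void logAssignment(Map<String, List<String>> assignment) {
        // Eager formatting: the string is always built, hence the explicit isDebugEnabled() guard.
        if (log.isDebugEnabled()) {
            log.debug(String.format("final assignment: %s", assignment));
        }

        // Parameterized form: assignment.toString() only runs if DEBUG is enabled,
        // so no explicit guard is required.
        log.debug("Final assignment of partitions to consumers: \n{}", assignment);
    }
}
```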
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########

@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
+        List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();

Review comment:
Can we pick a better name for this? I had trouble understanding what this list was for, but after reading all the code it sounds like it's actually just the partitions which have been assigned, and thus are "to be removed" from the list of partitions to be assigned -- is that where the name comes from? I do see the logic behind that, but it wasn't clear until I'd carefully read everything. Maybe just `assignedPartitions` would be clearer? 🤷♀️

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########

@@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
+        List<String> allTopics = new ArrayList<>(partitionsPerTopic.keySet());
+        // sort all topics first, then we can have sorted all topic partitions by adding partitions starting from 0
+        Collections.sort(allTopics);

Review comment:
I did the math and it seems to come down to roughly O(N*logN) vs O(2*N), which for N = 1 million is roughly a 10x improvement. Not bad; of course it is a tradeoff and there are other factors, as mentioned above. But still very nice 👍
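For reference, here's the shape of the before/after being compared, as a rough standalone sketch (simplified from the diff; the method names are mine, not the real ones): building the sorted partition list through a `TreeSet` costs about O(N*logN) comparisons, while sorting only the topic names and then appending partitions 0..count-1 per topic is roughly linear in the total partition count N.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.kafka.common.TopicPartition;

public class SortedPartitionsSketch {

    // "Before": every partition goes through the TreeSet's comparator -> O(N*logN) overall.
    static SortedSet<TopicPartition> viaTreeSet(Map<String, Integer> partitionsPerTopic) {
        SortedSet<TopicPartition> all = new TreeSet<>(
            Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
        partitionsPerTopic.forEach((topic, count) -> {
            for (int i = 0; i < count; i++)
                all.add(new TopicPartition(topic, i));
        });
        return all;
    }

    // "After": sort only the (much smaller) topic list, then append partitions in sorted order.
    static List<TopicPartition> viaSortedTopics(Map<String, Integer> partitionsPerTopic) {
        List<String> allTopics = new ArrayList<>(partitionsPerTopic.keySet());
        Collections.sort(allTopics);
        List<TopicPartition> all = new ArrayList<>();
        for (String topic : allTopics) {
            for (int i = 0; i < partitionsPerTopic.get(topic); i++)
                all.add(new TopicPartition(topic, i));
        }
        return all;
    }
}
```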
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########

@@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedPartitions,

Review comment:
Just to be clear, a `removeAll(M partitions)` operation on a TreeSet should still be only O(M*logN), since each individual remove is only logN. Even for N = 1 million, logN is under 20. So it scales more with how many partitions are being removed.

I tried to do the math here and found the time complexity of the original to be (slightly) better on paper, but maybe I missed something, or the reality is just different for certain input parameters (big-O time is not an absolute law, after all 🙂). Maybe you can check my work and then run some tests with and without this specific change (but with all other improvements included).

Let's say the number of consumers is C, the number of partitions assigned to each consumer is M, and the total number of partitions is N.

Before: loop through all consumers and call (TreeSet) `unassignedPartitions.removeAll(assignedPartitions)`. This is `C * M * logN` (where N will actually decrease down to ~0 by the end of the loop, since as you pointed out most partitions should be reassigned in the _sticky_ assignor).

After: loop through all consumers and call (ArrayList) `toBeRemovedPartitions.addAll(assignedPartitions)`. Since addAll has to copy all M elements, this is `C * M`. After that we call `sort(toBeRemovedPartitions)`, where toBeRemovedPartitions has C * M elements, so this is `C * M * log(C * M)`. And then finally we call `getUnassignedPartitions`, which is O(N). Putting it all together we have `C * M + C * M * log(C * M) + N`.

It's a little hard to compare these directly, but in general we'll have C * M ≈ N (since most partitions are reassigned), in which case the before vs after runtimes can be reduced to O(N^2 * log(N)) vs O(2N + N^2 * log(N)). The before case is actually better, although they're roughly the same for large N.

If this analysis holds up in your experiments then we should just go with whichever one uses less memory and/or has the simplest code. In my (admittedly biased) opinion the original method was the easiest to understand in the code, and objectively speaking it also used less memory, since we only need the one `unassignedPartitions` data structure. I.e. we can just rename `sortedAllPartitions` back to `unassignedPartitions` (or maybe `partitionsToBeAssigned` is better?) and then we can delete both `toBeRemovedPartitions` and the second `unassignedPartitions`. Thoughts?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########

@@ -263,16 +279,59 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
+    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedPartitions,

Review comment:
Of course we also need to take into account that going back to the TreeSet would mean we lose the improvement in `getTopicPartitions` as well. I did the math there and found it came down to O(NlogN) vs O(2N), and since logN is already around 20 for 1 million partitions, that is roughly a 10x improvement. Adding this all up, the new algorithm does come out _slightly_ on top now: before we had O(NlogN + N^2 * log(N)) and now we have O(4N + NlogN + N^2 * log(N)). Taking logN ≈ 20 means a 5x improvement for the part that scales as N -- which is great, obviously, but the time for `getTopicPartitions` is negligible compared to the time complexity of the loop we discussed above, i.e. the N^2 term is going to dominate the N term.

So we have a tradeoff here between an unknown but possibly small performance improvement that uses a lot more memory, and a somewhat worse algorithm with only the one data structure and slightly cleaner code. That's a hard tradeoff to comment on without more data. If you could re-run your experiments with all other improvements implemented but without these two things (i.e. the use of `toBeRemovedPartitions` instead of dynamically removing from `unassignedPartitions`, and going back to the TreeSet in `getTopicPartitions`) then we can see what the actual performance characteristics are, and also get a sense of how much extra memory we're talking about.

I'd recommend first addressing all my other comments here and re-running with/without this stuff in case those other comments have an impact, so we can be fair to both sides. Anyways, that's my analysis here, but you've been looking at this more carefully, more recently, so it's possible I'm missing something. Interested to hear your take on this.
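For clarity, this is the "before" shape that the `C * M * logN` term above refers to -- a rough standalone sketch with made-up method names, not the actual assignor code:

```java
import java.util.List;
import java.util.Map;
import java.util.SortedSet;

import org.apache.kafka.common.TopicPartition;

public class RemoveAllCostSketch {

    // For each of the C consumers we remove its M owned partitions from the TreeSet of
    // unassigned partitions; each underlying remove is O(logN), giving roughly C * M * logN
    // overall (and N shrinks as the loop progresses).
    static void reassignOwnedPartitions(Map<String, List<TopicPartition>> consumerToOwnedPartitions,
                                        Map<String, List<TopicPartition>> assignment,
                                        SortedSet<TopicPartition> unassignedPartitions) {
        for (Map.Entry<String, List<TopicPartition>> entry : consumerToOwnedPartitions.entrySet()) {
            List<TopicPartition> ownedPartitions = entry.getValue();
            assignment.get(entry.getKey()).addAll(ownedPartitions);
            unassignedPartitions.removeAll(ownedPartitions);
        }
    }
}
```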
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########

@@ -108,6 +107,8 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
+                // we don't need consumerToOwnedPartitions in general assign case
+                consumerToOwnedPartitions = null;

Review comment:
nit: can we set it to null up in `#assign`, before invoking `generalAssign`? I feel that's slightly more future-proof, as it's easy to miss that this gets cleared when it occurs deep inside this boolean check method, in case someone decides they want to use this map in `generalAssign`. Which could happen, since it builds up basically this exact same information in a later loop -- as an alternative to nullifying this map, we could just pass it in to `generalAssign` to replace the `currentAssignment` map that gets filled in via `prepopulateCurrentAssignments`. That won't save us from looping through all the assignments entirely, since we also need to populate `prevAssignments`, but it will still save some time by cutting out the filling in of `currentAssignment`.
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########

@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
+                consumerAssignment.addAll(ownedPartitions.subList(0, minQuota));
+                toBeRemovedPartitions.addAll(ownedPartitions.subList(0, minQuota));

Review comment:
Same as above, let's assign `ownedPartitions.subList(0, minQuota)` to a variable rather than compute it twice.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########

@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
+            } else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {

Review comment:
Also, should it be `numMaxCapacityMembers < numExpectedMaxCapacityMembers`? If `numMaxCapacityMembers` is already equal to the expected number, then that means we can't give this member a `maxQuota` number of partitions. I'm guessing this is a typo, since you yourself say "we're still _under_ the number of expected max capacity members..." in the comment below. Or maybe you made it `<=` because of the `++`, but that's a post-increment, which means the value is only incremented _after_ the variable is used in the comparison, so `<=` would only be correct if it were `++numMaxCapacityMembers` instead. This post-/pre-increment stuff is confusing -- that's exactly why I suggested not doing this inline, but just incrementing it in the body 🙂
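To spell out the post- vs pre-increment difference with concrete numbers (a throwaway snippet, not project code):

```java
public class IncrementComparisonSketch {
    public static void main(String[] args) {
        int numExpectedMaxCapacityMembers = 2;

        // Post-increment: the OLD value is used in the comparison, then the variable is bumped.
        int a = 2;
        boolean postResult = a++ <= numExpectedMaxCapacityMembers; // compares 2 <= 2 -> true, then a becomes 3
        System.out.println(postResult + ", a = " + a);             // prints "true, a = 3"

        // Pre-increment: the variable is bumped first, then the NEW value is compared.
        int b = 2;
        boolean preResult = ++b <= numExpectedMaxCapacityMembers;  // b becomes 3, compares 3 <= 2 -> false
        System.out.println(preResult + ", b = " + b);              // prints "false, b = 3"

        // So `counter++ <= expected` lets one extra member through, while `counter < expected`
        // (with the increment moved into the branch body) avoids the question entirely.
    }
}
```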
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########

@@ -234,23 +245,28 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                     consumerAssignment.add(tp);
-                    unassignedPartitionsIter.remove();

Review comment:
Well, `unassignedPartitions` used to be a TreeSet, so removal should still be relatively fast (`O(logN)`). But I see you turned this into an ArrayList, which certainly would be slow. That's a good observation though: since we have computed the exact capacity, we know the remaining number of unfilled members exactly, and thus we don't need to remove from this list at all. I think we can then simplify things even further -- see my comment above.
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########

@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {

Review comment:
Now that we're not removing from `unassignedPartitions`, there's no reason to include it here. But I'd actually go a step further and change this to `for (TopicPartition unassignedPartition : unassignedPartitions)` -- that way we don't have to worry about breaking out, since we should only need to loop over each of these partitions once to assign it, and we have one less Iterator to deal with. Then we can check and log an error (or even throw an exception) if there are still consumers in `unfilledMembers` at the end, or of course if we run out of `unfilledMembers` before exiting the for loop.
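Roughly the loop shape I have in mind -- just a sketch with simplified, hypothetical parameters (the real code would carry `numMaxCapacityMembers` over from the earlier phase and would keep its logging), not a drop-in replacement:

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

public class FillRemainingMembersSketch {

    static void fillRemainingMembers(List<TopicPartition> unassignedPartitions,
                                     List<String> unfilledMembers,
                                     Map<String, List<TopicPartition>> assignment,
                                     int minQuota, int maxQuota,
                                     int numExpectedMaxCapacityMembers,
                                     int numMaxCapacityMembers) {
        Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
        String consumer = unfilledConsumerIter.hasNext() ? unfilledConsumerIter.next() : null;

        // Loop over each unassigned partition exactly once and hand it to the current unfilled member.
        for (TopicPartition unassignedPartition : unassignedPartitions) {
            if (consumer == null) {
                // We computed the quotas up front, so this should never happen -- flag it loudly.
                throw new IllegalStateException("Ran out of unfilled consumers with partitions remaining");
            }
            List<TopicPartition> consumerAssignment = assignment.get(consumer);
            consumerAssignment.add(unassignedPartition);

            // Fill the first numExpectedMaxCapacityMembers members up to maxQuota, the rest up to minQuota.
            int expectedAssignedCount = numMaxCapacityMembers < numExpectedMaxCapacityMembers ? maxQuota : minQuota;
            if (consumerAssignment.size() == expectedAssignedCount) {
                if (expectedAssignedCount == maxQuota)
                    numMaxCapacityMembers++;
                consumer = unfilledConsumerIter.hasNext() ? unfilledConsumerIter.next() : null;
            }
        }
        if (consumer != null || unfilledConsumerIter.hasNext()) {
            // Leftover unfilled members likewise indicate the quota math went wrong, so log an error here.
            System.err.println("Some consumers are still under capacity after all partitions were assigned");
        }
    }
}
```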
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########

@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
+                consumerAssignment.addAll(ownedPartitions.subList(0, maxQuota));
+                toBeRemovedPartitions.addAll(ownedPartitions.subList(0, maxQuota));

Review comment:
I'm not sure how much of a difference this will make, but since we're using `ownedPartitions.subList(0, maxQuota)` twice, maybe we should assign it to a variable rather than compute it twice.
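Something along these lines (the variable names mirror the diff, but the wrapper method itself is just an illustration):

```java
import java.util.List;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

public class SubListReuseSketch {

    // Compute the subList views once and reuse them for both addAll calls.
    static void keepMaxQuotaPartitions(List<TopicPartition> ownedPartitions, int maxQuota,
                                       List<TopicPartition> consumerAssignment,
                                       List<TopicPartition> toBeRemovedPartitions,
                                       Set<TopicPartition> allRevokedPartitions) {
        List<TopicPartition> maxQuotaPartitions = ownedPartitions.subList(0, maxQuota);
        consumerAssignment.addAll(maxQuotaPartitions);
        toBeRemovedPartitions.addAll(maxQuotaPartitions);
        allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
    }
}
```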
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########

@@ -163,127 +159,179 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
+        List<TopicPartition> unassignedPartitions;
+        if (!toBeRemovedPartitions.isEmpty()) {
+            Collections.sort(toBeRemovedPartitions,
+                Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
+            unassignedPartitions = getUnassignedPartitions(sortedAllPartitions, toBeRemovedPartitions);
+            sortedAllPartitions = null;

Review comment:
`sortedAllPartitions` can be cleared for both cases I think, like `toBeRemovedPartitions`?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org