showuon commented on a change in pull request #10552: URL: https://github.com/apache/kafka/pull/10552#discussion_r629100529
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ########## @@ -387,58 +403,125 @@ private boolean allSubscriptionsEqual(Set<String> allTopics, TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment)); sortedCurrentSubscriptions.addAll(currentAssignment.keySet()); + int totalPartitionCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum); + balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, - consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer, revocationRequired); + consumer2AllPotentialTopics, topic2AllPotentialConsumers, currentPartitionConsumer, revocationRequired, + partitionsPerTopic, totalPartitionCount); + + if (log.isDebugEnabled()) { + log.debug("final assignment: " + currentAssignment); + } + return currentAssignment; } + /** + * get the unassigned partition list by computing the difference set of the sortedPartitions(all partitions) + * and sortedToBeRemovedPartitions. 
We use two pointers technique here: + * + * We loop the sortedPartition, and compare the ith element in sorted toBeRemovedPartitions(i start from 0): + * - if not equal to the ith element, add to unassignedPartitions + * - if equal to the the ith element, get next element from sortedToBeRemovedPartitions + * + * @param sortedPartitions: sorted all partitions + * @param sortedToBeRemovedPartitions: sorted partitions, all are included in the sortedPartitions + * @return the partitions don't assign to any current consumers + */ + private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedPartitions, + List<TopicPartition> sortedToBeRemovedPartitions) { + List<TopicPartition> unassignedPartitions = new ArrayList<>(); + + int index = 0; + boolean shouldAddDirectly = false; + int sizeToBeRemovedPartitions = sortedToBeRemovedPartitions.size(); + TopicPartition nextPartition = sortedToBeRemovedPartitions.get(index); + for (TopicPartition topicPartition : sortedPartitions) { + if (shouldAddDirectly || !nextPartition.equals(topicPartition)) { + unassignedPartitions.add(topicPartition); + } else { + // equal case, don't add to unassignedPartitions, just get next partition + if (index < sizeToBeRemovedPartitions - 1) { + nextPartition = sortedToBeRemovedPartitions.get(++index); + } else { + // add the remaining directly since there is no more toBeRemovedPartitions + shouldAddDirectly = true; + } + } + } + return unassignedPartitions; + } + + /** + * update the prevAssignment with the partitions, consumer and generation in parameters + * + * @param partitions: The partitions to be updated the prevAssignement + * @param consumer: The consumer Id + * @param prevAssignment: The assignment contains the assignment with the 2nd largest generation + * @param generation: The generation of this assignment (partitions) + */ + private void updatePrevAssignment(Map<TopicPartition, ConsumerGenerationPair> prevAssignment, + List<TopicPartition> partitions, + String 
consumer, + int generation) { + for (TopicPartition partition: partitions) { + ConsumerGenerationPair consumerGeneration = prevAssignment.get(partition); + if (consumerGeneration != null) { + // only keep the latest previous assignment + if (generation > consumerGeneration.generation) + prevAssignment.put(partition, new ConsumerGenerationPair(consumer, generation)); + } else { + prevAssignment.put(partition, new ConsumerGenerationPair(consumer, generation)); + } + } + } + + /** + * filling in the currentAssignment and prevAssignment from the subscriptions. + * + * @param subscriptions: Map from the member id to their respective topic subscription + * @param currentAssignment: The assignment contains the assignments with the largest generation + * @param prevAssignment: The assignment contains the assignment with the 2nd largest generation + */ private void prepopulateCurrentAssignments(Map<String, Subscription> subscriptions, Map<String, List<TopicPartition>> currentAssignment, Map<TopicPartition, ConsumerGenerationPair> prevAssignment) { // we need to process subscriptions' user data with each consumer's reported generation in mind // higher generations overwrite lower generations in case of a conflict // note that a conflict could exists only if user data is for different generations - // for each partition we create a sorted map of its consumers by generation - Map<TopicPartition, TreeMap<Integer, String>> sortedPartitionConsumersByGeneration = new HashMap<>(); + Set<String> membersOfCurrentHighestGeneration = new HashSet<>(); + int maxGeneration = DEFAULT_GENERATION; + for (Map.Entry<String, Subscription> subscriptionEntry: subscriptions.entrySet()) { String consumer = subscriptionEntry.getKey(); MemberData memberData = memberData(subscriptionEntry.getValue()); - for (TopicPartition partition: memberData.partitions) { - if (sortedPartitionConsumersByGeneration.containsKey(partition)) { - Map<Integer, String> consumers = 
sortedPartitionConsumersByGeneration.get(partition); - if (memberData.generation.isPresent() && consumers.containsKey(memberData.generation.get())) { - // same partition is assigned to two consumers during the same rebalance. - // log a warning and skip this record - log.warn("Partition '{}' is assigned to multiple consumers following sticky assignment generation {}.", - partition, memberData.generation); - } else - consumers.put(memberData.generation.orElse(DEFAULT_GENERATION), consumer); - } else { - TreeMap<Integer, String> sortedConsumers = new TreeMap<>(); - sortedConsumers.put(memberData.generation.orElse(DEFAULT_GENERATION), consumer); - sortedPartitionConsumersByGeneration.put(partition, sortedConsumers); - } - } - } + List<TopicPartition> ownedPartitions = new ArrayList<>(); + currentAssignment.put(consumer, ownedPartitions); - // prevAssignment holds the prior ConsumerGenerationPair (before current) of each partition - // current and previous consumers are the last two consumers of each partition in the above sorted map - for (Map.Entry<TopicPartition, TreeMap<Integer, String>> partitionConsumersEntry: sortedPartitionConsumersByGeneration.entrySet()) { - TopicPartition partition = partitionConsumersEntry.getKey(); - TreeMap<Integer, String> consumers = partitionConsumersEntry.getValue(); - Iterator<Integer> it = consumers.descendingKeySet().iterator(); + // Only consider this consumer's owned partitions as valid if it is a member of the current highest + // generation, or it's generation is not present but we have not seen any known generation so far + if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration + || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) { - // let's process the current (most recent) consumer first - String consumer = consumers.get(it.next()); - currentAssignment.computeIfAbsent(consumer, k -> new ArrayList<>()); - currentAssignment.get(consumer).add(partition); + // If the 
current member's generation is higher, all the previously owned partitions are invalid + if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) { + for (String member: membersOfCurrentHighestGeneration) { + List<TopicPartition> oldGenerationPartitions = currentAssignment.get(member); + updatePrevAssignment(prevAssignment, oldGenerationPartitions, member, maxGeneration); + oldGenerationPartitions.clear(); + } + membersOfCurrentHighestGeneration.clear(); + maxGeneration = memberData.generation.get(); + } - // now update previous assignment if any - if (it.hasNext()) { - int generation = it.next(); - prevAssignment.put(partition, new ConsumerGenerationPair(consumers.get(generation), generation)); + membersOfCurrentHighestGeneration.add(consumer); + ownedPartitions.addAll(memberData.partitions); + } else if (!memberData.generation.isPresent()) { + // current maxGeneration is larger than DEFAULT_GENERATION, + // put all partitions as DEFAULT_GENERATION into provAssignment Review comment: My refactor is just trying to reach the same `currentAssignment` and `prevAssignment` as before. So, if you meant this: ```java if (memberData.generation.isPresent() && consumers.containsKey(memberData.generation.get())) { // same partition is assigned to two consumers during the same rebalance. // log a warning and skip this record log.warn("Partition '{}' is assigned to multiple consumers following sticky assignment generation {}.", partition, memberData.generation); ``` I think this check is unnecessary, since we didn't do anything about it before, and we cannot do anything about it, either. That's my thinking. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org