jeffkbkim commented on code in PR #15974: URL: https://github.com/apache/kafka/pull/15974#discussion_r1613811927
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ########## @@ -517,21 +524,89 @@ public Assignment targetAssignment(String memberId) { } /** - * Updates target assignment of a member. + * @return An immutable map containing all the topic partitions + * with their current member assignments. + */ + public Map<Uuid, Map<Integer, String>> invertedTargetAssignment() { + return Collections.unmodifiableMap(invertedTargetAssignment); + } + + /** + * Updates the target assignment of a member. * * @param memberId The member id. * @param newTargetAssignment The new target assignment. */ public void updateTargetAssignment(String memberId, Assignment newTargetAssignment) { + updateInvertedTargetAssignment( + memberId, + targetAssignment.getOrDefault(memberId, new Assignment(Collections.emptyMap())), + newTargetAssignment + ); targetAssignment.put(memberId, newTargetAssignment); } + /** + * Updates the reverse lookup map of the target assignment. + * + * @param memberId The member Id. + * @param oldTargetAssignment The old target assignment. + * @param newTargetAssignment The new target assignment. + */ + private void updateInvertedTargetAssignment( + String memberId, + Assignment oldTargetAssignment, + Assignment newTargetAssignment + ) { + // Combine keys from both old and new assignments. + Set<Uuid> allTopicIds = new HashSet<>(); + allTopicIds.addAll(oldTargetAssignment.partitions().keySet()); + allTopicIds.addAll(newTargetAssignment.partitions().keySet()); + + for (Uuid topicId : allTopicIds) { + Set<Integer> oldPartitions = oldTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet()); + Set<Integer> newPartitions = newTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet()); + + TimelineHashMap<Integer, String> topicPartitionAssignment = invertedTargetAssignment.computeIfAbsent( + topicId, k -> new TimelineHashMap<>(snapshotRegistry, Math.max(oldPartitions.size(), newPartitions.size())) + ); + + // Remove partitions that aren't present in the new assignment only if the partition is currently + // still assigned to the member in question. + // If p0 was moved from A to B, and the target assignment map was updated for B first, we don't want to + // remove the key p0 from the inverted map and undo the action when A eventually tries to update its assignment. + for (Integer partition : oldPartitions) { + if (!newPartitions.contains(partition) && memberId.equals(topicPartitionAssignment.get(partition))) { + topicPartitionAssignment.remove(partition); + } + } + + // Add partitions that are in new assignment but not in old assignment. Review Comment: nit: in the new, in the old -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org