jeffkbkim commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1613811927


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -517,21 +524,89 @@ public Assignment targetAssignment(String memberId) {
     }
 
     /**
-     * Updates target assignment of a member.
+     * @return An immutable map containing all the topic partitions
+     *         with their current member assignments.
+     */
+    public Map<Uuid, Map<Integer, String>> invertedTargetAssignment() {
+        return Collections.unmodifiableMap(invertedTargetAssignment);
+    }
+
+    /**
+     * Updates the target assignment of a member.
      *
      * @param memberId              The member id.
      * @param newTargetAssignment   The new target assignment.
      */
     public void updateTargetAssignment(String memberId, Assignment 
newTargetAssignment) {
+        updateInvertedTargetAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new 
Assignment(Collections.emptyMap())),
+            newTargetAssignment
+        );
         targetAssignment.put(memberId, newTargetAssignment);
     }
 
+    /**
+     * Updates the reverse lookup map of the target assignment.
+     *
+     * @param memberId              The member Id.
+     * @param oldTargetAssignment   The old target assignment.
+     * @param newTargetAssignment   The new target assignment.
+     */
+    private void updateInvertedTargetAssignment(
+        String memberId,
+        Assignment oldTargetAssignment,
+        Assignment newTargetAssignment
+    ) {
+        // Combine keys from both old and new assignments.
+        Set<Uuid> allTopicIds = new HashSet<>();
+        allTopicIds.addAll(oldTargetAssignment.partitions().keySet());
+        allTopicIds.addAll(newTargetAssignment.partitions().keySet());
+
+        for (Uuid topicId : allTopicIds) {
+            Set<Integer> oldPartitions = 
oldTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+            Set<Integer> newPartitions = 
newTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+
+            TimelineHashMap<Integer, String> topicPartitionAssignment = 
invertedTargetAssignment.computeIfAbsent(
+                topicId, k -> new TimelineHashMap<>(snapshotRegistry, 
Math.max(oldPartitions.size(), newPartitions.size()))
+            );
+
+            // Remove partitions that aren't present in the new assignment 
only if the partition is currently
+            // still assigned to the member in question.
+            // If p0 was moved from A to B, and the target assignment map was 
updated for B first, we don't want to
+            // remove the key p0 from the inverted map and undo the action 
when A eventually tries to update its assignment.
+            for (Integer partition : oldPartitions) {
+                if (!newPartitions.contains(partition) && 
memberId.equals(topicPartitionAssignment.get(partition))) {
+                    topicPartitionAssignment.remove(partition);
+                }
+            }
+
+            // Add partitions that are in new assignment but not in old 
assignment.

Review Comment:
   nit: in the new, in the old



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to