jeffkbkim commented on code in PR #15974:
URL: https://github.com/apache/kafka/pull/15974#discussion_r1613811194


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -517,21 +524,89 @@ public Assignment targetAssignment(String memberId) {
     }
 
     /**
-     * Updates target assignment of a member.
+     * @return An immutable map containing all the topic partitions
+     *         with their current member assignments.
+     */
+    public Map<Uuid, Map<Integer, String>> invertedTargetAssignment() {
+        return Collections.unmodifiableMap(invertedTargetAssignment);
+    }
+
+    /**
+     * Updates the target assignment of a member.
      *
      * @param memberId              The member id.
      * @param newTargetAssignment   The new target assignment.
      */
     public void updateTargetAssignment(String memberId, Assignment 
newTargetAssignment) {
+        updateInvertedTargetAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new 
Assignment(Collections.emptyMap())),
+            newTargetAssignment
+        );
         targetAssignment.put(memberId, newTargetAssignment);
     }
 
+    /**
+     * Updates the reverse lookup map of the target assignment.
+     *
+     * @param memberId              The member Id.
+     * @param oldTargetAssignment   The old target assignment.
+     * @param newTargetAssignment   The new target assignment.
+     */
+    private void updateInvertedTargetAssignment(
+        String memberId,
+        Assignment oldTargetAssignment,
+        Assignment newTargetAssignment
+    ) {
+        // Combine keys from both old and new assignments.
+        Set<Uuid> allTopicIds = new HashSet<>();
+        allTopicIds.addAll(oldTargetAssignment.partitions().keySet());
+        allTopicIds.addAll(newTargetAssignment.partitions().keySet());
+
+        for (Uuid topicId : allTopicIds) {
+            Set<Integer> oldPartitions = 
oldTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+            Set<Integer> newPartitions = 
newTargetAssignment.partitions().getOrDefault(topicId, Collections.emptySet());
+
+            TimelineHashMap<Integer, String> topicPartitionAssignment = 
invertedTargetAssignment.computeIfAbsent(
+                topicId, k -> new TimelineHashMap<>(snapshotRegistry, 
Math.max(oldPartitions.size(), newPartitions.size()))
+            );
+
+            // Remove partitions that aren't present in the new assignment 
only if the partition is currently
+            // still assigned to the member in question.
+            // If p0 was moved from A to B, and the target assignment map was 
updated for B first, we don't want to
+            // remove the key p0 from the inverted map and undo the action 
when A eventually tries to update its assignment.
+            for (Integer partition : oldPartitions) {
+                if (!newPartitions.contains(partition) && 
memberId.equals(topicPartitionAssignment.get(partition))) {
+                    topicPartitionAssignment.remove(partition);
+                }
+            }
+
+            // Add partitions that are in new assignment but not in old 
assignment.
+            for (Integer partition : newPartitions) {
+                if (!oldPartitions.contains(partition)) {
+                    topicPartitionAssignment.put(partition, memberId);
+                }
+            }
+
+            if (topicPartitionAssignment.isEmpty()) {
+                invertedTargetAssignment.remove(topicId);
+            } else {
+                invertedTargetAssignment.put(topicId, 
topicPartitionAssignment);
+            }
+        }
+    }
+
     /**
      * Removes the target assignment of a member.
      *
      * @param memberId The member id.
      */
     public void removeTargetAssignment(String memberId) {
+        updateInvertedTargetAssignment(
+            memberId,
+            targetAssignment.getOrDefault(memberId, new 
Assignment(Collections.emptyMap())),
+            new Assignment(Collections.emptyMap())

Review Comment:
   we can use Assignment.EMPTY



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to