AndrewJSchofield commented on code in PR #19781:
URL: https://github.com/apache/kafka/pull/19781#discussion_r2103827262


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2771,33 +2766,50 @@ void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord> record
         records.add(
             newShareGroupStatePartitionMetadataRecord(
                 groupId,
-                attachTopicName(finalInitializingMap),
-                attachTopicName(currentMap.initializedTopics()),
+                finalInitializingMap,
+                currentMap.initializedTopics(),
                 attachTopicName(currentDeleting)
             )
         );
     }
 
-    // Visibility for tests
-    static Map<Uuid, Set<Integer>> mergeShareGroupInitMaps(
-        Map<Uuid, Set<Integer>> existingShareGroupInitMap,
-        Map<Uuid, Set<Integer>> newShareGroupInitMap
+    // Visibility for testing
+    static Map<Uuid, InitMapValue> combineInitMaps(
+        Map<Uuid, InitMapValue> initialized,
+        Map<Uuid, InitMapValue> initializing
     ) {
-        Map<Uuid, Set<Integer>> finalInitMap = new HashMap<>();
-        Set<Uuid> combinedTopicIdSet = new HashSet<>(existingShareGroupInitMap.keySet());
-        combinedTopicIdSet.addAll(newShareGroupInitMap.keySet());
+        Map<Uuid, InitMapValue> finalInitMap = new HashMap<>();
+        Set<Uuid> combinedTopicIdSet = new HashSet<>(initialized.keySet());
+
+        Set<Uuid> initializingSet = initializing.keySet();
+
+        combinedTopicIdSet.addAll(initializingSet);
 
         for (Uuid topicId : combinedTopicIdSet) {
-            Set<Integer> partitions = new HashSet<>(existingShareGroupInitMap.getOrDefault(topicId, new HashSet<>()));
-            if (newShareGroupInitMap.containsKey(topicId)) {
-                partitions.addAll(newShareGroupInitMap.get(topicId));
+            Set<Integer> initializedPartitions = initialized.containsKey(topicId) ? initialized.get(topicId).partitions() : new HashSet<>();
+            long timestamp = initialized.containsKey(topicId) ? initialized.get(topicId).timestamp() : -1;
+            String name = initialized.containsKey(topicId) ? initialized.get(topicId).name() : "UNKNOWN";
+
+            Set<Integer> finalPartitions = new HashSet<>(initializedPartitions);
+            if (initializingSet.contains(topicId)) {
+                finalPartitions.addAll(initializing.get(topicId).partitions());
+                timestamp = initializing.get(topicId).timestamp();

Review Comment:
   This is fine. It's just useful to probe the reasoning sometimes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
