smjn commented on code in PR #19781: URL: https://github.com/apache/kafka/pull/19781#discussion_r2102752894
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##########
@@ -2771,33 +2766,50 @@ void addInitializingTopicsRecords(String groupId, List<CoordinatorRecord> record
         records.add(
             newShareGroupStatePartitionMetadataRecord(
                 groupId,
-                attachTopicName(finalInitializingMap),
-                attachTopicName(currentMap.initializedTopics()),
+                finalInitializingMap,
+                currentMap.initializedTopics(),
                 attachTopicName(currentDeleting)
             )
         );
     }

-    // Visibility for tests
-    static Map<Uuid, Set<Integer>> mergeShareGroupInitMaps(
-        Map<Uuid, Set<Integer>> existingShareGroupInitMap,
-        Map<Uuid, Set<Integer>> newShareGroupInitMap
+    // Visibility for testing
+    static Map<Uuid, InitMapValue> combineInitMaps(
+        Map<Uuid, InitMapValue> initialized,
+        Map<Uuid, InitMapValue> initializing
     ) {
-        Map<Uuid, Set<Integer>> finalInitMap = new HashMap<>();
-        Set<Uuid> combinedTopicIdSet = new HashSet<>(existingShareGroupInitMap.keySet());
-        combinedTopicIdSet.addAll(newShareGroupInitMap.keySet());
+        Map<Uuid, InitMapValue> finalInitMap = new HashMap<>();
+        Set<Uuid> combinedTopicIdSet = new HashSet<>(initialized.keySet());
+
+        Set<Uuid> initializingSet = initializing.keySet();
+
+        combinedTopicIdSet.addAll(initializingSet);

         for (Uuid topicId : combinedTopicIdSet) {
-            Set<Integer> partitions = new HashSet<>(existingShareGroupInitMap.getOrDefault(topicId, new HashSet<>()));
-            if (newShareGroupInitMap.containsKey(topicId)) {
-                partitions.addAll(newShareGroupInitMap.get(topicId));
+            Set<Integer> initializedPartitions = initialized.containsKey(topicId) ? initialized.get(topicId).partitions() : new HashSet<>();
+            long timestamp = initialized.containsKey(topicId) ? initialized.get(topicId).timestamp() : -1;
+            String name = initialized.containsKey(topicId) ? initialized.get(topicId).name() : "UNKNOWN";
+
+            Set<Integer> finalPartitions = new HashSet<>(initializedPartitions);
+            if (initializingSet.contains(topicId)) {
+                finalPartitions.addAll(initializing.get(topicId).partitions());
+                timestamp = initializing.get(topicId).timestamp();

Review Comment:
   @AndrewJSchofield The rationale behind this was:
   1. All initialized and initializing (only the fresh ones): when computing the change map used to build the request, the result carries a fresh timestamp, but the code that consumes the result does not care about the timestamp. It builds a new map containing the topics that are still old initializing (not part of the combined map) plus first-time-seen subscribed topics, and stamps that new map with the current timestamp. This map is then used to build the init request.
   2. When building the request for some topic partitions, we write the metadata record with updated info containing the current initializing topics (per the last replay) plus the fresh ones produced by step 1. All of the topic's partitions are again updated to the latest timestamp, BUT this is the initializing map, which means none of the partitions have been assigned; an older partition initialization might therefore be delayed, but it will not cause corruption.
   3. When we mark partitions as initialized here, the inputs are the current initialized topics and the fresh ones being initialized; the timestamps do not matter here.
   4. When building the delete request, we combine the currently deleting topics with any new topics from initializing or initialized that need to be deleted; timestamps do not matter here either.

   Hence, one timestamp per topic was added. If we want finer-grained control, and case 2 above seems disagreeable, we would need to add a timestamp per partition (a sketch of the per-topic semantics follows below).
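   To make the per-topic timestamp behaviour concrete, here is a minimal, self-contained sketch of the merge semantics described above. It is an illustration, not the actual PR code: `InitMapValue` and `combine` are stand-ins, `java.util.UUID` replaces Kafka's `Uuid`, and the `-1`/`"UNKNOWN"` defaults the PR uses for missing entries are omitted.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical stand-in for the PR's InitMapValue: a topic name, its
// partitions, and a single per-topic (not per-partition) timestamp.
record InitMapValue(String name, Set<Integer> partitions, long timestamp) {}

public class CombineInitMapsSketch {

    // Union the partitions per topic; where a topic appears in both maps,
    // the initializing entry's fresher timestamp wins, matching point 2
    // above: one timestamp covers every partition of the topic.
    static Map<UUID, InitMapValue> combine(
        Map<UUID, InitMapValue> initialized,
        Map<UUID, InitMapValue> initializing
    ) {
        Map<UUID, InitMapValue> result = new HashMap<>();
        Set<UUID> topicIds = new HashSet<>(initialized.keySet());
        topicIds.addAll(initializing.keySet());

        for (UUID topicId : topicIds) {
            InitMapValue done = initialized.get(topicId);
            InitMapValue fresh = initializing.get(topicId);

            Set<Integer> partitions = new HashSet<>();
            if (done != null) partitions.addAll(done.partitions());
            if (fresh != null) partitions.addAll(fresh.partitions());

            // The fresh (initializing) timestamp overrides the older one.
            long timestamp = fresh != null ? fresh.timestamp() : done.timestamp();
            String name = done != null ? done.name() : fresh.name();

            result.put(topicId, new InitMapValue(name, partitions, timestamp));
        }
        return result;
    }

    public static void main(String[] args) {
        UUID topic = UUID.randomUUID();
        Map<UUID, InitMapValue> initialized = Map.of(
            topic, new InitMapValue("orders", Set.of(0, 1), 100L));
        Map<UUID, InitMapValue> initializing = Map.of(
            topic, new InitMapValue("orders", Set.of(2), 200L));

        // Prints partitions {0, 1, 2} carrying the fresher timestamp 200.
        System.out.println(combine(initialized, initializing));
    }
}
```

   The design point this illustrates: because the timestamp lives on the topic entry, refreshing an initializing topic restamps all of its partitions at once, which is safe precisely because nothing in the initializing map has been assigned yet.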