smjn commented on code in PR #19431: URL: https://github.com/apache/kafka/pull/19431#discussion_r2036832379
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -1248,29 +1248,118 @@ private CompletableFuture<Map<String, Errors>> persisterDeleteToGroupIdErrorMap( }); } - private void populateDeleteShareGroupOffsetsFuture( + private CompletableFuture<DeleteShareGroupOffsetsResponseData> checkInitializedSharePartitionsAndProcess( + String groupId, + DeleteShareGroupOffsetsRequestData requestData + ) { + Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>(); + List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData = new ArrayList<>(); + List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> deleteShareGroupOffsetsResponseTopicList = + new ArrayList<>(requestData.topics().size()); + + return runtime.scheduleReadOperation( + "share-group-initialized-partitions", + topicPartitionFor(groupId), + (coordinator, offset) -> coordinator.initializedShareGroupPartitions(groupId) + ).thenCompose(topicPartitionMap -> { + requestData.topics().forEach(topic -> { + Uuid topicId = metadataImage.topics().topicNameToIdView().get(topic.topicName()); + if (topicId != null) { + // A deleteState request to persister should only be sent with those topic partitions for which corresponding + // share partitions are initialized for the group. + if (topicPartitionMap.containsKey(topicId)) { + requestTopicIdToNameMapping.put(topicId, topic.topicName()); + List<DeleteShareGroupStateRequestData.PartitionData> partitions = new ArrayList<>(); + topic.partitions().forEach(partition -> { + if (topicPartitionMap.get(topicId).contains(partition)) { + partitions.add(new DeleteShareGroupStateRequestData.PartitionData().setPartition(partition)); + } + }); + deleteShareGroupStateRequestTopicsData.add(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId) + .setPartitions(partitions)); + } + } else { + deleteShareGroupOffsetsResponseTopicList.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() + .setTopicName(topic.topicName()) + .setPartitions(topic.partitions().stream().map( + partition -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()) + ).toList())); + } + }); + + // If the request for the persister is empty, just complete the operation right away. + if (deleteShareGroupStateRequestTopicsData.isEmpty()) { + return CompletableFuture.completedFuture(new DeleteShareGroupOffsetsResponseData().setResponses(deleteShareGroupOffsetsResponseTopicList)); + } + + return sendPersisterDeleteStateRequest( + requestData, + requestTopicIdToNameMapping, + deleteShareGroupStateRequestTopicsData, + deleteShareGroupOffsetsResponseTopicList + ); + }).exceptionally(throwable -> { + log.error("Failed to get initialized topic partitions for the group {}", groupId, throwable); + return DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.forException(throwable)); + }); + } + + private CompletableFuture<DeleteShareGroupOffsetsResponseData> checkIfGroupIsEmptyAndProcess( + String groupId, + DeleteShareGroupOffsetsRequestData requestData + ) { + // This is done to make sure the provided group is empty. Offsets can be deleted only for an empty share group. Review Comment: This seems unnecessary. There is no need to make the `describe-groups` call. You can directly call the shard method on the request and the shard can check its internal state to verify that the group is empty and return the appropriate persister request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org