chirag-wadhwa5 commented on code in PR #18976: URL: https://github.com/apache/kafka/pull/18976#discussion_r1976948070
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -1206,6 +1210,93 @@ public CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGrou return future; } + /** + * See {@link GroupCoordinator#deleteShareGroupOffsets(RequestContext, DeleteShareGroupOffsetsRequestData)}. + */ + @Override + public CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOffsets( + RequestContext context, + DeleteShareGroupOffsetsRequestData requestData + ) { + if (!isActive.get()) { + return CompletableFuture.completedFuture( + DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE)); + } + + if (metadataImage == null) { + return CompletableFuture.completedFuture( + DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE)); + } + + Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>(); + List<DeleteShareGroupStateRequestData.DeleteStateData> deleteShareGroupStateRequestTopicsData = new ArrayList<>(requestData.topics().size()); + List<DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic> deleteShareGroupOffsetsResponseTopicList = new ArrayList<>(requestData.topics().size()); + + requestData.topics().forEach(topic -> { + Uuid topicId = metadataImage.topics().topicNameToIdView().get(topic.topicName()); + if (topicId != null) { + requestTopicIdToNameMapping.put(topicId, topic.topicName()); + deleteShareGroupStateRequestTopicsData.add(new DeleteShareGroupStateRequestData.DeleteStateData() + .setTopicId(topicId) + .setPartitions( + topic.partitions().stream().map( + partitionIndex -> new DeleteShareGroupStateRequestData.PartitionData().setPartition(partitionIndex) + ).toList() + )); + } else { + deleteShareGroupOffsetsResponseTopicList.add(new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic() + .setTopicName(topic.topicName()) + .setPartitions(topic.partitions().stream().map( + partition -> new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition() + .setPartitionIndex(partition) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message()) + ).toList())); + } + }); + + // If the request for the persister is empty, just complete the operation right away. + if (deleteShareGroupStateRequestTopicsData.isEmpty()) { + return CompletableFuture.completedFuture( + new DeleteShareGroupOffsetsResponseData() + .setResponses(deleteShareGroupOffsetsResponseTopicList)); + } + + DeleteShareGroupStateRequestData deleteShareGroupStateRequestData = new DeleteShareGroupStateRequestData() + .setGroupId(requestData.groupId()) + .setTopics(deleteShareGroupStateRequestTopicsData); + CompletableFuture<DeleteShareGroupOffsetsResponseData> future = new CompletableFuture<>(); + persister.deleteState(DeleteShareGroupStateParameters.from(deleteShareGroupStateRequestData)) + .whenComplete((result, error) -> { + if (error != null) { + log.error("Failed to delete share partitions"); Review Comment: Thanks for the review. The comment you are referring to already says "Failed to delete share partitions". I'm sorry but I didn't understand the suggested change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org