sjhajharia commented on code in PR #18571: URL: https://github.com/apache/kafka/pull/18571#discussion_r1918531681
########## server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java: ########## @@ -288,7 +291,96 @@ public CompletableFuture<DeleteShareGroupStateResult> deleteState(DeleteShareGro * @return A completable future of ReadShareGroupStateSummaryResult */ public CompletableFuture<ReadShareGroupStateSummaryResult> readSummary(ReadShareGroupStateSummaryParameters request) { - throw new RuntimeException("not implemented"); + try { + validate(request); + } catch (Exception e) { + log.error("Unable to validate read state summary request", e); + return CompletableFuture.failedFuture(e); + } + GroupTopicPartitionData<PartitionIdLeaderEpochData> gtp = request.groupTopicPartitionData(); + String groupId = gtp.groupId(); + Map<Uuid, Map<Integer, CompletableFuture<ReadShareGroupStateSummaryResponse>>> futureMap = new HashMap<>(); + List<PersisterStateManager.ReadStateSummaryHandler> handlers = new ArrayList<>(); + + gtp.topicsData().forEach(topicData -> { + topicData.partitions().forEach(partitionData -> { + CompletableFuture<ReadShareGroupStateSummaryResponse> future = futureMap + .computeIfAbsent(topicData.topicId(), k -> new HashMap<>()) + .computeIfAbsent(partitionData.partition(), k -> new CompletableFuture<>()); + + handlers.add( + stateManager.new ReadStateSummaryHandler( + groupId, + topicData.topicId(), + partitionData.partition(), + partitionData.leaderEpoch(), + future, + null + ) + ); + }); + }); + + for (PersisterStateManager.PersisterStateManagerHandler handler : handlers) { + stateManager.enqueue(handler); + } + + // Combine all futures into a single CompletableFuture<Void> + CompletableFuture<Void> combinedFuture = CompletableFuture.allOf( + handlers.stream() + .map(PersisterStateManager.ReadStateSummaryHandler::result) + .toArray(CompletableFuture[]::new)); + + // Transform the combined CompletableFuture<Void> into CompletableFuture<ReadShareGroupStateResult> + return combinedFuture.thenApply(v -> 
readSummaryResponsesToResult(futureMap)); + } + + /** + * Takes in a list of COMPLETED futures and combines the results, + * taking care of errors if any, into a single ReadShareGroupStateSummaryResult + * + * @param futureMap - HashMap of {topic -> {part -> future}} Review Comment: Ack. Will make changes to the Javadocs for the read and write RPCs too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org