sjhajharia commented on code in PR #18571:
URL: https://github.com/apache/kafka/pull/18571#discussion_r1920361167


##########
server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java:
##########
@@ -288,7 +291,97 @@ public CompletableFuture<DeleteShareGroupStateResult> 
deleteState(DeleteShareGro
      * @return A completable future of  ReadShareGroupStateSummaryResult
      */
     public CompletableFuture<ReadShareGroupStateSummaryResult> 
readSummary(ReadShareGroupStateSummaryParameters request) {
-        throw new RuntimeException("not implemented");
+        try {
+            validate(request);
+        } catch (Exception e) {
+            log.error("Unable to validate read state summary request", e);
+            return CompletableFuture.failedFuture(e);
+        }
+
+        GroupTopicPartitionData<PartitionIdLeaderEpochData> gtp = 
request.groupTopicPartitionData();
+        String groupId = gtp.groupId();
+        Map<Uuid, Map<Integer, 
CompletableFuture<ReadShareGroupStateSummaryResponse>>> futureMap = new 
HashMap<>();
+        List<PersisterStateManager.ReadStateSummaryHandler> handlers = new 
ArrayList<>();
+
+        gtp.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                CompletableFuture<ReadShareGroupStateSummaryResponse> future = 
futureMap
+                    .computeIfAbsent(topicData.topicId(), k -> new HashMap<>())
+                    .computeIfAbsent(partitionData.partition(), k -> new 
CompletableFuture<>());
+
+                handlers.add(
+                    stateManager.new ReadStateSummaryHandler(
+                        groupId,
+                        topicData.topicId(),
+                        partitionData.partition(),
+                        partitionData.leaderEpoch(),
+                        future,
+                        null
+                    )
+                );
+            });
+        });
+
+        for (PersisterStateManager.PersisterStateManagerHandler handler : 
handlers) {
+            stateManager.enqueue(handler);
+        }
+
+        // Combine all futures into a single CompletableFuture<Void>
+        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+            handlers.stream()
+                .map(PersisterStateManager.ReadStateSummaryHandler::result)
+                .toArray(CompletableFuture[]::new));
+
+        // Transform the combined CompletableFuture<Void> into 
CompletableFuture<ReadShareGroupStateResult>
+        return combinedFuture.thenApply(v -> 
readSummaryResponsesToResult(futureMap));
+    }
+
+    /**
+     * Takes in a list of COMPLETED futures and combines the results,
+     * taking care of errors if any, into a single 
ReadShareGroupStateSummaryResult
+     *
+     * @param futureMap - HashMap of {topic -> {partition -> future}}
+     * @return Object representing combined result of type 
ReadShareGroupStateSummaryResult
+     */
+    // visible for testing
+    ReadShareGroupStateSummaryResult readSummaryResponsesToResult(
+        Map<Uuid, Map<Integer, 
CompletableFuture<ReadShareGroupStateSummaryResponse>>> futureMap
+    ) {
+        List<TopicData<PartitionStateErrorData>> topicsData = 
futureMap.keySet().stream()
+            .map(topicId -> {
+                List<PartitionStateErrorData> partitionStateErrorData = 
futureMap.get(topicId).entrySet().stream()
+                    .map(partitionFuture -> {
+                        int partition = partitionFuture.getKey();
+                        CompletableFuture<ReadShareGroupStateSummaryResponse> 
future = partitionFuture.getValue();
+                        try {
+                            // already completed because of allOf call in the 
caller
+                            ReadShareGroupStateSummaryResponse 
partitionResponse = future.join();
+                            return 
partitionResponse.data().results().get(0).partitions().stream()
+                                .map(partitionResult -> 
PartitionFactory.newPartitionStateErrorData(

Review Comment:
   I have added a new method for the same called 
`newPartitionStateSummaryData`. In addition I have added a class 
`PartitionStateSummaryData` which contains this data. Correspondingly, the 
`ReadShareGroupStateSummaryResult` now uses this class instead of the old 
`PartitionStateErrorData`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to