sjhajharia commented on code in PR #18571:
URL: https://github.com/apache/kafka/pull/18571#discussion_r1918531681


##########
server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java:
##########
@@ -288,7 +291,96 @@ public CompletableFuture<DeleteShareGroupStateResult> 
deleteState(DeleteShareGro
      * @return A completable future of  ReadShareGroupStateSummaryResult
      */
     public CompletableFuture<ReadShareGroupStateSummaryResult> 
readSummary(ReadShareGroupStateSummaryParameters request) {
-        throw new RuntimeException("not implemented");
+        try {
+            validate(request);
+        } catch (Exception e) {
+            log.error("Unable to validate read state summary request", e);
+            return CompletableFuture.failedFuture(e);
+        }
+        GroupTopicPartitionData<PartitionIdLeaderEpochData> gtp = 
request.groupTopicPartitionData();
+        String groupId = gtp.groupId();
+        Map<Uuid, Map<Integer, 
CompletableFuture<ReadShareGroupStateSummaryResponse>>> futureMap = new 
HashMap<>();
+        List<PersisterStateManager.ReadStateSummaryHandler> handlers = new 
ArrayList<>();
+
+        gtp.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                CompletableFuture<ReadShareGroupStateSummaryResponse> future = 
futureMap
+                    .computeIfAbsent(topicData.topicId(), k -> new HashMap<>())
+                    .computeIfAbsent(partitionData.partition(), k -> new 
CompletableFuture<>());
+
+                handlers.add(
+                    stateManager.new ReadStateSummaryHandler(
+                        groupId,
+                        topicData.topicId(),
+                        partitionData.partition(),
+                        partitionData.leaderEpoch(),
+                        future,
+                        null
+                    )
+                );
+            });
+        });
+
+        for (PersisterStateManager.PersisterStateManagerHandler handler : 
handlers) {
+            stateManager.enqueue(handler);
+        }
+
+        // Combine all futures into a single CompletableFuture<Void>
+        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
+            handlers.stream()
+                .map(PersisterStateManager.ReadStateSummaryHandler::result)
+                .toArray(CompletableFuture[]::new));
+
+        // Transform the combined CompletableFuture<Void> into 
CompletableFuture<ReadShareGroupStateResult>
+        return combinedFuture.thenApply(v -> 
readSummaryResponsesToResult(futureMap));
+    }
+
+    /**
+     * Takes in a list of COMPLETED futures and combines the results,
+     * taking care of errors if any, into a single 
ReadShareGroupStateSummaryResult
+     *
+     * @param futureMap - HashMap of {topic -> {part -> future}}

Review Comment:
   Ack
   Will make changes to the java docs for the read and write RPCs too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to