chia7712 commented on code in PR #20839:
URL: https://github.com/apache/kafka/pull/20839#discussion_r2519327764


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1835,27 +1852,122 @@ private 
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGro
                     return;
                 }
 
-                // Return -1 (uninitialized offset) for the situation where 
the persister returned an error.
-                // This is consistent with OffsetFetch for situations in which 
there is no offset information to fetch.
-                // It's treated as absence of data, rather than an error.
-                result.topicsData().forEach(topicData ->
-                    describeShareGroupOffsetsResponseTopicList.add(new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
-                        .setTopicId(topicData.topicId())
-                        
.setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
-                        .setPartitions(topicData.partitions().stream().map(
-                            partitionData -> new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                // Now compute lag for each partition and build the final 
response.
+                computeShareGroupLagAndBuildResponse(
+                    result,
+                    requestTopicIdToNameMapping,
+                    describeShareGroupOffsetsResponseTopicList,
+                    future,
+                    readSummaryRequestData.groupId()
+                );
+            });
+        return future;
+    }
+
+    private void computeShareGroupLagAndBuildResponse(
+        ReadShareGroupStateSummaryResult readSummaryResult,
+        Map<Uuid, String> requestTopicIdToNameMapping,
+        
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic>
 describeShareGroupOffsetsResponseTopicList,
+        
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup>
 responseFuture,
+        String groupId
+    ) {
+        // This set keeps track of the partitions for which lag computation is 
needed.
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>();
+
+        readSummaryResult.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                if (shouldComputeSharePartitionLag(partitionData)) {
+                    // If the readSummaryResult is successful for a partition, 
we need to compute lag.
+                    partitionsToComputeLag.add(new 
TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), 
partitionData.partition()));
+                }
+            });
+        });
+
+        // Fetch latest offsets for all partitions that need lag computation.
+        Map<TopicPartition, CompletableFuture<Long>> partitionLatestOffsets = 
partitionsToComputeLag.isEmpty() ? Map.of() :
+                
partitionMetadataClient.listLatestOffsets(partitionsToComputeLag);
+
+        // Final response object to be built. It will include lag information 
computed from partitionMetadataClient.
+        
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup 
responseGroup =
+            new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+                .setGroupId(groupId);
+
+        // List of response topics to be set in the response group.
+        
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic>
 responseTopics = new ArrayList<>();
+
+        CompletableFuture.allOf(partitionLatestOffsets.values().toArray(new 
CompletableFuture<?>[0]))
+            .whenComplete((result, error) -> {
+                // The error variable will not be null when one or more of the 
partitionLatestOffsets futures get completed exceptionally.
+                // If that is the case, then the same exception would be 
caught in the try catch executed below when .join() is called.
+                // Thus, we do not need to check error != null here.
+                readSummaryResult.topicsData().forEach(topicData -> {
+                    // Build response for each topic.
+                    
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic 
topic =
+                        new 
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                            .setTopicId(topicData.topicId())
+                            
.setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()));
+
+                    // Build response for each partition within the topic.
+                    
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition>
 partitionResponses = new ArrayList<>();
+
+                    topicData.partitions().forEach(partitionData -> {
+                        TopicPartition tp = new 
TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), 
partitionData.partition());
+                        // For the partitions where lag computation is not 
needed, a partitionResponse is built directly.
+                        // The lag is set to -1 (uninitialized lag) in these 
cases. If the persister returned an error for a
+                        // partition, the startOffset is set to -1 
(uninitialized offset) and the leaderEpoch is set to 0
+                        // (default epoch). This is consistent with 
OffsetFetch for situations in which there is no offset

Review Comment:
   @chirag-wadhwa5 Thanks for your response. My comment may be incorrect, so 
please correct me if I've misunderstood anything.
   
   > In the PartitionFactory class, the same default value of 0 is being used 
in other error responses as well
   
   Since the code comment says "This is consistent with OffsetFetch", I would 
expect the leader epoch to be `-1` rather than `0` :smile: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to