chirag-wadhwa5 commented on code in PR #20839:
URL: https://github.com/apache/kafka/pull/20839#discussion_r2504568442


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1835,27 +1852,128 @@ private CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGro
                     return;
                 }
 
+                computeLagAndBuildResponse(
+                    result,
+                    requestTopicIdToNameMapping,
+                    describeShareGroupOffsetsResponseTopicList,
+                    future,
+                    readSummaryRequestData.groupId()
+                );
+            });
+        return future;
+    }
+
+    private void computeLagAndBuildResponse(
+        ReadShareGroupStateSummaryResult readSummaryResult,
+        Map<Uuid, String> requestTopicIdToNameMapping,
+        List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList,
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> responseFuture,
+        String groupId
+    ) {
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>();
+        Map<TopicIdPartition, DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition> partitionsResponses = new HashMap<>();
+
+        readSummaryResult.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                TopicIdPartition tp = new TopicIdPartition(
+                    topicData.topicId(),
+                    new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition())
+                );
                 // Return -1 (uninitialized offset) for the situation where the persister returned an error.
                 // This is consistent with OffsetFetch for situations in which there is no offset information to fetch.
                 // It's treated as absence of data, rather than an error.
-                result.topicsData().forEach(topicData ->
-                    describeShareGroupOffsetsResponseTopicList.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
-                        .setTopicId(topicData.topicId())
-                        .setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
-                        .setPartitions(topicData.partitions().stream().map(
-                            partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
-                                .setPartitionIndex(partitionData.partition())
-                                .setStartOffset(partitionData.errorCode() == Errors.NONE.code() ? partitionData.startOffset() : PartitionFactory.UNINITIALIZED_START_OFFSET)
-                                .setLeaderEpoch(partitionData.errorCode() == Errors.NONE.code() ? partitionData.leaderEpoch() : PartitionFactory.DEFAULT_LEADER_EPOCH)
-                        ).toList())
-                    ));
+                if (partitionData.errorCode() != Errors.NONE.code() || partitionData.startOffset() == PartitionFactory.UNINITIALIZED_START_OFFSET) {
+                    partitionsResponses.put(
+                        tp,
+                        new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                            .setPartitionIndex(partitionData.partition())
+                            .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)

Review Comment:
   Thanks for the review. I think the comment above explains why we set
UNINITIALIZED_START_OFFSET in the case where the persister returns an error. I
will extend the comment to also explain the other condition in the OR.
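
   As a rough illustration of the two conditions in that OR, here is a minimal,
standalone sketch. It is not the code in the PR: the class and method names are
hypothetical, and the two constants are local stand-ins for Errors.NONE.code()
and PartitionFactory.UNINITIALIZED_START_OFFSET (the latter assumed to be -1,
as the code comment above states).

```java
public class StartOffsetSketch {
    // Stand-in for Errors.NONE.code(); hypothetical local constant.
    static final short NONE_CODE = 0;
    // Stand-in for PartitionFactory.UNINITIALIZED_START_OFFSET; assumed to be -1.
    static final long UNINITIALIZED_START_OFFSET = -1L;

    // True when there is no usable start offset: either the persister reported
    // an error for the partition, or it succeeded but the offset is still uninitialized.
    static boolean hasNoUsableStartOffset(short errorCode, long startOffset) {
        return errorCode != NONE_CODE || startOffset == UNINITIALIZED_START_OFFSET;
    }

    // The start offset that would be reported: the uninitialized marker in both
    // of the cases above, otherwise the offset the persister returned.
    static long reportedStartOffset(short errorCode, long startOffset) {
        return hasNoUsableStartOffset(errorCode, startOffset)
            ? UNINITIALIZED_START_OFFSET
            : startOffset;
    }
}
```

   Both branches of the OR lead to the same reported value, so a caller cannot
tell a persister error apart from a partition whose start offset has not been
initialized yet. Both are treated as absence of data.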



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
