apoorvmittal10 commented on code in PR #20839:
URL: https://github.com/apache/kafka/pull/20839#discussion_r2502688264


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1835,27 +1852,128 @@ private CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGro
                     return;
                 }
 
+                computeLagAndBuildResponse(
+                    result,
+                    requestTopicIdToNameMapping,
+                    describeShareGroupOffsetsResponseTopicList,
+                    future,
+                    readSummaryRequestData.groupId()
+                );
+            });
+        return future;
+    }
+
+    private void computeLagAndBuildResponse(
+        ReadShareGroupStateSummaryResult readSummaryResult,
+        Map<Uuid, String> requestTopicIdToNameMapping,
+        List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList,
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> responseFuture,
+        String groupId
+    ) {
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>();
+        Map<TopicIdPartition, DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition> partitionsResponses = new HashMap<>();
+
+        readSummaryResult.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                TopicIdPartition tp = new TopicIdPartition(
+                    topicData.topicId(),
+                    new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition())
+                );
                 // Return -1 (uninitialized offset) for the situation where the persister returned an error.
                 // This is consistent with OffsetFetch for situations in which there is no offset information to fetch.
                 // It's treated as absence of data, rather than an error.
-                result.topicsData().forEach(topicData ->
-                    describeShareGroupOffsetsResponseTopicList.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
-                        .setTopicId(topicData.topicId())
-                        .setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
-                        .setPartitions(topicData.partitions().stream().map(
-                            partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
-                                .setPartitionIndex(partitionData.partition())
-                                .setStartOffset(partitionData.errorCode() == Errors.NONE.code() ? partitionData.startOffset() : PartitionFactory.UNINITIALIZED_START_OFFSET)
-                                .setLeaderEpoch(partitionData.errorCode() == Errors.NONE.code() ? partitionData.leaderEpoch() : PartitionFactory.DEFAULT_LEADER_EPOCH)
-                        ).toList())
-                    ));
+                if (partitionData.errorCode() != Errors.NONE.code() || partitionData.startOffset() == PartitionFactory.UNINITIALIZED_START_OFFSET) {
+                    partitionsResponses.put(
+                        tp,
+                        new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                            .setPartitionIndex(partitionData.partition())
+                            .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)

Review Comment:
   If there is an error in `partitionData` for any partition, then we won't get a `startOffset`, so it's safe to put `UNINITIALIZED_START_OFFSET` here, correct? Can you please capture this as a code comment? I am asking for the comment because there are two OR'd conditions earlier.
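   
   For instance, the comment could read something like this (a wording sketch only, to be adapted as needed):
   
   ```
   // In both branches of the OR, UNINITIALIZED_START_OFFSET is the right value to return:
   // if the persister returned an error for this partition, no usable startOffset came back
   // with it; otherwise the share-partition simply has not been initialized yet.
   if (partitionData.errorCode() != Errors.NONE.code() || partitionData.startOffset() == PartitionFactory.UNINITIALIZED_START_OFFSET) {
   ```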



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1835,27 +1852,128 @@ private CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGro
                     return;
                 }
 
+                computeLagAndBuildResponse(
+                    result,
+                    requestTopicIdToNameMapping,
+                    describeShareGroupOffsetsResponseTopicList,
+                    future,
+                    readSummaryRequestData.groupId()
+                );
+            });
+        return future;
+    }
+
+    private void computeLagAndBuildResponse(
+        ReadShareGroupStateSummaryResult readSummaryResult,
+        Map<Uuid, String> requestTopicIdToNameMapping,
+        List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList,
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> responseFuture,
+        String groupId
+    ) {
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>();
+        Map<TopicIdPartition, DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition> partitionsResponses = new HashMap<>();
+
+        readSummaryResult.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                TopicIdPartition tp = new TopicIdPartition(
+                    topicData.topicId(),
+                    new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition())
+                );

Review Comment:
   The method is overly complicated. Why can't it be simpler: first collect the partitions for which lag is to be computed, and then, in a single pass once all of the lag-computation futures have completed, fill in the result.
   
   ```
   readSummaryResult.topicsData().forEach(topicData ->
               topicData.partitions().forEach(partitionData -> {
                   if (partitionData.errorCode() == Errors.NONE.code()) {
                   partitionsToComputeLag.add(new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition()));
                   }
           }));
   
   .....
   .....
   ```
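   
   To make the shape concrete, here is a rough, self-contained sketch of the two-phase approach (collect first, then fill in a single pass once everything has completed). The simplified `TopicPartition` record and the `listLatestOffsets` stand-in below are illustrative assumptions, not the PR's actual types or client:
   
   ```
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   
   public class LagSketch {
       // Simplified stand-in for Kafka's TopicPartition.
       record TopicPartition(String topic, int partition) {}
   
       // Stand-in for the partition metadata client: one future per partition.
       static Map<TopicPartition, CompletableFuture<Long>> listLatestOffsets(List<TopicPartition> tps) {
           Map<TopicPartition, CompletableFuture<Long>> futures = new HashMap<>();
           tps.forEach(tp -> futures.put(tp, CompletableFuture.supplyAsync(() -> 100L + tp.partition())));
           return futures;
       }
   
       public static void main(String[] args) {
           // Phase 1: a single pass collecting only the partitions that need a lag computation.
           List<TopicPartition> partitionsToComputeLag =
               List.of(new TopicPartition("t1", 0), new TopicPartition("t1", 1));
           Map<TopicPartition, CompletableFuture<Long>> latestOffsets =
               listLatestOffsets(partitionsToComputeLag);
   
           // Phase 2: one more pass, entered only after every future has completed.
           CompletableFuture.allOf(latestOffsets.values().toArray(new CompletableFuture<?>[0]))
               .whenComplete((unused, error) -> {
                   long startOffset = 10L;          // would come from the read-summary result
                   long deliveryCompleteCount = 5L; // likewise
                   latestOffsets.forEach((tp, f) ->
                       // join() is safe here: allOf guarantees the futures are done.
                       System.out.println(tp + " lag=" + (f.join() - startOffset + 1 - deliveryCompleteCount)));
               })
               .join(); // demo only: keep main alive until the callback has run
       }
   }
   ```
   
   That way all of the response building lives in one callback, with no per-future handlers.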



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1835,27 +1852,128 @@ private CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGro
                     return;
                 }
 
+                computeLagAndBuildResponse(
+                    result,
+                    requestTopicIdToNameMapping,
+                    describeShareGroupOffsetsResponseTopicList,
+                    future,
+                    readSummaryRequestData.groupId()
+                );
+            });
+        return future;
+    }
+
+    private void computeLagAndBuildResponse(

Review Comment:
   ```suggestion
       private void computeShareGroupLagAndBuildResponse(
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1835,27 +1852,128 @@ private CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGro
                     return;
                 }
 
+                computeLagAndBuildResponse(
+                    result,
+                    requestTopicIdToNameMapping,
+                    describeShareGroupOffsetsResponseTopicList,
+                    future,
+                    readSummaryRequestData.groupId()
+                );
+            });
+        return future;
+    }
+
+    private void computeLagAndBuildResponse(
+        ReadShareGroupStateSummaryResult readSummaryResult,
+        Map<Uuid, String> requestTopicIdToNameMapping,
+        List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList,
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> responseFuture,
+        String groupId
+    ) {
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>();
+        Map<TopicIdPartition, DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition> partitionsResponses = new HashMap<>();
+
+        readSummaryResult.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                TopicIdPartition tp = new TopicIdPartition(
+                    topicData.topicId(),
+                    new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition())
+                );
                 // Return -1 (uninitialized offset) for the situation where the persister returned an error.
                 // This is consistent with OffsetFetch for situations in which there is no offset information to fetch.
                 // It's treated as absence of data, rather than an error.
-                result.topicsData().forEach(topicData ->
-                    describeShareGroupOffsetsResponseTopicList.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
-                        .setTopicId(topicData.topicId())
-                        .setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
-                        .setPartitions(topicData.partitions().stream().map(
-                            partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
-                                .setPartitionIndex(partitionData.partition())
-                                .setStartOffset(partitionData.errorCode() == Errors.NONE.code() ? partitionData.startOffset() : PartitionFactory.UNINITIALIZED_START_OFFSET)
-                                .setLeaderEpoch(partitionData.errorCode() == Errors.NONE.code() ? partitionData.leaderEpoch() : PartitionFactory.DEFAULT_LEADER_EPOCH)
-                        ).toList())
-                    ));
+                if (partitionData.errorCode() != Errors.NONE.code() || partitionData.startOffset() == PartitionFactory.UNINITIALIZED_START_OFFSET) {
+                    partitionsResponses.put(
+                        tp,
+                        new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                            .setPartitionIndex(partitionData.partition())
+                            .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
+                            .setLeaderEpoch(PartitionFactory.DEFAULT_LEADER_EPOCH)
+                            .setLag(PartitionFactory.UNINITIALIZED_LAG)
+                    );
+                } else {
+                    partitionsToComputeLag.add(new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition()));
+                }
+            });
+        });
 
-                future.complete(
-                    new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
-                        .setGroupId(readSummaryRequestData.groupId())
-                        .setTopics(describeShareGroupOffsetsResponseTopicList));
+        Map<TopicPartition, CompletableFuture<Long>> partitionLatestOffsets = partitionsToComputeLag.isEmpty() ? new HashMap<>() :
+                partitionMetadataClient.listLatestOffsets(partitionsToComputeLag);
+
+        // Create a CompletableFuture for each partition that will complete when lag is computed
+        List<CompletableFuture<Void>> lagComputationFutures = new ArrayList<>();
+
+        readSummaryResult.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                TopicPartition tp = new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition());
+                TopicIdPartition tip = new TopicIdPartition(topicData.topicId(), tp);
+                if (partitionData.errorCode() == Errors.NONE.code() && partitionData.startOffset() != PartitionFactory.UNINITIALIZED_START_OFFSET) {
+                    CompletableFuture<Void> lagComputationFuture = partitionLatestOffsets.get(tp)
+                        .handle((latestOffset, throwable) -> {
+                            if (throwable != null) {
+                                partitionsResponses.put(
+                                    tip,
+                                    new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                                        .setPartitionIndex(partitionData.partition())
+                                        .setErrorCode(Errors.forException(throwable).code())
+                                        .setErrorMessage(throwable.getMessage())
+                                );
+                            } else {
+                                // Compute lag: lag = partitionLatestOffset - startOffset + 1 - deliveryCompleteCount
+                                long lag = latestOffset - partitionData.startOffset() + 1 - partitionData.deliveryCompleteCount();
+                                partitionsResponses.put(
+                                    tip,
+                                    new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                                        .setPartitionIndex(partitionData.partition())
+                                        .setStartOffset(partitionData.startOffset())
+                                        .setLeaderEpoch(partitionData.leaderEpoch())
+                                        .setLag(lag)
+                                );
+                            }
+                            return null;
+                        });
+
+                    lagComputationFutures.add(lagComputationFuture);

Review Comment:
   There is handling code for the individual futures, but then they are also added to a list where the handling exists again. Why? Can't we just wait for all of the futures to complete and then iterate over the original map?
   
   Something like below:
   
   ```
   CompletableFuture.allOf(partitionLatestOffsets.values().toArray(new CompletableFuture<?>[0]))
               .whenComplete((result, error) -> {
                 ....
                 ....
                 readSummaryResult.topicsData().forEach(topicData -> {
                   topicData.partitions().forEach(partitionData -> {
                         TopicPartition tp = new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition());
                         TopicIdPartition tip = new TopicIdPartition(topicData.topicId(), tp);
                         if (partitionData.errorCode() == Errors.NONE.code() && partitionData.startOffset() != PartitionFactory.UNINITIALIZED_START_OFFSET) {
                             // The call to join() is safe here because of the allOf above, i.e. the futures
                             // have already completed.
                             Long lag = partitionLatestOffsets.get(tp).join();
                             ...
               });
   ```
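   
   And as a minimal, runnable illustration of that shape, with the per-partition handling done exactly once after `allOf` (the partition names and the failure below are made up for the demo):
   
   ```
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   
   public class AllOfJoinSketch {
       public static void main(String[] args) {
           // Original map: one latest-offset future per partition.
           Map<String, CompletableFuture<Long>> latestOffsets = Map.of(
               "t1-0", CompletableFuture.completedFuture(100L),
               "t1-1", CompletableFuture.failedFuture(new RuntimeException("leader not available")));
   
           // allOf completes (here: exceptionally) only after every future is done,
           // so the callback below is the single place where results are consumed.
           CompletableFuture.allOf(latestOffsets.values().toArray(new CompletableFuture<?>[0]))
               .whenComplete((unused, error) ->
                   latestOffsets.forEach((tp, f) -> {
                       if (f.isCompletedExceptionally()) {
                           System.out.println(tp + ": map to an error response");
                       } else {
                           // join() cannot block or throw here: the future already
                           // completed successfully.
                           System.out.println(tp + ": latestOffset=" + f.join());
                       }
                   }))
               .exceptionally(t -> null) // demo only: don't propagate the allOf failure
               .join();
       }
   }
   ```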



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1835,27 +1852,128 @@ private CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGro
                     return;
                 }
 
+                computeLagAndBuildResponse(
+                    result,
+                    requestTopicIdToNameMapping,
+                    describeShareGroupOffsetsResponseTopicList,
+                    future,
+                    readSummaryRequestData.groupId()
+                );
+            });
+        return future;
+    }
+
+    private void computeLagAndBuildResponse(
+        ReadShareGroupStateSummaryResult readSummaryResult,
+        Map<Uuid, String> requestTopicIdToNameMapping,
+        List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList,
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> responseFuture,
+        String groupId
+    ) {
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>();
+        Map<TopicIdPartition, DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition> partitionsResponses = new HashMap<>();
+
+        readSummaryResult.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                TopicIdPartition tp = new TopicIdPartition(
+                    topicData.topicId(),
+                    new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition())
+                );
                 // Return -1 (uninitialized offset) for the situation where the persister returned an error.
                 // This is consistent with OffsetFetch for situations in which there is no offset information to fetch.
                 // It's treated as absence of data, rather than an error.
-                result.topicsData().forEach(topicData ->
-                    describeShareGroupOffsetsResponseTopicList.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
-                        .setTopicId(topicData.topicId())
-                        .setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
-                        .setPartitions(topicData.partitions().stream().map(
-                            partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
-                                .setPartitionIndex(partitionData.partition())
-                                .setStartOffset(partitionData.errorCode() == Errors.NONE.code() ? partitionData.startOffset() : PartitionFactory.UNINITIALIZED_START_OFFSET)
-                                .setLeaderEpoch(partitionData.errorCode() == Errors.NONE.code() ? partitionData.leaderEpoch() : PartitionFactory.DEFAULT_LEADER_EPOCH)
-                        ).toList())
-                    ));
+                if (partitionData.errorCode() != Errors.NONE.code() || partitionData.startOffset() == PartitionFactory.UNINITIALIZED_START_OFFSET) {

Review Comment:
   And for groups where the startOffset is not yet initialized, the lag will not be calculated. Is that intended?


