chirag-wadhwa5 commented on code in PR #20839:
URL: https://github.com/apache/kafka/pull/20839#discussion_r2511403647
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1835,27 +1851,139 @@ private CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGro
                 return;
             }
+            // Now compute lag for each partition and build the final response.
+            computeShareGroupLagAndBuildResponse(
+                result,
+                requestTopicIdToNameMapping,
+                describeShareGroupOffsetsResponseTopicList,
+                future,
+                readSummaryRequestData.groupId()
+            );
+        });
+        return future;
+    }
+
+    private void computeShareGroupLagAndBuildResponse(
+        ReadShareGroupStateSummaryResult readSummaryResult,
+        Map<Uuid, String> requestTopicIdToNameMapping,
+        List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList,
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> responseFuture,
+        String groupId
+    ) {
+        // This set keeps track of the partitions for which lag computation is needed.
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>();
+
+        // This map stores the final DescribeShareGroupOffsetsResponsePartition, including the lag, for all the partitions.
+        Map<TopicIdPartition, DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition> partitionsResponses = new HashMap<>();
+
+        readSummaryResult.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                TopicIdPartition tp = new TopicIdPartition(
+                    topicData.topicId(),
+                    new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition())
+                );
                 // Return -1 (uninitialized offset) for the situation where the persister returned an error.
                 // This is consistent with OffsetFetch for situations in which there is no offset information to fetch.
-                // It's treated as absence of data, rather than an error.
-                result.topicsData().forEach(topicData ->
-                    describeShareGroupOffsetsResponseTopicList.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
-                        .setTopicId(topicData.topicId())
-                        .setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
-                        .setPartitions(topicData.partitions().stream().map(
-                            partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
-                                .setPartitionIndex(partitionData.partition())
-                                .setStartOffset(partitionData.errorCode() == Errors.NONE.code() ? partitionData.startOffset() : PartitionFactory.UNINITIALIZED_START_OFFSET)
-                                .setLeaderEpoch(partitionData.errorCode() == Errors.NONE.code() ? partitionData.leaderEpoch() : PartitionFactory.DEFAULT_LEADER_EPOCH)
-                        ).toList())
-                    ));
+                // It's treated as absence of data, rather than an error. Also, the persister returns startOffset
+                // as -1 (uninitialized offset) for share partitions for which consumption hasn't begun yet. Thus,
+                // lag computation is not needed in these situations, and -1 (uninitialized lag) is returned.
+                if (partitionData.errorCode() != Errors.NONE.code() || partitionData.startOffset() == PartitionFactory.UNINITIALIZED_START_OFFSET) {
+                    partitionsResponses.put(
+                        tp,
+                        new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                            .setPartitionIndex(partitionData.partition())
+                            .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
+                            .setLeaderEpoch(PartitionFactory.DEFAULT_LEADER_EPOCH)
+                            .setLag(PartitionFactory.UNINITIALIZED_LAG)
+                    );
+                } else {
+                    // If the readSummaryResult is successful for a partition, we need to compute lag.
+                    partitionsToComputeLag.add(new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition()));
+                }
+            });
+        });
+
+        // Fetch latest offsets for all partitions that need lag computation.
+        Map<TopicPartition, CompletableFuture<Long>> partitionLatestOffsets = partitionsToComputeLag.isEmpty() ? new HashMap<>() :
+            partitionMetadataClient.listLatestOffsets(partitionsToComputeLag);
-        future.complete(
-            new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
-                .setGroupId(readSummaryRequestData.groupId())
-                .setTopics(describeShareGroupOffsetsResponseTopicList));
+        CompletableFuture.allOf(partitionLatestOffsets.values().toArray(new CompletableFuture<?>[0]))
+            .whenComplete((result, error) -> {
+                readSummaryResult.topicsData().forEach(topicData -> {
+                    topicData.partitions().forEach(partitionData -> {
+                        // The partitions that fail this check are already handled above, and their corresponding
+                        // DescribeShareGroupOffsetsResponsePartition is already present in partitionsResponses.
+                        if (partitionData.errorCode() == Errors.NONE.code() && partitionData.startOffset() != PartitionFactory.UNINITIALIZED_START_OFFSET) {
+                            TopicPartition tp = new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition());
+                            TopicIdPartition tip = new TopicIdPartition(topicData.topicId(), tp);
+                            try {
+                                // This code is reached when allOf above is complete, which happens when all the
+                                // individual futures are complete. Thus, the call to join() here is safe.
+                                long partitionLatestOffset = partitionLatestOffsets.get(tp).join();
+                                // Compute lag as (partition end offset - startOffset + 1 - deliveryCompleteCount).
+                                long lag = partitionLatestOffset - partitionData.startOffset() + 1 - partitionData.deliveryCompleteCount();
+                                partitionsResponses.put(
+                                    tip,
+                                    new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                                        .setPartitionIndex(partitionData.partition())
+                                        .setStartOffset(partitionData.startOffset())
+                                        .setLeaderEpoch(partitionData.leaderEpoch())
+                                        .setLag(lag)
+                                );
+                            } catch (CompletionException e) {
+                                // If fetching the latest offset for a partition failed, return the error in the response for that partition.
+                                partitionsResponses.put(
+                                    tip,
+                                    new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                                        .setPartitionIndex(partitionData.partition())
+                                        .setErrorCode(Errors.forException(e.getCause()).code())
+                                        .setErrorMessage(e.getCause().getMessage())
+                                );
+                            }
+                        }
+                    });
+                });
+
+                // Build the final response and complete the future.
+                responseFuture.complete(BuildDescribeShareGroupOffsetsResponse(
+                    partitionsResponses,
+                    requestTopicIdToNameMapping,
+                    describeShareGroupOffsetsResponseTopicList,
+                    groupId
+                ));
+            });
-        return future;
+    }
+
+    private DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup BuildDescribeShareGroupOffsetsResponse(
Review Comment:
My mistake, will replace this in the next commit
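
The quoted diff ends at the signature of `BuildDescribeShareGroupOffsetsResponse`, whose leading capital breaks Java's lowerCamelCase convention for method names. Assuming that naming slip is the mistake being acknowledged, the fix is a plain rename. A minimal sketch of the corrected signature, with the parameter list inferred from the call site in `computeShareGroupLagAndBuildResponse` and the body elided because it falls outside the quoted diff:

```java
// Hypothetical rename sketch only: Java methods use lowerCamelCase, so the
// helper would become buildDescribeShareGroupOffsetsResponse. Parameter
// types are inferred from the call site above; the real body is outside
// the quoted diff, so a placeholder is thrown here.
private DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup buildDescribeShareGroupOffsetsResponse(
    Map<TopicIdPartition, DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition> partitionsResponses,
    Map<Uuid, String> requestTopicIdToNameMapping,
    List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList,
    String groupId
) {
    throw new UnsupportedOperationException("body elided from the quoted diff");
}
```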
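
Separately, the lag formula in the new code, `partition end offset - startOffset + 1 - deliveryCompleteCount`, is easy to sanity-check with concrete numbers. A self-contained sketch of the arithmetic as the PR computes it, with all values made up for illustration:

```java
public class ShareGroupLagExample {
    public static void main(String[] args) {
        // Made-up values for illustration only.
        long partitionLatestOffset = 109L;  // latest offset fetched for the partition
        long startOffset = 100L;            // share-partition start offset from the persister
        long deliveryCompleteCount = 4L;    // records already delivery-complete

        // The window [100, 109] spans 109 - 100 + 1 = 10 offsets; with 4 of
        // them already delivery-complete, the remaining lag is 6.
        long lag = partitionLatestOffset - startOffset + 1 - deliveryCompleteCount;
        System.out.println("lag = " + lag);  // prints "lag = 6"
    }
}
```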
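
One non-obvious detail in the new code is why the bare `join()` calls are safe and why the failure path catches `CompletionException`: `allOf` only completes once every input future is done, and `join()` rethrows an input's failure wrapped in `CompletionException`. A small standalone demonstration of that pattern (topic names and offsets are made up):

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class AllOfJoinExample {
    public static void main(String[] args) {
        Map<String, CompletableFuture<Long>> offsets = Map.of(
            "t-0", CompletableFuture.completedFuture(109L),
            "t-1", CompletableFuture.failedFuture(new RuntimeException("fetch failed"))
        );

        CompletableFuture.allOf(offsets.values().toArray(new CompletableFuture<?>[0]))
            .whenComplete((unused, error) -> {
                for (Map.Entry<String, CompletableFuture<Long>> entry : offsets.entrySet()) {
                    try {
                        // Safe: allOf guarantees every input future is already done.
                        System.out.println(entry.getKey() + " -> " + entry.getValue().join());
                    } catch (CompletionException e) {
                        // join() wraps the original failure in a CompletionException,
                        // so the root cause is recovered via getCause().
                        System.out.println(entry.getKey() + " -> error: " + e.getCause().getMessage());
                    }
                }
            });
    }
}
```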
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]