apoorvmittal10 commented on code in PR #20839:
URL: https://github.com/apache/kafka/pull/20839#discussion_r2502688264
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1835,27 +1852,128 @@ private CompletableFuture&lt;DescribeShareGroupOffsetsResponseData.DescribeShareGro
                 return;
             }
+            computeLagAndBuildResponse(
+                result,
+                requestTopicIdToNameMapping,
+                describeShareGroupOffsetsResponseTopicList,
+                future,
+                readSummaryRequestData.groupId()
+            );
+        });
+        return future;
+    }
+
+    private void computeLagAndBuildResponse(
+        ReadShareGroupStateSummaryResult readSummaryResult,
+        Map<Uuid, String> requestTopicIdToNameMapping,
+        List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList,
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> responseFuture,
+        String groupId
+    ) {
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>();
+        Map<TopicIdPartition, DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition> partitionsResponses = new HashMap<>();
+
+        readSummaryResult.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                TopicIdPartition tp = new TopicIdPartition(
+                    topicData.topicId(),
+                    new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition())
+                );
                 // Return -1 (uninitialized offset) for the situation where the persister returned an error.
                 // This is consistent with OffsetFetch for situations in which there is no offset information to fetch.
                 // It's treated as absence of data, rather than an error.
-        result.topicsData().forEach(topicData ->
-            describeShareGroupOffsetsResponseTopicList.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
-                .setTopicId(topicData.topicId())
-                .setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
-                .setPartitions(topicData.partitions().stream().map(
-                    partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
-                        .setPartitionIndex(partitionData.partition())
-                        .setStartOffset(partitionData.errorCode() == Errors.NONE.code() ? partitionData.startOffset() : PartitionFactory.UNINITIALIZED_START_OFFSET)
-                        .setLeaderEpoch(partitionData.errorCode() == Errors.NONE.code() ? partitionData.leaderEpoch() : PartitionFactory.DEFAULT_LEADER_EPOCH)
-                ).toList())
-            ));
+                if (partitionData.errorCode() != Errors.NONE.code() || partitionData.startOffset() == PartitionFactory.UNINITIALIZED_START_OFFSET) {
+                    partitionsResponses.put(
+                        tp,
+                        new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                            .setPartitionIndex(partitionData.partition())
+                            .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
Review Comment:
If there is an error in `partitionData` for any partition, then we won't get a `startOffset`, hence it's safe to put `UNINITIALIZED_START_OFFSET` here, correct? Can you please write this down as a comment? The reason I am asking for the comment is that there are 2 OR conditions earlier.
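A minimal sketch of the kind of comment being asked for, with the condition copied from the diff above (the wording is only a suggestion):
```
// Both OR branches deliberately fall back to UNINITIALIZED_START_OFFSET:
// - errorCode != NONE: the persister reported an error, so no valid startOffset
//   was returned for this partition.
// - startOffset == UNINITIALIZED_START_OFFSET: the partition has no share-group
//   state yet, so there is no offset (and no lag) to report.
if (partitionData.errorCode() != Errors.NONE.code() || partitionData.startOffset() == PartitionFactory.UNINITIALIZED_START_OFFSET) {
```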
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1835,27 +1852,128 @@ private CompletableFuture&lt;DescribeShareGroupOffsetsResponseData.DescribeShareGro
                 return;
             }
+            computeLagAndBuildResponse(
+                result,
+                requestTopicIdToNameMapping,
+                describeShareGroupOffsetsResponseTopicList,
+                future,
+                readSummaryRequestData.groupId()
+            );
+        });
+        return future;
+    }
+
+    private void computeLagAndBuildResponse(
+        ReadShareGroupStateSummaryResult readSummaryResult,
+        Map<Uuid, String> requestTopicIdToNameMapping,
+        List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList,
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> responseFuture,
+        String groupId
+    ) {
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>();
+        Map<TopicIdPartition, DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition> partitionsResponses = new HashMap<>();
+
+        readSummaryResult.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                TopicIdPartition tp = new TopicIdPartition(
+                    topicData.topicId(),
+                    new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition())
+                );
Review Comment:
The method is overly complicated. Why can't it be a simpler one: first get the partitions for which lag is to be computed, and then, in a single pass once all of the lag-calculation futures have completed, fill in the result.
```
readSummaryResult.topicsData().forEach(topicData ->
    topicData.partitions().forEach(partitionData -> {
        if (partitionData.errorCode() == Errors.NONE.code()) {
            partitionsToComputeLag.add(new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition()));
        }
    }));
.....
.....
```
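For reference, the collected set would then feed a single bulk lookup; this is a sketch built from the `partitionMetadataClient.listLatestOffsets` call that already appears later in the diff (the empty-set guard mirrors the PR code):
```
// One bulk metadata call for all partitions that need lag, instead of
// interleaving a lookup with the response-building pass.
Map<TopicPartition, CompletableFuture<Long>> partitionLatestOffsets =
    partitionsToComputeLag.isEmpty() ? new HashMap<>() :
        partitionMetadataClient.listLatestOffsets(partitionsToComputeLag);
```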
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1835,27 +1852,128 @@ private CompletableFuture&lt;DescribeShareGroupOffsetsResponseData.DescribeShareGro
                 return;
             }
+            computeLagAndBuildResponse(
+                result,
+                requestTopicIdToNameMapping,
+                describeShareGroupOffsetsResponseTopicList,
+                future,
+                readSummaryRequestData.groupId()
+            );
+        });
+        return future;
+    }
+
+    private void computeLagAndBuildResponse(
Review Comment:
```suggestion
private void computeShareGroupLagAndBuildResponse(
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1835,27 +1852,128 @@ private CompletableFuture&lt;DescribeShareGroupOffsetsResponseData.DescribeShareGro
                 return;
             }
+            computeLagAndBuildResponse(
+                result,
+                requestTopicIdToNameMapping,
+                describeShareGroupOffsetsResponseTopicList,
+                future,
+                readSummaryRequestData.groupId()
+            );
+        });
+        return future;
+    }
+
+    private void computeLagAndBuildResponse(
+        ReadShareGroupStateSummaryResult readSummaryResult,
+        Map<Uuid, String> requestTopicIdToNameMapping,
+        List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList,
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> responseFuture,
+        String groupId
+    ) {
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>();
+        Map<TopicIdPartition, DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition> partitionsResponses = new HashMap<>();
+
+        readSummaryResult.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                TopicIdPartition tp = new TopicIdPartition(
+                    topicData.topicId(),
+                    new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition())
+                );
                 // Return -1 (uninitialized offset) for the situation where the persister returned an error.
                 // This is consistent with OffsetFetch for situations in which there is no offset information to fetch.
                 // It's treated as absence of data, rather than an error.
-        result.topicsData().forEach(topicData ->
-            describeShareGroupOffsetsResponseTopicList.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
-                .setTopicId(topicData.topicId())
-                .setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
-                .setPartitions(topicData.partitions().stream().map(
-                    partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
-                        .setPartitionIndex(partitionData.partition())
-                        .setStartOffset(partitionData.errorCode() == Errors.NONE.code() ? partitionData.startOffset() : PartitionFactory.UNINITIALIZED_START_OFFSET)
-                        .setLeaderEpoch(partitionData.errorCode() == Errors.NONE.code() ? partitionData.leaderEpoch() : PartitionFactory.DEFAULT_LEADER_EPOCH)
-                ).toList())
-            ));
+                if (partitionData.errorCode() != Errors.NONE.code() || partitionData.startOffset() == PartitionFactory.UNINITIALIZED_START_OFFSET) {
+                    partitionsResponses.put(
+                        tp,
+                        new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                            .setPartitionIndex(partitionData.partition())
+                            .setStartOffset(PartitionFactory.UNINITIALIZED_START_OFFSET)
+                            .setLeaderEpoch(PartitionFactory.DEFAULT_LEADER_EPOCH)
+                            .setLag(PartitionFactory.UNINITIALIZED_LAG)
+                    );
+                } else {
+                    partitionsToComputeLag.add(new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition()));
+                }
+            });
+        });
-        future.complete(
-            new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
-                .setGroupId(readSummaryRequestData.groupId())
-                .setTopics(describeShareGroupOffsetsResponseTopicList));
+        Map<TopicPartition, CompletableFuture<Long>> partitionLatestOffsets = partitionsToComputeLag.isEmpty() ? new HashMap<>() :
+            partitionMetadataClient.listLatestOffsets(partitionsToComputeLag);
+
+        // Create a CompletableFuture for each partition that will complete when lag is computed
+        List<CompletableFuture<Void>> lagComputationFutures = new ArrayList<>();
+
+        readSummaryResult.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                TopicPartition tp = new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition());
+                TopicIdPartition tip = new TopicIdPartition(topicData.topicId(), tp);
+                if (partitionData.errorCode() == Errors.NONE.code() && partitionData.startOffset() != PartitionFactory.UNINITIALIZED_START_OFFSET) {
+                    CompletableFuture<Void> lagComputationFuture = partitionLatestOffsets.get(tp)
+                        .handle((latestOffset, throwable) -> {
+                            if (throwable != null) {
+                                partitionsResponses.put(
+                                    tip,
+                                    new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                                        .setPartitionIndex(partitionData.partition())
+                                        .setErrorCode(Errors.forException(throwable).code())
+                                        .setErrorMessage(throwable.getMessage())
+                                );
+                            } else {
+                                // Compute lag: lag = partitionLatestOffset - startOffset + 1 - deliveryCompleteCount
+                                long lag = latestOffset - partitionData.startOffset() + 1 - partitionData.deliveryCompleteCount();
+                                partitionsResponses.put(
+                                    tip,
+                                    new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                                        .setPartitionIndex(partitionData.partition())
+                                        .setStartOffset(partitionData.startOffset())
+                                        .setLeaderEpoch(partitionData.leaderEpoch())
+                                        .setLag(lag)
+                                );
+                            }
+                            return null;
+                        });
+
+                    lagComputationFutures.add(lagComputationFuture);
Review Comment:
There is handling code for the individual futures, but then they are also added to a list where the handling exists again. Why? Can't we just wait for the futures to complete and then iterate over the original map?
Something like below:
```
CompletableFuture.allOf(partitionLatestOffsets.values().toArray(new CompletableFuture<?>[0]))
    .whenComplete((result, error) -> {
        ....
        ....
        readSummaryResult.topicsData().forEach(topicData -> {
            topicData.partitions().forEach(partitionData -> {
                TopicPartition tp = new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition());
                TopicIdPartition tip = new TopicIdPartition(topicData.topicId(), tp);
                if (partitionData.errorCode() == Errors.NONE.code() && partitionData.startOffset() != PartitionFactory.UNINITIALIZED_START_OFFSET) {
                    // The call to join() is safe here because of the allOf above, i.e. the futures
                    // have already completed.
                    Long lag = partitionLatestOffsets.get(tp).join();
                    ...
        });
```
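Filling in the elided pieces, a hedged sketch of how that single pass could look; it reuses the names and the lag formula from the diff above (`partitionsResponses`, `responseFuture`, `lag = latestOffset - startOffset + 1 - deliveryCompleteCount`), and the final step of grouping `partitionsResponses` into the topic list is only indicated, since the diff does not show it:
```
CompletableFuture.allOf(partitionLatestOffsets.values().toArray(new CompletableFuture<?>[0]))
    .whenComplete((unused, error) -> {
        readSummaryResult.topicsData().forEach(topicData ->
            topicData.partitions().forEach(partitionData -> {
                TopicPartition tp = new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition());
                TopicIdPartition tip = new TopicIdPartition(topicData.topicId(), tp);
                if (partitionData.errorCode() == Errors.NONE.code() && partitionData.startOffset() != PartitionFactory.UNINITIALIZED_START_OFFSET) {
                    // join() cannot block here: the allOf above has already completed.
                    // A failed per-partition future would throw a CompletionException from
                    // join(), so that error path still needs handling as in the PR's code.
                    long latestOffset = partitionLatestOffsets.get(tp).join();
                    long lag = latestOffset - partitionData.startOffset() + 1 - partitionData.deliveryCompleteCount();
                    partitionsResponses.put(
                        tip,
                        new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
                            .setPartitionIndex(partitionData.partition())
                            .setStartOffset(partitionData.startOffset())
                            .setLeaderEpoch(partitionData.leaderEpoch())
                            .setLag(lag)
                    );
                }
            }));
        // Then group partitionsResponses by topic into the response topic list and
        // complete responseFuture, as the existing method already does.
    });
```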
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1835,27 +1852,128 @@ private CompletableFuture&lt;DescribeShareGroupOffsetsResponseData.DescribeShareGro
                 return;
             }
+            computeLagAndBuildResponse(
+                result,
+                requestTopicIdToNameMapping,
+                describeShareGroupOffsetsResponseTopicList,
+                future,
+                readSummaryRequestData.groupId()
+            );
+        });
+        return future;
+    }
+
+    private void computeLagAndBuildResponse(
+        ReadShareGroupStateSummaryResult readSummaryResult,
+        Map<Uuid, String> requestTopicIdToNameMapping,
+        List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList,
+        CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> responseFuture,
+        String groupId
+    ) {
+        Set<TopicPartition> partitionsToComputeLag = new HashSet<>();
+        Map<TopicIdPartition, DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition> partitionsResponses = new HashMap<>();
+
+        readSummaryResult.topicsData().forEach(topicData -> {
+            topicData.partitions().forEach(partitionData -> {
+                TopicIdPartition tp = new TopicIdPartition(
+                    topicData.topicId(),
+                    new TopicPartition(requestTopicIdToNameMapping.get(topicData.topicId()), partitionData.partition())
+                );
                 // Return -1 (uninitialized offset) for the situation where the persister returned an error.
                 // This is consistent with OffsetFetch for situations in which there is no offset information to fetch.
                 // It's treated as absence of data, rather than an error.
-        result.topicsData().forEach(topicData ->
-            describeShareGroupOffsetsResponseTopicList.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
-                .setTopicId(topicData.topicId())
-                .setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
-                .setPartitions(topicData.partitions().stream().map(
-                    partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
-                        .setPartitionIndex(partitionData.partition())
-                        .setStartOffset(partitionData.errorCode() == Errors.NONE.code() ? partitionData.startOffset() : PartitionFactory.UNINITIALIZED_START_OFFSET)
-                        .setLeaderEpoch(partitionData.errorCode() == Errors.NONE.code() ? partitionData.leaderEpoch() : PartitionFactory.DEFAULT_LEADER_EPOCH)
-                ).toList())
-            ));
+                if (partitionData.errorCode() != Errors.NONE.code() || partitionData.startOffset() == PartitionFactory.UNINITIALIZED_START_OFFSET) {
Review Comment:
And for groups where the `startOffset` is not yet initialized, the lag will not be calculated. Is that intended?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]