apoorvmittal10 commented on code in PR #18834:
URL: https://github.com/apache/kafka/pull/18834#discussion_r1950810086
##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsRequest.java:
##########
@@ -87,23 +101,10 @@ public static DescribeShareGroupOffsetsRequest
parse(ByteBuffer buffer, short ve
);
}
- public static
List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic>
getErrorDescribeShareGroupOffsets(
-
List<DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic>
topics,
- Errors error
- ) {
- return topics.stream()
- .map(
- requestTopic -> new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
- .setTopicName(requestTopic.topicName())
- .setPartitions(
- requestTopic.partitions().stream().map(
- partition -> new
DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
- .setPartitionIndex(partition)
- .setErrorCode(error.code())
- .setErrorMessage(error.message())
- .setStartOffset(0)
- ).collect(Collectors.toList())
- )
- ).collect(Collectors.toList());
+ public static DescribeShareGroupOffsetsResponseGroup
getErrorDescribedGroup(String groupId, Errors error) {
+ return new DescribeShareGroupOffsetsResponseGroup()
+ .setGroupId(groupId)
+ .setErrorCode(error.code())
+ .setErrorMessage(error.message());
Review Comment:
This will always send back the default error message, do we want to do that
or associate acutal error message which would be present in `Throwable`? If we
want actual error message then we should not accept `Errors` in the method
alone, should have some presence of `Throwable` or actual error message.
##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java:
##########
@@ -71,63 +73,122 @@ public AdminApiLookupStrategy<CoordinatorKey>
lookupStrategy() {
return lookupStrategy;
}
+ private void validateKeys(Set<CoordinatorKey> groupIds) {
+ Set<CoordinatorKey> keys = coordinatorKeys(groupSpecs.keySet());
+ if (!keys.containsAll(groupIds)) {
+ throw new IllegalArgumentException("Received unexpected group ids
" + groupIds +
+ " (expected one of " + keys + ")");
+ }
+ }
+
@Override
public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int
coordinatorId, Set<CoordinatorKey> keys) {
- List<String> groupIds = keys.stream().map(key -> {
- if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
- throw new IllegalArgumentException("Invalid group coordinator
key " + key +
- " when building `DescribeShareGroupOffsets` request");
+ validateKeys(keys);
+
+ List<DescribeShareGroupOffsetsRequestGroup> groups = new
ArrayList<>(keys.size());
+ keys.forEach(coordinatorKey -> {
+ String groupId = coordinatorKey.idValue;
+ ListShareGroupOffsetsSpec spec = groupSpecs.get(groupId);
+ DescribeShareGroupOffsetsRequestGroup requestGroup = new
DescribeShareGroupOffsetsRequestGroup()
+ .setGroupId(groupId);
+
+ Map<String, List<Integer>> topicPartitionMap = new HashMap<>();
+ spec.topicPartitions().forEach(tp ->
topicPartitionMap.computeIfAbsent(tp.topic(), t -> new
LinkedList<>()).add(tp.partition()));
+
+ Map<String, DescribeShareGroupOffsetsRequestTopic> requestTopics =
new HashMap<>();
+ for (TopicPartition tp : spec.topicPartitions()) {
+ requestTopics.computeIfAbsent(tp.topic(), t ->
+ new DescribeShareGroupOffsetsRequestTopic()
+ .setTopicName(tp.topic())
+ .setPartitions(new LinkedList<>()))
+ .partitions()
+ .add(tp.partition());
}
- return key.idValue;
- }).collect(Collectors.toList());
- // The DescribeShareGroupOffsetsRequest only includes a single group
ID at this point, which is likely a mistake to be fixing a follow-on PR.
- String groupId = groupIds.isEmpty() ? null : groupIds.get(0);
- if (groupId == null) {
- throw new IllegalArgumentException("Missing group id in request");
- }
- ListShareGroupOffsetsSpec spec = groupSpecs.get(groupId);
-
List<DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic>
topics =
- spec.topicPartitions().stream().map(
- topicPartition -> new
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestTopic()
- .setTopicName(topicPartition.topic())
- .setPartitions(List.of(topicPartition.partition()))
- ).collect(Collectors.toList());
+ requestGroup.setTopics(new ArrayList<>(requestTopics.values()));
+ groups.add(requestGroup);
+ });
DescribeShareGroupOffsetsRequestData data = new
DescribeShareGroupOffsetsRequestData()
- .setGroupId(groupId)
- .setTopics(topics);
+ .setGroups(groups);
return new DescribeShareGroupOffsetsRequest.Builder(data, true);
}
@Override
public ApiResult<CoordinatorKey, Map<TopicPartition, Long>>
handleResponse(Node coordinator,
Set<CoordinatorKey> groupIds,
AbstractResponse abstractResponse) {
+ validateKeys(groupIds);
+
final DescribeShareGroupOffsetsResponse response =
(DescribeShareGroupOffsetsResponse) abstractResponse;
final Map<CoordinatorKey, Map<TopicPartition, Long>> completed = new
HashMap<>();
final Map<CoordinatorKey, Throwable> failed = new HashMap<>();
+ final List<CoordinatorKey> unmapped = new ArrayList<>();
- for (CoordinatorKey groupId : groupIds) {
- Map<TopicPartition, Long> data = new HashMap<>();
- response.data().responses().stream().map(
- describedTopic ->
- describedTopic.partitions().stream().map(
- partition -> {
- if (partition.errorCode() == Errors.NONE.code())
- data.put(new
TopicPartition(describedTopic.topicName(), partition.partitionIndex()),
partition.startOffset());
- else
- log.error("Skipping return offset for topic {}
partition {} due to error {}.", describedTopic.topicName(),
partition.partitionIndex(), Errors.forCode(partition.errorCode()));
- return data;
+ for (CoordinatorKey coordinatorKey : groupIds) {
+ String groupId = coordinatorKey.idValue;
+ if (response.hasGroupError(groupId)) {
+ handleGroupError(coordinatorKey, response.groupError(groupId),
failed, unmapped);
+ } else {
+ Map<TopicPartition, Long> groupOffsetsListing = new
HashMap<>();
+ response.data().groups().stream().filter(g ->
g.groupId().equals(groupId)).forEach(groupResponse -> {
+ for
(DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic
topicResponse : groupResponse.topics()) {
+ for
(DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition
partitionResponse : topicResponse.partitions()) {
+ TopicPartition tp = new
TopicPartition(topicResponse.topicName(), partitionResponse.partitionIndex());
+ if (partitionResponse.errorCode() ==
Errors.NONE.code()) {
+ // Negative offset indicates there is no start
offset for this partition
+ if (partitionResponse.startOffset() < 0) {
+ groupOffsetsListing.put(tp, null);
+ } else {
+ groupOffsetsListing.put(tp,
partitionResponse.startOffset());
+ }
+ } else {
+ log.warn("Skipping return offset for {} due to
error {}: {}.", tp, partitionResponse.errorCode(),
partitionResponse.errorMessage());
+ }
}
- ).collect(Collectors.toList())
- ).collect(Collectors.toList());
- completed.put(groupId, data);
+ }
+ });
+
+ completed.put(coordinatorKey, groupOffsetsListing);
+ }
}
- return new ApiResult<>(completed, failed, Collections.emptyList());
+ return new ApiResult<>(completed, failed, unmapped);
}
private static Set<CoordinatorKey> coordinatorKeys(Collection<String>
groupIds) {
return groupIds.stream()
.map(CoordinatorKey::byGroupId)
.collect(Collectors.toSet());
}
+
+ private void handleGroupError(CoordinatorKey groupId,
+ Errors error,
+ Map<CoordinatorKey, Throwable> failed,
+ List<CoordinatorKey> groupsToUnmap) {
+ switch (error) {
+ case GROUP_AUTHORIZATION_FAILED:
+ case UNKNOWN_MEMBER_ID:
+ case STALE_MEMBER_EPOCH:
+ log.debug("`DescribeShareGroupOffsets` request for group id {}
failed due to error {}", groupId.idValue, error);
Review Comment:
Should it be `ListShareGroupOffsets` here and other logs?
```suggestion
log.debug("`ListShareGroupOffsets` request for group id {}
failed due to error {}", groupId.idValue, error);
```
##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java:
##########
@@ -71,63 +73,122 @@ public AdminApiLookupStrategy<CoordinatorKey>
lookupStrategy() {
return lookupStrategy;
}
+ private void validateKeys(Set<CoordinatorKey> groupIds) {
+ Set<CoordinatorKey> keys = coordinatorKeys(groupSpecs.keySet());
+ if (!keys.containsAll(groupIds)) {
+ throw new IllegalArgumentException("Received unexpected group ids
" + groupIds +
+ " (expected one of " + keys + ")");
+ }
+ }
Review Comment:
nit: may be move the private method after public then it's easier to read
class.
##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsResponse.java:
##########
@@ -17,21 +17,84 @@
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
+import
org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup;
+import
org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition;
+import
org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class DescribeShareGroupOffsetsResponse extends AbstractResponse {
private final DescribeShareGroupOffsetsResponseData data;
+ private final Map<String, Errors> groupLevelErrors = new HashMap<>();
public
DescribeShareGroupOffsetsResponse(DescribeShareGroupOffsetsResponseData data) {
super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
this.data = data;
+ for (DescribeShareGroupOffsetsResponseGroup group : data.groups()) {
+ this.groupLevelErrors.put(group.groupId(),
Errors.forCode(group.errorCode()));
+ }
+ }
+
+ public DescribeShareGroupOffsetsResponse(int throttleTimeMs,
+ Map<String, Throwable> errorsMap,
+ Map<String, Map<TopicIdPartition,
DescribeShareGroupOffsetsResponsePartition>> responseData) {
+ super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
+ List<DescribeShareGroupOffsetsResponseGroup> groupList = new
ArrayList<>();
+ for (Map.Entry<String, Map<TopicIdPartition,
DescribeShareGroupOffsetsResponsePartition>> groupEntry :
responseData.entrySet()) {
+ String groupId = groupEntry.getKey();
+ Map<TopicIdPartition, DescribeShareGroupOffsetsResponsePartition>
partitionDataMap = groupEntry.getValue();
+ Map<String, DescribeShareGroupOffsetsResponseTopic> topicDataMap =
new HashMap<>();
Review Comment:
Hmmm, this map is never updated. Not sure if it's adding any purpose here.
##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsResponse.java:
##########
@@ -17,21 +17,84 @@
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
+import
org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup;
+import
org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition;
+import
org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class DescribeShareGroupOffsetsResponse extends AbstractResponse {
private final DescribeShareGroupOffsetsResponseData data;
+ private final Map<String, Errors> groupLevelErrors = new HashMap<>();
public
DescribeShareGroupOffsetsResponse(DescribeShareGroupOffsetsResponseData data) {
super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
this.data = data;
+ for (DescribeShareGroupOffsetsResponseGroup group : data.groups()) {
+ this.groupLevelErrors.put(group.groupId(),
Errors.forCode(group.errorCode()));
Review Comment:
`errorMessage` in `DescribeShareGroupOffsetsResponseGroup` if set to
something custom will be lost as now it will default to `Errors` error code
message.
##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsResponse.java:
##########
@@ -42,11 +105,14 @@ public DescribeShareGroupOffsetsResponseData data() {
@Override
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new HashMap<>();
- data.responses().forEach(
- result -> result.partitions().forEach(
- partitionResult -> updateErrorCounts(counts,
Errors.forCode(partitionResult.errorCode()))
- )
- );
+ for (Map.Entry<String, Errors> entry: groupLevelErrors.entrySet()) {
+ updateErrorCounts(counts, entry.getValue());
+ }
Review Comment:
nit:
```suggestion
groupLevelErrors.values().forEach(error -> updateErrorCounts(counts,
error));
```
##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsResponse.java:
##########
@@ -17,21 +17,84 @@
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
+import
org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup;
+import
org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition;
+import
org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class DescribeShareGroupOffsetsResponse extends AbstractResponse {
private final DescribeShareGroupOffsetsResponseData data;
+ private final Map<String, Errors> groupLevelErrors = new HashMap<>();
public
DescribeShareGroupOffsetsResponse(DescribeShareGroupOffsetsResponseData data) {
super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
this.data = data;
+ for (DescribeShareGroupOffsetsResponseGroup group : data.groups()) {
+ this.groupLevelErrors.put(group.groupId(),
Errors.forCode(group.errorCode()));
+ }
+ }
+
+ public DescribeShareGroupOffsetsResponse(int throttleTimeMs,
+ Map<String, Throwable> errorsMap,
+ Map<String, Map<TopicIdPartition,
DescribeShareGroupOffsetsResponsePartition>> responseData) {
+ super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
+ List<DescribeShareGroupOffsetsResponseGroup> groupList = new
ArrayList<>();
+ for (Map.Entry<String, Map<TopicIdPartition,
DescribeShareGroupOffsetsResponsePartition>> groupEntry :
responseData.entrySet()) {
+ String groupId = groupEntry.getKey();
+ Map<TopicIdPartition, DescribeShareGroupOffsetsResponsePartition>
partitionDataMap = groupEntry.getValue();
Review Comment:
Can be replaced by a single line:
```suggestion
responseData.forEach((groupId, partitionDataMap) -> {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]