apoorvmittal10 commented on code in PR #18834:
URL: https://github.com/apache/kafka/pull/18834#discussion_r1954238834
##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java:
##########
@@ -73,61 +75,121 @@ public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
     @Override
     public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int coordinatorId, Set<CoordinatorKey> keys) {
-        List<String> groupIds = keys.stream().map(key -> {
-            if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
-                throw new IllegalArgumentException("Invalid group coordinator key " + key +
-                    " when building `DescribeShareGroupOffsets` request");
+        validateKeys(keys);
+
+        List<DescribeShareGroupOffsetsRequestGroup> groups = new ArrayList<>(keys.size());
+        keys.forEach(coordinatorKey -> {
+            String groupId = coordinatorKey.idValue;
+            ListShareGroupOffsetsSpec spec = groupSpecs.get(groupId);
+            DescribeShareGroupOffsetsRequestGroup requestGroup = new DescribeShareGroupOffsetsRequestGroup()
+                .setGroupId(groupId);
+
+            Map<String, List<Integer>> topicPartitionMap = new HashMap<>();
+            spec.topicPartitions().forEach(tp -> topicPartitionMap.computeIfAbsent(tp.topic(), t -> new LinkedList<>()).add(tp.partition()));
Review Comment:
Query: Why do we need to modify the input param, which is part of the request?
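For illustration only (not the PR author's code): if the concern is about touching request-owned state, the per-topic grouping visible in the diff above could also be built purely with a stream, leaving the input collection untouched. The helper name below is hypothetical, and it assumes `spec.topicPartitions()` yields a collection of `TopicPartition`.
```java
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicPartition;

class PartitionGroupingSketch {
    // Hypothetical helper: groups the requested partitions by topic without
    // mutating the input collection or any request-owned state.
    static Map<String, List<Integer>> partitionsByTopic(Collection<TopicPartition> topicPartitions) {
        return topicPartitions.stream()
            .collect(Collectors.groupingBy(
                TopicPartition::topic,
                Collectors.mapping(TopicPartition::partition, Collectors.toList())));
    }
}
```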
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10456,13 +10458,15 @@ class KafkaApisTest extends Logging {
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
    val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
-    response.data.responses.forEach(topic => topic.partitions().forEach(partition => assertEquals(Errors.UNSUPPORTED_VERSION.code(), partition.errorCode())))
+    response.data.groups.forEach(group => group.topics.forEach(topic => topic.partitions.forEach(partition => assertEquals(Errors.UNSUPPORTED_VERSION.code(), partition.errorCode()))))
Review Comment:
```suggestion
    response.data.groups.forEach(group => group.topics.forEach(topic => topic.partitions.forEach(partition => assertEquals(Errors.UNSUPPORTED_VERSION.code, partition.errorCode))))
```
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10478,9 +10482,11 @@ class KafkaApisTest extends Logging {
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
    val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
-    response.data.responses.forEach(
-      topic => topic.partitions().forEach(
-        partition => assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code(), partition.errorCode())
+    response.data.groups.forEach(
+      group => group.topics.forEach(
+        topic => topic.partitions.forEach(
+          partition => assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code(), partition.errorCode())
Review Comment:
```suggestion
          partition => assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, partition.errorCode)
```
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10491,31 +10497,49 @@ class KafkaApisTest extends Logging {
val topicId1 = Uuid.randomUuid()
val topicName2 = "topic-2"
val topicId2 = Uuid.randomUuid()
+ val topicName3 = "topic-3"
+ val topicId3 = Uuid.randomUuid()
Review Comment:
```suggestion
val topicId3 = Uuid.randomUuid
```
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10558,7 +10582,81 @@ class KafkaApisTest extends Logging {
          ))
      ))
-    future.complete(describeShareGroupOffsetsResponse)
+    val describeShareGroupOffsetsResponseGroup2 = new DescribeShareGroupOffsetsResponseGroup()
+      .setGroupId("group2")
+      .setTopics(util.List.of(
+        new DescribeShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName3)
+          .setTopicId(topicId3)
+          .setPartitions(util.List.of(
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(0)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0)
+          ))
+      ))
+
+    val describeShareGroupOffsetsResponse = new DescribeShareGroupOffsetsResponseData()
+      .setGroups(util.List.of(describeShareGroupOffsetsResponseGroup1, describeShareGroupOffsetsResponseGroup2))
+
+    futureGroup1.complete(describeShareGroupOffsetsResponseGroup1)
+    futureGroup2.complete(describeShareGroupOffsetsResponseGroup2)
+    val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
+    assertEquals(describeShareGroupOffsetsResponse, response.data)
+  }
+
+  @Test
+  def testDescribeShareGroupOffsetsRequestEmptyGroupsSuccess(): Unit = {
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+
+    val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData()
+
+    val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build())
Review Comment:
```suggestion
    val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build)
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -956,47 +957,62 @@ public CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetch
    }

    /**
-     * See {@link GroupCoordinator#describeShareGroupOffsets(RequestContext, DescribeShareGroupOffsetsRequestData)}.
+     * See {@link GroupCoordinator#describeShareGroupOffsets(RequestContext, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}.
     */
    @Override
-    public CompletableFuture<DescribeShareGroupOffsetsResponseData> describeShareGroupOffsets(
+    public CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> describeShareGroupOffsets(
        RequestContext context,
-        DescribeShareGroupOffsetsRequestData requestData
+        DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData
    ) {
+        final HashMap<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
Review Comment:
Why not just Map?
```suggestion
        final Map<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
```
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10558,7 +10582,81 @@ class KafkaApisTest extends Logging {
          ))
      ))
-    future.complete(describeShareGroupOffsetsResponse)
+    val describeShareGroupOffsetsResponseGroup2 = new DescribeShareGroupOffsetsResponseGroup()
+      .setGroupId("group2")
+      .setTopics(util.List.of(
+        new DescribeShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName3)
+          .setTopicId(topicId3)
+          .setPartitions(util.List.of(
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(0)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0)
+          ))
+      ))
+
+    val describeShareGroupOffsetsResponse = new DescribeShareGroupOffsetsResponseData()
+      .setGroups(util.List.of(describeShareGroupOffsetsResponseGroup1, describeShareGroupOffsetsResponseGroup2))
+
+    futureGroup1.complete(describeShareGroupOffsetsResponseGroup1)
+    futureGroup2.complete(describeShareGroupOffsetsResponseGroup2)
+    val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
+    assertEquals(describeShareGroupOffsetsResponse, response.data)
+  }
+
+  @Test
+  def testDescribeShareGroupOffsetsRequestEmptyGroupsSuccess(): Unit = {
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+
+    val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData()
+
+    val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build())
+
+    val future = new CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]()
Review Comment:
nit: Same elsewhere:
```suggestion
    val future = new CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]
```
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -10558,7 +10582,81 @@ class KafkaApisTest extends Logging {
          ))
      ))
-    future.complete(describeShareGroupOffsetsResponse)
+    val describeShareGroupOffsetsResponseGroup2 = new DescribeShareGroupOffsetsResponseGroup()
+      .setGroupId("group2")
+      .setTopics(util.List.of(
+        new DescribeShareGroupOffsetsResponseTopic()
+          .setTopicName(topicName3)
+          .setTopicId(topicId3)
+          .setPartitions(util.List.of(
+            new DescribeShareGroupOffsetsResponsePartition()
+              .setPartitionIndex(0)
+              .setStartOffset(0)
+              .setLeaderEpoch(1)
+              .setErrorMessage(null)
+              .setErrorCode(0)
+          ))
+      ))
+
+    val describeShareGroupOffsetsResponse = new DescribeShareGroupOffsetsResponseData()
+      .setGroups(util.List.of(describeShareGroupOffsetsResponseGroup1, describeShareGroupOffsetsResponseGroup2))
+
+    futureGroup1.complete(describeShareGroupOffsetsResponseGroup1)
+    futureGroup2.complete(describeShareGroupOffsetsResponseGroup2)
+    val response = verifyNoThrottling[DescribeShareGroupOffsetsResponse](requestChannelRequest)
+    assertEquals(describeShareGroupOffsetsResponse, response.data)
+  }
+
+  @Test
+  def testDescribeShareGroupOffsetsRequestEmptyGroupsSuccess(): Unit = {
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
+
+    val describeShareGroupOffsetsRequest = new DescribeShareGroupOffsetsRequestData()
+
+    val requestChannelRequest = buildRequest(new DescribeShareGroupOffsetsRequest.Builder(describeShareGroupOffsetsRequest, true).build())
+
+    val future = new CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]()
+    kafkaApis = createKafkaApis(
+      overrideProperties = Map(ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true"),
+    )
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val describeShareGroupOffsetsResponseGroup = new DescribeShareGroupOffsetsResponseGroup()
+
+    val describeShareGroupOffsetsResponse = new DescribeShareGroupOffsetsResponseData()
Review Comment:
```suggestion
    val describeShareGroupOffsetsResponseGroup = new DescribeShareGroupOffsetsResponseGroup
    val describeShareGroupOffsetsResponse = new DescribeShareGroupOffsetsResponseData
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -956,47 +957,62 @@ public CompletableFuture<OffsetFetchResponseData.OffsetFetchResponseGroup> fetch
    }

    /**
-     * See {@link GroupCoordinator#describeShareGroupOffsets(RequestContext, DescribeShareGroupOffsetsRequestData)}.
+     * See {@link GroupCoordinator#describeShareGroupOffsets(RequestContext, DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup)}.
     */
    @Override
-    public CompletableFuture<DescribeShareGroupOffsetsResponseData> describeShareGroupOffsets(
+    public CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> describeShareGroupOffsets(
        RequestContext context,
-        DescribeShareGroupOffsetsRequestData requestData
+        DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData
    ) {
+        final HashMap<Uuid, String> requestTopicIdToNameMapping = new HashMap<>();
Review Comment:
Also, this can be moved after the first two if blocks, since it is only used later.
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3190,26 +3190,51 @@ class KafkaApis(val requestChannel: RequestChannel,
  def handleDescribeShareGroupOffsetsRequest(request: RequestChannel.Request): Unit = {
    val describeShareGroupOffsetsRequest = request.body[DescribeShareGroupOffsetsRequest]
+    val groups = describeShareGroupOffsetsRequest.groups()
+
+    val futures = new mutable.ArrayBuffer[CompletableFuture[DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup]](groups.size)
+    groups.forEach { groupDescribeOffsets =>
+      if (!isShareGroupProtocolEnabled) {
+        futures += CompletableFuture.completedFuture(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+          .setGroupId(groupDescribeOffsets.groupId)
+          .setErrorCode(Errors.UNSUPPORTED_VERSION.code))
+      } else if (!authHelper.authorize(request.context, READ, GROUP, groupDescribeOffsets.groupId)) {
+        futures += CompletableFuture.completedFuture(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+          .setGroupId(groupDescribeOffsets.groupId)
+          .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code))
+      } else if (groupDescribeOffsets.topics.isEmpty) {
+        futures += CompletableFuture.completedFuture(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+          .setGroupId(groupDescribeOffsets.groupId)
+          .setErrorCode(Errors.NONE.code))
Review Comment:
Do we need to set this error code explicitly? It should be `NONE` by default.
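A minimal check of the reviewer's point, assuming the generated `errorCode` field defaults to 0, which is exactly what `Errors.NONE.code()` returns (sketch only, not part of the PR):
```java
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup;
import org.apache.kafka.common.protocol.Errors;

class DefaultErrorCodeSketch {
    public static void main(String[] args) {
        // Assumption: a freshly created group carries errorCode 0 (NONE) without an explicit setter.
        DescribeShareGroupOffsetsResponseGroup group = new DescribeShareGroupOffsetsResponseGroup()
            .setGroupId("group1");
        System.out.println(group.errorCode() == Errors.NONE.code()); // expected: true
    }
}
```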
##########
clients/src/main/java/org/apache/kafka/common/requests/DescribeShareGroupOffsetsResponse.java:
##########
@@ -18,20 +18,56 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData;
+import org.apache.kafka.common.message.DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;

 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;

 public class DescribeShareGroupOffsetsResponse extends AbstractResponse {
     private final DescribeShareGroupOffsetsResponseData data;
+    private final Map<String, Throwable> groupLevelErrors = new HashMap<>();

     public DescribeShareGroupOffsetsResponse(DescribeShareGroupOffsetsResponseData data) {
         super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
         this.data = data;
+        for (DescribeShareGroupOffsetsResponseGroup group : data.groups()) {
+            if (group.errorCode() != Errors.NONE.code()) {
+                this.groupLevelErrors.put(group.groupId(), Errors.forCode(group.errorCode()).exception(group.errorMessage()));
+            }
+        }
+    }
+
+    // Builds a response with the same group-level error for all groups and empty topics lists for all groups
+    public DescribeShareGroupOffsetsResponse(int throttleTimeMs,
+                                             List<String> groupIds,
+                                             Throwable allGroupsException) {
+        super(ApiKeys.DESCRIBE_SHARE_GROUP_OFFSETS);
+        short errorCode = Errors.forException(allGroupsException).code();
+        List<DescribeShareGroupOffsetsResponseGroup> groupList = new ArrayList<>();
+        groupIds.forEach(groupId -> {
+            groupList.add(new DescribeShareGroupOffsetsResponseGroup()
+                .setGroupId(groupId)
+                .setErrorCode(errorCode)
+                .setErrorMessage(errorCode == Errors.UNKNOWN_SERVER_ERROR.code() ? Errors.forCode(errorCode).message() : allGroupsException.getMessage()));
Review Comment:
Query: Why do we want special handling for `UNKNOWN_SERVER_ERROR`? Shouldn't we rather have:
```
                .setErrorMessage(allGroupsException.getMessage() != null ? allGroupsException.getMessage() : Errors.forCode(errorCode).message()));
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -1009,20 +1025,23 @@ public CompletableFuture<DescribeShareGroupOffsetsResponseData> describeShareGro
                future.completeExceptionally(new IllegalStateException("Result is null for the read state summary"));
                return;
            }
-            List<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic> describeShareGroupOffsetsResponseTopicList =
-                result.topicsData().stream().map(
-                    topicData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
-                        .setTopicId(topicData.topicId())
-                        .setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId()))
-                        .setPartitions(topicData.partitions().stream().map(
-                            partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
-                                .setPartitionIndex(partitionData.partition())
-                                .setStartOffset(partitionData.startOffset())
-                                .setErrorMessage(partitionData.errorMessage())
-                                .setErrorCode(partitionData.errorCode())
-                        ).toList())
-                ).toList();
-            future.complete(new DescribeShareGroupOffsetsResponseData().setResponses(describeShareGroupOffsetsResponseTopicList));
+            result.topicsData().forEach(topicData ->
+                describeShareGroupOffsetsResponseTopicList.add(new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic()
+                    .setTopicId(topicData.topicId())
+                    .setTopicName(requestTopicIdToNameMapping.get(topicData.topicId()))
+                    .setPartitions(topicData.partitions().stream().map(
+                        partitionData -> new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition()
+                            .setPartitionIndex(partitionData.partition())
+                            .setStartOffset(partitionData.startOffset())
+                            .setErrorMessage(Errors.forCode(partitionData.errorCode()).message())
+                            .setErrorCode(partitionData.errorCode())
+                    ).toList())
+            ));
+
+            future.complete(
+                new DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup()
+                    .setGroupId(requestData.groupId())
+                    .setTopics(describeShareGroupOffsetsResponseTopicList));
        });
Review Comment:
For my understanding: elsewhere in the GroupCoordinatorService we schedule a read operation over the runtime, but this code is handled differently. Why?
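For reference, the runtime pattern this comment alludes to looks roughly like the sketch below. It is an approximation, not the PR's code: the operation name, the `topicPartitionFor` helper, and the shard method `readShareGroupOffsets` are placeholders.
```java
// Sketch only: schedule the read on the coordinator runtime rather than handling it inline.
// `runtime` is the CoordinatorRuntime owned by GroupCoordinatorService; the lambda receives the
// coordinator shard plus the last committed offset to read at.
CompletableFuture<DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseGroup> future =
    runtime.scheduleReadOperation(
        "describe-share-group-offsets",               // name used for logging/metrics (placeholder)
        topicPartitionFor(requestData.groupId()),     // __consumer_offsets partition owning the group
        (shard, lastCommittedOffset) -> shard.readShareGroupOffsets(requestData, lastCommittedOffset) // hypothetical shard read
    );
```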
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]