dajac commented on code in PR #14544:
URL: https://github.com/apache/kafka/pull/14544#discussion_r1411799674
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -547,6 +549,69 @@ public CompletableFuture<ListGroupsResponseData>
listGroups(
return future;
}
+ /**
+ * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext,
List)}.
+ */
+ @Override
+ public
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>
consumerGroupDescribe(
+ RequestContext context,
+ List<String> groupIds
+ ) {
+ if (!isActive.get()) {
+ return
CompletableFuture.completedFuture(ConsumerGroupDescribeRequest.getErrorDescribedGroupList(
+ groupIds,
+ Errors.COORDINATOR_NOT_AVAILABLE
+ ));
+ }
+
+ final
List<CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>>
futures =
+ new ArrayList<>(groupIds.size());
+ final Map<TopicPartition, List<String>> groupsByTopicPartition = new
HashMap<>();
+ groupIds.forEach(groupId -> {
+ // For backwards compatibility, we support DescribeGroups for the
empty group id.
+ if (groupId == null) {
+
futures.add(CompletableFuture.completedFuture(Collections.singletonList(
+ new ConsumerGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(null)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+ .setErrorMessage(Errors.INVALID_GROUP_ID.message())
+ )));
+ } else {
+ groupsByTopicPartition
+ .computeIfAbsent(topicPartitionFor(groupId), __ -> new
ArrayList<>())
+ .add(groupId);
+ }
+ });
+
+ groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>
future =
+ runtime.scheduleReadOperation(
+ "consumer-group-describe",
+ topicPartition,
+ (coordinator, lastCommittedOffset) ->
coordinator.consumerGroupDescribe(groupIds, lastCommittedOffset)
+ ).exceptionally(exception -> {
+ if (!(exception instanceof KafkaException)) {
+ log.error("ConsumerGroupDescribe request {} hit an
unexpected exception: {}.",
+ groupList, exception.getMessage());
+ }
+
+ return
ConsumerGroupDescribeRequest.getErrorDescribedGroupList(
+ groupList,
+ Errors.forException(exception)
+ );
+ });
+
+ futures.add(future);
+ });
+
+ final CompletableFuture<Void> allFutures =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+ return allFutures.thenApply(v -> {
+ final List<ConsumerGroupDescribeResponseData.DescribedGroup> res =
new ArrayList<>();
+ futures.forEach(future -> res.addAll(future.join()));
+ return res;
+ });
Review Comment:
I think that we have more or less the same code elsewhere in this class. If
you are interested, we could try to refactor this into an helper method as a
follow-up. We can keep it as-is in this pull request though.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -468,6 +468,33 @@ public List<ListGroupsResponseData.ListedGroup>
listGroups(List<String> statesFi
return groupStream.map(group ->
group.asListedGroup(committedOffset)).collect(Collectors.toList());
}
+
+ /**
+ * Handles a ConsumerGroupDescribe request.
+ * @param groupIds The IDs of the groups to describe.
Review Comment:
nit: We usually put an empty line between the description and the params.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -547,6 +549,69 @@ public CompletableFuture<ListGroupsResponseData>
listGroups(
return future;
}
+ /**
+ * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext,
List)}.
+ */
+ @Override
+ public
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>
consumerGroupDescribe(
+ RequestContext context,
+ List<String> groupIds
+ ) {
+ if (!isActive.get()) {
+ return
CompletableFuture.completedFuture(ConsumerGroupDescribeRequest.getErrorDescribedGroupList(
+ groupIds,
+ Errors.COORDINATOR_NOT_AVAILABLE
+ ));
+ }
+
+ final
List<CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>>
futures =
+ new ArrayList<>(groupIds.size());
+ final Map<TopicPartition, List<String>> groupsByTopicPartition = new
HashMap<>();
+ groupIds.forEach(groupId -> {
+ // For backwards compatibility, we support DescribeGroups for the
empty group id.
+ if (groupId == null) {
+
futures.add(CompletableFuture.completedFuture(Collections.singletonList(
+ new ConsumerGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(null)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+ .setErrorMessage(Errors.INVALID_GROUP_ID.message())
Review Comment:
Let's remove the default error message here.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -545,6 +547,34 @@ public String currentAssignmentSummary() {
')';
}
+ public ConsumerGroupDescribeResponseData.Member
asConsumerGroupDescribeMember(Assignment targetAssignment) {
Review Comment:
nit: Javadoc.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java:
##########
@@ -304,4 +312,75 @@ public void
testUpdateWithConsumerGroupCurrentMemberAssignmentValue() {
assertEquals(mkAssignment(mkTopicAssignment(topicId2, 3, 4, 5)),
member.partitionsPendingRevocation());
assertEquals(mkAssignment(mkTopicAssignment(topicId3, 6, 7, 8)),
member.partitionsPendingAssignment());
}
+
+ @Test
+ public void testAsConsumerGroupDescribeMember() {
+ Uuid topicId1 = Uuid.randomUuid();
+ Uuid topicId2 = Uuid.randomUuid();
+ Uuid topicId3 = Uuid.randomUuid();
+ List<Integer> assignedPartitions = Arrays.asList(0, 1, 2);
+ ConsumerGroupCurrentMemberAssignmentValue record = new
ConsumerGroupCurrentMemberAssignmentValue()
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setTargetMemberEpoch(11)
+ .setAssignedPartitions(Collections.singletonList(new
ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+ .setTopicId(topicId1)
+ .setPartitions(assignedPartitions)))
+ .setPartitionsPendingRevocation(Collections.singletonList(new
ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+ .setTopicId(topicId2)
+ .setPartitions(Arrays.asList(3, 4, 5))))
+ .setPartitionsPendingAssignment(Collections.singletonList(new
ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
+ .setTopicId(topicId3)
+ .setPartitions(Arrays.asList(6, 7, 8))));
+ String memberId = Uuid.randomUuid().toString();
+ String clientId = "clientId";
+ String instanceId = "instanceId";
+ String rackId = "rackId";
+ String clientHost = "clientHost";
+ List<String> subscribedTopicNames = Arrays.asList("topic1", "topic2");
+ String subscribedTopicRegex = "topic.*";
+ Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
+ assignmentMap.put(Uuid.randomUuid(), new HashSet<>());
+ Assignment targetAssignment = new Assignment(assignmentMap);
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+ .updateWith(record)
+ .setClientId(clientId)
+ .setInstanceId(instanceId)
+ .setRackId(rackId)
+ .setClientHost(clientHost)
+ .setSubscribedTopicNames(subscribedTopicNames)
+ .setSubscribedTopicRegex(subscribedTopicRegex)
+ .build();
+
+ ConsumerGroupDescribeResponseData.Member consumerGroupDescribeMember =
member.asConsumerGroupDescribeMember(targetAssignment);
+
+ assertEquals(memberId,
consumerGroupDescribeMember.memberId().toString());
+ assertEquals(clientId, consumerGroupDescribeMember.clientId());
+ assertEquals(instanceId, consumerGroupDescribeMember.instanceId());
+ assertEquals(rackId, consumerGroupDescribeMember.rackId());
+ assertEquals(clientHost, consumerGroupDescribeMember.clientHost());
+ assertEquals(subscribedTopicNames,
consumerGroupDescribeMember.subscribedTopicNames());
+ assertEquals(subscribedTopicRegex,
consumerGroupDescribeMember.subscribedTopicRegex());
+ assertEquals(
+ new ConsumerGroupDescribeResponseData.Assignment()
+
.setTopicPartitions(targetAssignment.partitions().entrySet().stream().map(
+ item -> new
ConsumerGroupDescribeResponseData.TopicPartitions()
+ .setTopicId(item.getKey())
+ .setPartitions(new ArrayList<>(item.getValue()))
+ ).collect(Collectors.toList())),
+ consumerGroupDescribeMember.targetAssignment()
+ );
+
+ assertEquals(assignedPartitions,
consumerGroupDescribeMember.assignment().topicPartitions().get(0).partitions());
Review Comment:
nit: We usually prefer to construct the full expected data structure, here
`ConsumerGroupDescribeResponseData.Member`, and to use `assertEquals` because
it ensures that the all the fields are tested (also future fields).
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3743,8 +3743,48 @@ class KafkaApis(val requestChannel: RequestChannel,
}
def handleConsumerGroupDescribe(request: RequestChannel.Request):
CompletableFuture[Unit] = {
- requestHelper.sendMaybeThrottle(request,
request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
- CompletableFuture.completedFuture[Unit](())
+ val consumerGroupDescribeRequest =
request.body[ConsumerGroupDescribeRequest]
Review Comment:
I just realized that we don't handle the `IncludeAuthorizedOperations` case
in the request. When set, we must populate the `AuthorizedOperations` field in
the response. You can check how we did this in `handleDescribeGroupsRequest`.
I suggest to do this in a separate pull request though.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -623,6 +650,22 @@ public GenericGroup genericGroup(
}
}
+ public ConsumerGroup consumerGroup(
Review Comment:
nit: Could we add the javadoc?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]