dajac commented on code in PR #14544: URL: https://github.com/apache/kafka/pull/14544#discussion_r1400669970
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -3693,8 +3693,49 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleConsumerGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = { - requestHelper.sendMaybeThrottle(request, request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + val consumerGroupDescribeRequest = request.body[ConsumerGroupDescribeRequest] + + if (!config.isNewGroupCoordinatorEnabled) { + // The API is not supported by the "old" group coordinator (the default). If the + // new one is not enabled, we fail directly here. + requestHelper.sendMaybeThrottle(request, request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + } else { + val response = new ConsumerGroupDescribeResponseData() + + val authorizedGroups = new ArrayBuffer[String]() + consumerGroupDescribeRequest.data.groupIds.forEach { groupId => + if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) { + response.groups.add(new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId) + .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code) + .setErrorMessage(Errors.GROUP_AUTHORIZATION_FAILED.message) Review Comment: We usually don't set the error message when it is basically the default error message so I would remove it here. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -253,8 +254,8 @@ GroupMetadataManager build() { genericGroupMaxSessionTimeoutMs ); } - } + } Review Comment: Let's revert this one as well. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -133,6 +134,7 @@ public class GroupMetadataManager { public static class Builder { + Review Comment: Let's revert this change as it is unnecessary. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -8660,6 +8665,105 @@ public void testListGroups() { assertEquals(expectAllGroupMap, actualAllGroupMap); } + @Test + public void testConsumerGroupDescribeNoErrors() { + String consumerGroupId = "consumerGroupId"; + int epoch = 10; + String memberId = Uuid.randomUuid().toString(); + String topicName = "topicName"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, epoch)) + .build(); + + ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(memberId) + .setSubscribedTopicNames(Collections.singletonList(topicName)); + context.replay(RecordHelpers.newMemberSubscriptionRecord( + consumerGroupId, + memberBuilder.build() + )); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, epoch + 1)); + + List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.sendConsumerGroupDescribe(Arrays.asList(consumerGroupId)); + ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup(); + describedGroup.setGroupEpoch(epoch + 1); + describedGroup.setGroupId(consumerGroupId); + describedGroup.setMembers(Collections.singletonList(memberBuilder.build().asConsumerGroupDescribeMember(new Assignment(Collections.emptyMap())))); + describedGroup.setAssignorName(""); + describedGroup.setGroupState("assigning"); + List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = Collections.singletonList( + describedGroup + ); + + assertEquals(expected, actual); + } + + @Test + public void testConsumerGroupDescribeWithErrors() { + String groupId = "groupId"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .build(); + context.replay(newGroupMetadataRecord( + groupId, + new GroupMetadataValue() + .setMembers(Collections.emptyList()), + MetadataVersion.latest() + )); Review Comment: I suppose that we could remove this, no? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -445,6 +446,41 @@ public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> statesFi return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList()); } + + public List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescribe( + List<String> groupIds, + long committedOffset + ) { + List<ConsumerGroupDescribeResponseData.DescribedGroup> response = new ArrayList<>(); + + for (String groupId: groupIds) { + Group group = groups.get(groupId, committedOffset); + + ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId); + + if (group == null || !CONSUMER.equals(group.type())) { + // We don't support upgrading/downgrading between protocols at the moment so + // we set an error if a group exists with the wrong type. + describedGroup.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()); + } else { + ConsumerGroup consumerGroup = (ConsumerGroup) group; + describedGroup.setGroupState(consumerGroup.stateAsString()) + .setGroupEpoch(consumerGroup.groupEpoch()) + .setAssignmentEpoch(consumerGroup.assignmentEpoch()) + .setAssignorName(consumerGroup.preferredServerAssignor().isPresent() ? + consumerGroup.preferredServerAssignor().get() : ""); Review Comment: This should actually be `consumerGroup.preferredServerAssignor().orElse(defaultAssignor.name()`. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -525,6 +527,69 @@ public CompletableFuture<ListGroupsResponseData> listGroups( return future; } + /** + * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, List)}. + */ + @Override + public CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> consumerGroupDescribe( + RequestContext context, + List<String> groupIds + ) { + if (!isActive.get()) { + return CompletableFuture.completedFuture(ConsumerGroupDescribeRequest.getErrorDescribedGroupList( + groupIds, + Errors.COORDINATOR_NOT_AVAILABLE + )); + } + + final List<CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>> futures = + new ArrayList<>(groupIds.size()); + final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>(); + groupIds.forEach(groupId -> { + // For backwards compatibility, we support DescribeGroups for the empty group id. + if (groupId == null) { + futures.add(CompletableFuture.completedFuture(Collections.singletonList( + new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(null) + .setErrorCode(Errors.INVALID_GROUP_ID.code()) + .setErrorMessage(Errors.INVALID_GROUP_ID.message()) Review Comment: Let's remove the error message. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -8660,6 +8665,105 @@ public void testListGroups() { assertEquals(expectAllGroupMap, actualAllGroupMap); } + @Test + public void testConsumerGroupDescribeNoErrors() { + String consumerGroupId = "consumerGroupId"; + int epoch = 10; + String memberId = Uuid.randomUuid().toString(); + String topicName = "topicName"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, epoch)) Review Comment: I think that you could leverage `ConsumerGroupBuilder` a bit more to add a few groups and/or members. ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -6465,12 +6465,45 @@ class KafkaApisTest { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) } + @Test + def testConsumerGroupDescribe(): Unit = { + val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava + val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() + consumerGroupDescribeRequestData.groupIds.addAll(groupIds) + val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) + + val future = new CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]() + when(groupCoordinator.consumerGroupDescribe( + any[RequestContext], + any[util.List[String]] + )).thenReturn(future) + + createKafkaApis( + overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp -> "true") + ).handle(requestChannelRequest, RequestLocal.NoCaching) + + val describedGroups = List( + new DescribedGroup().setGroupId(groupIds[0]), + new DescribedGroup().setGroupId(groupIds[1]), + new DescribedGroup().setGroupId(groupIds[2]) + ).asJava + + future.complete(describedGroups) + val consumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData() Review Comment: nit: `expectedConsumerGroupDescribeResponse`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ########## @@ -545,6 +547,32 @@ public String currentAssignmentSummary() { ')'; } + public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember(Assignment targetAssignment) { Review Comment: Instead of passing the `targetAssignment` here, I wonder if we could just set it on the caller side. Have you considered this? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -8660,6 +8665,105 @@ public void testListGroups() { assertEquals(expectAllGroupMap, actualAllGroupMap); } + @Test + public void testConsumerGroupDescribeNoErrors() { + String consumerGroupId = "consumerGroupId"; + int epoch = 10; + String memberId = Uuid.randomUuid().toString(); + String topicName = "topicName"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, epoch)) + .build(); + + ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(memberId) + .setSubscribedTopicNames(Collections.singletonList(topicName)); + context.replay(RecordHelpers.newMemberSubscriptionRecord( + consumerGroupId, + memberBuilder.build() + )); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, epoch + 1)); + Review Comment: This test is actually not supposed to work because you don't commit after replaying the records so the state should not be accessible. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -8660,6 +8665,105 @@ public void testListGroups() { assertEquals(expectAllGroupMap, actualAllGroupMap); } + @Test + public void testConsumerGroupDescribeNoErrors() { + String consumerGroupId = "consumerGroupId"; + int epoch = 10; + String memberId = Uuid.randomUuid().toString(); + String topicName = "topicName"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, epoch)) + .build(); + + ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(memberId) + .setSubscribedTopicNames(Collections.singletonList(topicName)); + context.replay(RecordHelpers.newMemberSubscriptionRecord( + consumerGroupId, + memberBuilder.build() + )); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, epoch + 1)); + + List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.sendConsumerGroupDescribe(Arrays.asList(consumerGroupId)); + ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup(); + describedGroup.setGroupEpoch(epoch + 1); + describedGroup.setGroupId(consumerGroupId); + describedGroup.setMembers(Collections.singletonList(memberBuilder.build().asConsumerGroupDescribeMember(new Assignment(Collections.emptyMap())))); + describedGroup.setAssignorName(""); + describedGroup.setGroupState("assigning"); + List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = Collections.singletonList( + describedGroup + ); + + assertEquals(expected, actual); + } + + @Test + public void testConsumerGroupDescribeWithErrors() { + String groupId = "groupId"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .build(); + context.replay(newGroupMetadataRecord( + groupId, + new GroupMetadataValue() + .setMembers(Collections.emptyList()), + MetadataVersion.latest() + )); + + List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.sendConsumerGroupDescribe(Collections.singletonList(groupId)); + ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup(); + describedGroup.setGroupId(groupId); + describedGroup.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()); Review Comment: nit: We usually format the code as follow when we construct such object. ``` ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup() .setGroupid() .set....; ``` It would be great if you could update all occurrences in the new tests. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -9603,6 +9707,24 @@ private static JoinGroupRequestProtocolCollection toProtocols(String... protocol return protocols; } + private static Record newConsumerGroupMetadataRecord( Review Comment: There is already everything you need in `RecordHelpers`. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -8660,6 +8665,105 @@ public void testListGroups() { assertEquals(expectAllGroupMap, actualAllGroupMap); } + @Test + public void testConsumerGroupDescribeNoErrors() { + String consumerGroupId = "consumerGroupId"; + int epoch = 10; + String memberId = Uuid.randomUuid().toString(); + String topicName = "topicName"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, epoch)) + .build(); + + ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(memberId) + .setSubscribedTopicNames(Collections.singletonList(topicName)); + context.replay(RecordHelpers.newMemberSubscriptionRecord( + consumerGroupId, + memberBuilder.build() + )); + context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, epoch + 1)); + + List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.sendConsumerGroupDescribe(Arrays.asList(consumerGroupId)); + ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup(); + describedGroup.setGroupEpoch(epoch + 1); + describedGroup.setGroupId(consumerGroupId); + describedGroup.setMembers(Collections.singletonList(memberBuilder.build().asConsumerGroupDescribeMember(new Assignment(Collections.emptyMap())))); + describedGroup.setAssignorName(""); + describedGroup.setGroupState("assigning"); + List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = Collections.singletonList( + describedGroup + ); + + assertEquals(expected, actual); + } + + @Test + public void testConsumerGroupDescribeWithErrors() { + String groupId = "groupId"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .build(); + context.replay(newGroupMetadataRecord( + groupId, + new GroupMetadataValue() + .setMembers(Collections.emptyList()), + MetadataVersion.latest() + )); + + List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.sendConsumerGroupDescribe(Collections.singletonList(groupId)); + ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup(); + describedGroup.setGroupId(groupId); + describedGroup.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()); + List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = Collections.singletonList( + describedGroup + ); + + assertEquals(expected, actual); + } + + @Test + public void testConsumerGroupDescribeBeforeAndAfterCommittingOffset() { + String consumerGroupId = "consumerGroupId"; + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .build(); + + // Add group without committing + context.replay(newConsumerGroupMetadataRecord( + consumerGroupId, + new ConsumerGroupMetadataValue(), + MetadataVersion.latest() + )); Review Comment: I wonder if we could build a bigger test case with a few members. It would also be good to have target assignment records and current assignment records for completeness. ########## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ########## @@ -6206,12 +6206,40 @@ class KafkaApisTest { assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode) } + @Test + def testConsumerGroupDescribe(): Unit = { + val groupId = "group0" + val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData() + consumerGroupDescribeRequestData.groupIds.add(groupId) + val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build()) + + val future = new CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]() + when(groupCoordinator.consumerGroupDescribe( + any[RequestContext], + any[util.List[String]] + )).thenReturn(future) + + createKafkaApis( + overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp -> "true") + ).handle(requestChannelRequest, RequestLocal.NoCaching) + + val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest) + + val describedGroups = List(new DescribedGroup()).asJava + val consumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData() + .setGroups(describedGroups) + future.complete(describedGroups) + + assertEquals(consumerGroupDescribeResponseData, response.data) + } + Review Comment: It would be great if we could add those. We could also do it as a follow-up if you like. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org