lucasbru commented on code in PR #18979:
URL: https://github.com/apache/kafka/pull/18979#discussion_r1966098527
##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1675,6 +1675,12 @@ class KafkaConfigTest {
assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE),
config.groupCoordinatorRebalanceProtocols)
assertTrue(config.isNewGroupCoordinatorEnabled)
assertTrue(config.shareGroupConfig.isShareGroupEnabled)
+
+ // This is OK.
Review Comment:
You are right, can be removed. I was just following the style in rest of the
test.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -609,6 +612,34 @@ public List<ShareGroupDescribeResponseData.DescribedGroup>
shareGroupDescribe(
return describedGroups;
}
+ /**
+ * Handles a StreamsGroupDescribe request.
+ *
+ * @param groupIds The IDs of the groups to describe.
+ * @param committedOffset A specified committed offset corresponding to
this shard.
+ *
+ * @return A list containing the
StreamsGroupDescribeResponseData.DescribedGroup.
Review Comment:
Good idea. Done
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -682,6 +725,58 @@ public
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>
return FutureUtils.combineFutures(futures, ArrayList::new,
List::addAll);
}
+ /**
+ * See {@link GroupCoordinator#streamsGroupDescribe(RequestContext, List)}.
+ */
+ @Override
+ public
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>>
streamsGroupDescribe(
+ RequestContext context,
+ List<String> groupIds
+ ) {
+ if (!isActive.get()) {
+ return
CompletableFuture.completedFuture(StreamsGroupDescribeRequest.getErrorDescribedGroupList(
+ groupIds,
+ Errors.COORDINATOR_NOT_AVAILABLE
+ ));
+ }
+
+ final
List<CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>>>
futures =
+ new ArrayList<>(groupIds.size());
+ final Map<TopicPartition, List<String>> groupsByTopicPartition = new
HashMap<>();
Review Comment:
The group coordinator is sharded by topic partition of the consumer offset
topic. So we group the group IDs by the topic partitions of the consumer
offset, which acts as an "address" of the right group coordinator. We fetch the
described groups by instance and merge the results.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8944,6 +8947,113 @@ public void
testConsumerGroupDescribeBeforeAndAfterCommittingOffset() {
assertEquals(expected, actual);
}
+ @Test
+ public void testStreamsGroupDescribeNoErrors() {
+ List<String> streamsGroupIds = Arrays.asList("group-id-1",
"group-id-2");
+ int epoch = 10;
+ String memberId = "member-id";
+ StreamsGroupMember.Builder memberBuilder =
streamsGroupMemberBuilderWithDefaults(memberId)
+ .setClientTags(Collections.singletonMap("clientTag",
"clientValue"))
+ .setProcessId("processId")
+ .setMemberEpoch(epoch)
+ .setPreviousMemberEpoch(epoch - 1);
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(0),
epoch))
+ .withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(1),
epoch)
+ .withMember(memberBuilder.build()))
+ .build();
+
+ List<StreamsGroupDescribeResponseData.DescribedGroup> expected =
Arrays.asList(
+ new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupEpoch(epoch)
+ .setGroupId(streamsGroupIds.get(0))
+ .setGroupState(StreamsGroupState.EMPTY.toString())
+ .setAssignmentEpoch(0),
+ new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupEpoch(epoch)
+ .setGroupId(streamsGroupIds.get(1))
+ .setMembers(Collections.singletonList(
+ memberBuilder.build().asStreamsGroupDescribeMember(
+ TasksTuple.EMPTY
+ )
+ ))
+ .setGroupState(StreamsGroupState.NOT_READY.toString())
+ );
+ List<StreamsGroupDescribeResponseData.DescribedGroup> actual =
context.sendStreamsGroupDescribe(streamsGroupIds);
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testStreamsGroupDescribeWithErrors() {
+ String groupId = "groupId";
+
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]