bbejeck commented on code in PR #18979:
URL: https://github.com/apache/kafka/pull/18979#discussion_r1965809216
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -682,6 +725,58 @@ public
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>
return FutureUtils.combineFutures(futures, ArrayList::new,
List::addAll);
}
+ /**
+ * See {@link GroupCoordinator#streamsGroupDescribe(RequestContext, List)}.
+ */
+ @Override
+ public
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>>
streamsGroupDescribe(
+ RequestContext context,
+ List<String> groupIds
+ ) {
+ if (!isActive.get()) {
+ return
CompletableFuture.completedFuture(StreamsGroupDescribeRequest.getErrorDescribedGroupList(
+ groupIds,
+ Errors.COORDINATOR_NOT_AVAILABLE
+ ));
+ }
+
+ final
List<CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>>>
futures =
+ new ArrayList<>(groupIds.size());
+ final Map<TopicPartition, List<String>> groupsByTopicPartition = new
HashMap<>();
Review Comment:
more of a general question - why track groups by `TopicPartition`?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -609,6 +612,34 @@ public List<ShareGroupDescribeResponseData.DescribedGroup>
shareGroupDescribe(
return describedGroups;
}
+ /**
+ * Handles a StreamsGroupDescribe request.
+ *
+ * @param groupIds The IDs of the groups to describe.
+ * @param committedOffset A specified committed offset corresponding to
this shard.
+ *
+ * @return A list containing the
StreamsGroupDescribeResponseData.DescribedGroup.
Review Comment:
nit: maybe update the javadoc comment to say that the returned list will also
contain any errors encountered while describing the group, or something to that
effect, but I'm not sure if this is a valid request or not.
##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1675,6 +1675,12 @@ class KafkaConfigTest {
assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE),
config.groupCoordinatorRebalanceProtocols)
assertTrue(config.isNewGroupCoordinatorEnabled)
assertTrue(config.shareGroupConfig.isShareGroupEnabled)
+
+ // This is OK.
Review Comment:
What's this for - can we remove it?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8944,6 +8947,113 @@ public void
testConsumerGroupDescribeBeforeAndAfterCommittingOffset() {
assertEquals(expected, actual);
}
+ @Test
+ public void testStreamsGroupDescribeNoErrors() {
+ List<String> streamsGroupIds = Arrays.asList("group-id-1",
"group-id-2");
+ int epoch = 10;
+ String memberId = "member-id";
+ StreamsGroupMember.Builder memberBuilder =
streamsGroupMemberBuilderWithDefaults(memberId)
+ .setClientTags(Collections.singletonMap("clientTag",
"clientValue"))
+ .setProcessId("processId")
+ .setMemberEpoch(epoch)
+ .setPreviousMemberEpoch(epoch - 1);
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(0),
epoch))
+ .withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(1),
epoch)
+ .withMember(memberBuilder.build()))
+ .build();
+
+ List<StreamsGroupDescribeResponseData.DescribedGroup> expected =
Arrays.asList(
+ new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupEpoch(epoch)
+ .setGroupId(streamsGroupIds.get(0))
+ .setGroupState(StreamsGroupState.EMPTY.toString())
+ .setAssignmentEpoch(0),
+ new StreamsGroupDescribeResponseData.DescribedGroup()
+ .setGroupEpoch(epoch)
+ .setGroupId(streamsGroupIds.get(1))
+ .setMembers(Collections.singletonList(
+ memberBuilder.build().asStreamsGroupDescribeMember(
+ TasksTuple.EMPTY
+ )
+ ))
+ .setGroupState(StreamsGroupState.NOT_READY.toString())
+ );
+ List<StreamsGroupDescribeResponseData.DescribedGroup> actual =
context.sendStreamsGroupDescribe(streamsGroupIds);
+
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testStreamsGroupDescribeWithErrors() {
+ String groupId = "groupId";
+
Review Comment:
nit: don't need a blank line here
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]