bbejeck commented on code in PR #18979:
URL: https://github.com/apache/kafka/pull/18979#discussion_r1965809216


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -682,6 +725,58 @@ public 
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>
         return FutureUtils.combineFutures(futures, ArrayList::new, 
List::addAll);
     }
 
+    /**
+     * See {@link GroupCoordinator#streamsGroupDescribe(RequestContext, List)}.
+     */
+    @Override
+    public 
CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>> 
streamsGroupDescribe(
+        RequestContext context,
+        List<String> groupIds
+    ) {
+        if (!isActive.get()) {
+            return 
CompletableFuture.completedFuture(StreamsGroupDescribeRequest.getErrorDescribedGroupList(
+                groupIds,
+                Errors.COORDINATOR_NOT_AVAILABLE
+            ));
+        }
+
+        final 
List<CompletableFuture<List<StreamsGroupDescribeResponseData.DescribedGroup>>> 
futures =
+            new ArrayList<>(groupIds.size());
+        final Map<TopicPartition, List<String>> groupsByTopicPartition = new 
HashMap<>();

Review Comment:
   more of a general question - why track groups by `TopicPartition`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -609,6 +612,34 @@ public List<ShareGroupDescribeResponseData.DescribedGroup> 
shareGroupDescribe(
         return describedGroups;
     }
 
+    /**
+     * Handles a StreamsGroupDescribe request.
+     *
+     * @param groupIds          The IDs of the groups to describe.
+     * @param committedOffset   A specified committed offset corresponding to 
this shard.
+     *
+     * @return A list containing the 
StreamsGroupDescribeResponseData.DescribedGroup.

Review Comment:
   nit: maybe update the Javadoc to note that the returned list will also 
contain any errors encountered while describing the groups, or something to 
that effect, but I'm not sure whether this is a valid request.



##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1675,6 +1675,12 @@ class KafkaConfigTest {
     assertEquals(Set(GroupType.CLASSIC, GroupType.CONSUMER, GroupType.SHARE), 
config.groupCoordinatorRebalanceProtocols)
     assertTrue(config.isNewGroupCoordinatorEnabled)
     assertTrue(config.shareGroupConfig.isShareGroupEnabled)
+
+    // This is OK.

Review Comment:
   What's this for - can we remove it?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8944,6 +8947,113 @@ public void 
testConsumerGroupDescribeBeforeAndAfterCommittingOffset() {
         assertEquals(expected, actual);
     }
 
+    @Test
+    public void testStreamsGroupDescribeNoErrors() {
+        List<String> streamsGroupIds = Arrays.asList("group-id-1", 
"group-id-2");
+        int epoch = 10;
+        String memberId = "member-id";
+        StreamsGroupMember.Builder memberBuilder = 
streamsGroupMemberBuilderWithDefaults(memberId)
+            .setClientTags(Collections.singletonMap("clientTag", 
"clientValue"))
+            .setProcessId("processId")
+            .setMemberEpoch(epoch)
+            .setPreviousMemberEpoch(epoch - 1);
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(0), 
epoch))
+            .withStreamsGroup(new StreamsGroupBuilder(streamsGroupIds.get(1), 
epoch)
+                .withMember(memberBuilder.build()))
+            .build();
+
+        List<StreamsGroupDescribeResponseData.DescribedGroup> expected = 
Arrays.asList(
+            new StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupEpoch(epoch)
+                .setGroupId(streamsGroupIds.get(0))
+                .setGroupState(StreamsGroupState.EMPTY.toString())
+                .setAssignmentEpoch(0),
+            new StreamsGroupDescribeResponseData.DescribedGroup()
+                .setGroupEpoch(epoch)
+                .setGroupId(streamsGroupIds.get(1))
+                .setMembers(Collections.singletonList(
+                    memberBuilder.build().asStreamsGroupDescribeMember(
+                        TasksTuple.EMPTY
+                    )
+                ))
+                .setGroupState(StreamsGroupState.NOT_READY.toString())
+        );
+        List<StreamsGroupDescribeResponseData.DescribedGroup> actual = 
context.sendStreamsGroupDescribe(streamsGroupIds);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testStreamsGroupDescribeWithErrors() {
+        String groupId = "groupId";
+

Review Comment:
   nit: don't need a blank line here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to