lucasbru commented on code in PR #20600:
URL: https://github.com/apache/kafka/pull/20600#discussion_r2429046125
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4504,6 +4506,55 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
+ @Test
+ def testDescribeStreamsGroupsForStatelessTopology(): Unit = {
+ val streamsGroupId = "stream_group_id"
+ val testTopicName = "test_topic"
+ val testNumPartitions = 1
+
+ val config = createConfig
+ client = Admin.create(config)
+
+ prepareTopics(List(testTopicName), testNumPartitions)
+ prepareRecords(testTopicName)
+
+ val streams = createStreamsGroup(
+ inputTopics = Set(testTopicName),
+ streamsGroupId = streamsGroupId
+ )
+ streams.poll(JDuration.ofMillis(500L))
+
+ try {
+ TestUtils.waitUntilTrue(() => {
+ val firstGroup =
client.listGroups().all().get().stream().findFirst().orElse(null)
+ firstGroup.groupState().orElse(null) == GroupState.STABLE &&
firstGroup.groupId() == streamsGroupId
+ }, "Streams group did not transition to STABLE before timeout")
+
+ // Verify the describe call works correctly
+ val describedGroups =
client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
+ val group = describedGroups.get(streamsGroupId)
Review Comment:
This is not a test aimed at specifically testing the bug at all. Just
something that we hadn't implemented before, because both listGroup and
describeGroup didn't work correctly in this case. But otherwise, this is a
generic integration test for describe streams group, which is why it does not
end after the list groups part above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]