lucasbru commented on code in PR #20244: URL: https://github.com/apache/kafka/pull/20244#discussion_r2306898719
########## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ########## @@ -4363,6 +4401,162 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersStreamsGroupProtocolOnly")) + def testDescribeStreamsGroups(groupProtocol: String): Unit = { + val streamsGroupId = "stream_group_id" + val testTopicName = "test_topic" + val testOutputTopicName = "test_output_topic" + val testNumPartitions = 1 + + val config = createConfig + client = Admin.create(config) + + prepareTopics(List(testTopicName, testOutputTopicName), testNumPartitions) + prepareRecords(testTopicName) + + val streamsConfig = new Properties() + streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000) + val streams = createStreamsGroup( + configOverrides = streamsConfig, + inputTopic = testTopicName, + outputTopic = testOutputTopicName, + streamsGroupId = streamsGroupId, + groupProtocol = groupProtocol + ) + + try { + streams.cleanUp() + streams.start() + + TestUtils.waitUntilTrue(() => streams.state() == KafkaStreams.State.RUNNING, "Streams not in RUNNING state") + + TestUtils.waitUntilTrue(() => { + val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null) + firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId + }, "Stream group not stable yet") + + // Verify the describe call works correctly + val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get() + val group = describedGroups.get(streamsGroupId) + assertNotNull(group) + assertEquals(streamsGroupId, group.groupId()) + assertFalse(group.members().isEmpty) + assertNotNull(group.subtopologies()) + assertFalse(group.subtopologies().isEmpty) + + // Verify the topology contains the expected source and sink topics + val subtopologies = group.subtopologies().asScala + assertTrue(subtopologies.exists(subtopology => + subtopology.sourceTopics().contains(testTopicName))) + + // Test describing a non-existing group + val nonExistingGroup = "non_existing_stream_group" + val describedNonExistingGroupResponse = client.describeStreamsGroups(util.List.of(nonExistingGroup)) + assertFutureThrows(classOf[GroupIdNotFoundException], describedNonExistingGroupResponse.all()) + + } finally { + Utils.closeQuietly(streams, "streams") + Utils.closeQuietly(client, "adminClient") + } + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersStreamsGroupProtocolOnly")) Review Comment: Why parametrize the test if there is only one variation? Could we just remove the parameter? ########## core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala: ########## @@ -235,6 +241,41 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { streamsConsumer } + def createStreamsGroup[K, V](configOverrides: Properties = new Properties, Review Comment: I wonder if could create the streams groups without adding a dependency to the streams package - this seems to create an extra dependency that seems quite circular. It's not too bad since its a testDependency only. Can you check AuthorizerIntegrationTest.createStreamsGroupToDescribe and check how I created a streams groups with input topic / output topic and also a changelog topic? This should work here as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org