Copilot commented on code in PR #20244: URL: https://github.com/apache/kafka/pull/20244#discussion_r2316974230
########## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ########## @@ -2589,13 +2592,25 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { shareGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroupId) val shareGroup = createShareConsumer(configOverrides = shareGroupConfig) + val streamsConfig = new Properties() + streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + + val streamsGroup = createStreamsGroup( + configOverrides = streamsConfig , + inputTopic = testTopicName , + outputTopic = testStreamsOutputTopicName , Review Comment: Remove trailing spaces after commas on lines 2599-2601 to maintain consistent code formatting. ```suggestion configOverrides = streamsConfig, inputTopic = testTopicName, outputTopic = testStreamsOutputTopicName, ``` ########## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ########## @@ -2589,13 +2592,25 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { shareGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroupId) val shareGroup = createShareConsumer(configOverrides = shareGroupConfig) + val streamsConfig = new Properties() + streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + + val streamsGroup = createStreamsGroup( + configOverrides = streamsConfig , + inputTopic = testTopicName , + outputTopic = testStreamsOutputTopicName , + streamsGroupId = streamsGroupId + ) + val config = createConfig client = Admin.create(config) try { - client.createTopics(util.Set.of( - new NewTopic(testTopicName, 1, 1.toShort) - )).all().get() - waitForTopics(client, List(testTopicName), List()) + // client.createTopics(util.Set.of( + // new NewTopic(testTopicName, 1, 1.toShort), + // new NewTopic(testStreamsOutputTopicName, 1, 1.toShort) + // )).all().get() + // + // waitForTopics(client, List(testTopicName), List()) Review Comment: Commented-out code should be removed rather than left in the codebase. If this code is needed for debugging, consider using a conditional flag or removing it entirely. ```suggestion ``` ########## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ########## @@ -4363,17 +4396,157 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { } } } + + @Test + def testDescribeStreamsGroups(): Unit = { + val streamsGroupId = "stream_group_id" + val testTopicName = "test_topic" + val testOutputTopicName = "test_output_topic" + val testNumPartitions = 1 + + val config = createConfig + client = Admin.create(config) + + prepareTopics(List(testTopicName, testOutputTopicName), testNumPartitions) + prepareRecords(testTopicName) + + val streamsConfig = new Properties() + streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + val streams = createStreamsGroup( + configOverrides = streamsConfig, + inputTopic = testTopicName, + outputTopic = testOutputTopicName, + streamsGroupId = streamsGroupId + ) + streams.poll(JDuration.ofMillis(500L)) + + try { + TestUtils.waitUntilTrue(() => { + val firstGroup = client.listGroups().all().get().stream().findFirst().orElse(null) + firstGroup.groupState().orElse(null) == GroupState.STABLE && firstGroup.groupId() == streamsGroupId + }, "Stream group not stable yet") + + // Verify the describe call works correctly + val describedGroups = client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get() + val group = describedGroups.get(streamsGroupId) + assertNotNull(group) + assertEquals(streamsGroupId, group.groupId()) + assertFalse(group.members().isEmpty) + assertNotNull(group.subtopologies()) + assertFalse(group.subtopologies().isEmpty) + + // Verify the topology contains the expected source and sink topics + val subtopologies = group.subtopologies().asScala + assertTrue(subtopologies.exists(subtopology => + subtopology.sourceTopics().contains(testTopicName))) + + // Test describing a non-existing group + val nonExistingGroup = "non_existing_stream_group" + val describedNonExistingGroupResponse = client.describeStreamsGroups(util.List.of(nonExistingGroup)) + assertFutureThrows(classOf[GroupIdNotFoundException], describedNonExistingGroupResponse.all()) + + } finally { + Utils.closeQuietly(streams, "streams") + Utils.closeQuietly(client, "adminClient") + } + } + + @Test + def testDeleteStreamsGroups(): Unit = { + val testTopicName = "test_topic" + val testOutputTopicName = "test_output_topic" + val testNumPartitions = 3 + val testNumStreamsGroup = 3 + + val targetDeletedGroups = util.List.of("stream_group_id_2", "stream_group_id_3") + val targetRemainingGroups = util.List.of("stream_group_id_1") + + val config = createConfig + client = Admin.create(config) + + prepareTopics(List(testTopicName, testOutputTopicName), testNumPartitions) + prepareRecords(testTopicName) + + val streamsList = scala.collection.mutable.ListBuffer[(String, AsyncKafkaConsumer[_,_])]() + + try { + for (i <- 1 to testNumStreamsGroup) { + val streamsGroupId = s"stream_group_id_$i" + val streamsConfig = new Properties() + streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + + val streams = createStreamsGroup( + configOverrides = streamsConfig, + inputTopic = testTopicName, + outputTopic = testOutputTopicName, + streamsGroupId = streamsGroupId, + ) + streams.poll(JDuration.ofMillis(500L)) + streamsList += ((streamsGroupId, streams)) + } + + TestUtils.waitUntilTrue(() => { + val groups = client.listGroups().all().get() + groups.stream() + .anyMatch(g => g.groupId().startsWith("stream_group_id_")) && testNumStreamsGroup == groups.size() + }, "Streams groups not ready to delete yet") + + // Test deletion of non-empty existing groups + var deleteStreamsGroupResult = client.deleteStreamsGroups(targetDeletedGroups) + assertFutureThrows(classOf[GroupNotEmptyException], deleteStreamsGroupResult.all()) + assertEquals(deleteStreamsGroupResult.deletedGroups().size(),2) Review Comment: Missing space after comma in method call. Should be `assertEquals(deleteStreamsGroupResult.deletedGroups().size(), 2)` for consistent formatting. ```suggestion assertEquals(deleteStreamsGroupResult.deletedGroups().size(), 2) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org