Copilot commented on code in PR #20244:
URL: https://github.com/apache/kafka/pull/20244#discussion_r2316974230


##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2589,13 +2592,25 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     shareGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroupId)
     val shareGroup = createShareConsumer(configOverrides = shareGroupConfig)
 
+    val streamsConfig = new Properties()
+    streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
+    val streamsGroup = createStreamsGroup(
+      configOverrides = streamsConfig ,
+      inputTopic = testTopicName ,
+      outputTopic = testStreamsOutputTopicName ,

Review Comment:
   Remove the stray spaces before the commas on lines 2599-2601 to maintain 
consistent code formatting.
   ```suggestion
         configOverrides = streamsConfig,
         inputTopic = testTopicName,
         outputTopic = testStreamsOutputTopicName,
   ```



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2589,13 +2592,25 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     shareGroupConfig.put(ConsumerConfig.GROUP_ID_CONFIG, shareGroupId)
     val shareGroup = createShareConsumer(configOverrides = shareGroupConfig)
 
+    val streamsConfig = new Properties()
+    streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
+    val streamsGroup = createStreamsGroup(
+      configOverrides = streamsConfig ,
+      inputTopic = testTopicName ,
+      outputTopic = testStreamsOutputTopicName ,
+      streamsGroupId = streamsGroupId
+    )
+
     val config = createConfig
     client = Admin.create(config)
     try {
-      client.createTopics(util.Set.of(
-        new NewTopic(testTopicName, 1, 1.toShort)
-      )).all().get()
-      waitForTopics(client, List(testTopicName), List())
+      //      client.createTopics(util.Set.of(
+      //        new NewTopic(testTopicName, 1, 1.toShort),
+      //        new NewTopic(testStreamsOutputTopicName, 1, 1.toShort)
+      //      )).all().get()
+      //
+      //      waitForTopics(client, List(testTopicName), List())

Review Comment:
   Commented-out code should be removed rather than left in the codebase. If 
this code is still needed for debugging, guard it with a conditional flag 
instead of leaving it commented out.
   ```suggestion
   
   ```



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4363,17 +4396,157 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       }
     }
   }
+
+  @Test
+  def testDescribeStreamsGroups(): Unit = {
+    val streamsGroupId = "stream_group_id"
+    val testTopicName = "test_topic"
+    val testOutputTopicName = "test_output_topic"
+    val testNumPartitions = 1
+
+    val config = createConfig
+    client = Admin.create(config)
+
+    prepareTopics(List(testTopicName, testOutputTopicName), testNumPartitions)
+    prepareRecords(testTopicName)
+
+    val streamsConfig = new Properties()
+    streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    val streams = createStreamsGroup(
+      configOverrides = streamsConfig,
+      inputTopic = testTopicName,
+      outputTopic = testOutputTopicName,
+      streamsGroupId = streamsGroupId
+    )
+    streams.poll(JDuration.ofMillis(500L))
+
+    try {
+      TestUtils.waitUntilTrue(() => {
+        val firstGroup = 
client.listGroups().all().get().stream().findFirst().orElse(null)
+        firstGroup.groupState().orElse(null) == GroupState.STABLE && 
firstGroup.groupId() == streamsGroupId
+      }, "Stream group not stable yet")
+
+      // Verify the describe call works correctly
+      val describedGroups = 
client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
+      val group = describedGroups.get(streamsGroupId)
+      assertNotNull(group)
+      assertEquals(streamsGroupId, group.groupId())
+      assertFalse(group.members().isEmpty)
+      assertNotNull(group.subtopologies())
+      assertFalse(group.subtopologies().isEmpty)
+
+      // Verify the topology contains the expected source and sink topics
+      val subtopologies = group.subtopologies().asScala
+      assertTrue(subtopologies.exists(subtopology =>
+        subtopology.sourceTopics().contains(testTopicName)))
+
+      // Test describing a non-existing group
+      val nonExistingGroup = "non_existing_stream_group"
+      val describedNonExistingGroupResponse = 
client.describeStreamsGroups(util.List.of(nonExistingGroup))
+      assertFutureThrows(classOf[GroupIdNotFoundException], 
describedNonExistingGroupResponse.all())
+
+    } finally {
+      Utils.closeQuietly(streams, "streams")
+      Utils.closeQuietly(client, "adminClient")
+    }
+  }
+
+  @Test
+  def testDeleteStreamsGroups(): Unit = {
+    val testTopicName = "test_topic"
+    val testOutputTopicName = "test_output_topic"
+    val testNumPartitions = 3
+    val testNumStreamsGroup = 3
+
+    val targetDeletedGroups = util.List.of("stream_group_id_2", 
"stream_group_id_3")
+    val targetRemainingGroups = util.List.of("stream_group_id_1")
+
+    val config = createConfig
+    client = Admin.create(config)
+
+    prepareTopics(List(testTopicName, testOutputTopicName), testNumPartitions)
+    prepareRecords(testTopicName)
+
+    val streamsList = scala.collection.mutable.ListBuffer[(String, 
AsyncKafkaConsumer[_,_])]()
+
+    try {
+      for (i <- 1 to testNumStreamsGroup) {
+        val streamsGroupId = s"stream_group_id_$i"
+        val streamsConfig = new Properties()
+        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
+        val streams = createStreamsGroup(
+          configOverrides = streamsConfig,
+          inputTopic = testTopicName,
+          outputTopic = testOutputTopicName,
+          streamsGroupId = streamsGroupId,
+        )
+        streams.poll(JDuration.ofMillis(500L))
+        streamsList += ((streamsGroupId, streams))
+      }
+
+      TestUtils.waitUntilTrue(() => {
+        val groups = client.listGroups().all().get()
+        groups.stream()
+          .anyMatch(g => g.groupId().startsWith("stream_group_id_")) &&  
testNumStreamsGroup == groups.size()
+      }, "Streams groups not ready to delete yet")
+
+      // Test deletion of non-empty existing groups
+      var deleteStreamsGroupResult = 
client.deleteStreamsGroups(targetDeletedGroups)
+      assertFutureThrows(classOf[GroupNotEmptyException], 
deleteStreamsGroupResult.all())
+      assertEquals(deleteStreamsGroupResult.deletedGroups().size(),2)

Review Comment:
   Missing space after comma in method call. Should be 
`assertEquals(deleteStreamsGroupResult.deletedGroups().size(), 2)` for 
consistent formatting.
   ```suggestion
         assertEquals(deleteStreamsGroupResult.deletedGroups().size(), 2)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to