lucasbru commented on code in PR #20244:
URL: https://github.com/apache/kafka/pull/20244#discussion_r2306898719


##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -4363,6 +4401,162 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       }
     }
   }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  
@MethodSource(Array("getTestGroupProtocolParametersStreamsGroupProtocolOnly"))
+  def testDescribeStreamsGroups(groupProtocol: String): Unit = {
+    val streamsGroupId = "stream_group_id"
+    val testTopicName = "test_topic"
+    val testOutputTopicName = "test_output_topic"
+    val testNumPartitions = 1
+
+    val config = createConfig
+    client = Admin.create(config)
+
+    prepareTopics(List(testTopicName, testOutputTopicName), testNumPartitions)
+    prepareRecords(testTopicName)
+
+    val streamsConfig = new Properties()
+    streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000)
+    val streams = createStreamsGroup(
+      configOverrides = streamsConfig,
+      inputTopic = testTopicName,
+      outputTopic = testOutputTopicName,
+      streamsGroupId = streamsGroupId,
+      groupProtocol = groupProtocol
+    )
+
+    try {
+      streams.cleanUp()
+      streams.start()
+
+      TestUtils.waitUntilTrue(() => streams.state() == 
KafkaStreams.State.RUNNING, "Streams not in RUNNING state")
+
+      TestUtils.waitUntilTrue(() => {
+        val firstGroup = 
client.listGroups().all().get().stream().findFirst().orElse(null)
+        firstGroup.groupState().orElse(null) == GroupState.STABLE && 
firstGroup.groupId() == streamsGroupId
+      }, "Stream group not stable yet")
+
+      // Verify the describe call works correctly
+      val describedGroups = 
client.describeStreamsGroups(util.List.of(streamsGroupId)).all().get()
+      val group = describedGroups.get(streamsGroupId)
+      assertNotNull(group)
+      assertEquals(streamsGroupId, group.groupId())
+      assertFalse(group.members().isEmpty)
+      assertNotNull(group.subtopologies())
+      assertFalse(group.subtopologies().isEmpty)
+
+      // Verify the topology contains the expected source and sink topics
+      val subtopologies = group.subtopologies().asScala
+      assertTrue(subtopologies.exists(subtopology =>
+        subtopology.sourceTopics().contains(testTopicName)))
+
+      // Test describing a non-existing group
+      val nonExistingGroup = "non_existing_stream_group"
+      val describedNonExistingGroupResponse = 
client.describeStreamsGroups(util.List.of(nonExistingGroup))
+      assertFutureThrows(classOf[GroupIdNotFoundException], 
describedNonExistingGroupResponse.all())
+
+    } finally {
+      Utils.closeQuietly(streams, "streams")
+      Utils.closeQuietly(client, "adminClient")
+    }
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  
@MethodSource(Array("getTestGroupProtocolParametersStreamsGroupProtocolOnly"))

Review Comment:
   Why parametrize the test if there is only one variation? Could we just 
remove the parameter?



##########
core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala:
##########
@@ -235,6 +241,41 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
     streamsConsumer
   }
 
+  def createStreamsGroup[K, V](configOverrides: Properties = new Properties,

Review Comment:
   I wonder if we could create the streams group without adding a dependency on 
the streams package - this seems to create an extra dependency that seems quite 
circular. It's not too bad since it's a testDependency only.
   
   Can you look at AuthorizerIntegrationTest.createStreamsGroupToDescribe and 
check how I created a streams group with an input topic, an output topic and 
also a changelog topic? This should work here as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to