OmniaGM commented on code in PR #16920:
URL: https://github.com/apache/kafka/pull/16920#discussion_r1725304682


##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1948,6 +1948,161 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft+kip932"))
+  def testShareGroups(quorum: String): Unit = {
+    val testGroupId = "test_group_id"
+    val testClientId = "test_client_id"
+    val fakeGroupId = "fake_group_id"
+    val testTopicName = "test_topic"
+    val testNumPartitions = 2
+
+    def createProperties(): Properties = {
+      val newConsumerConfig = new Properties(consumerConfig)
+      newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+      newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+      newConsumerConfig
+    }
+
+    val consumerSet = Set(createShareConsumer(configOverrides = 
createProperties()))
+    val topicSet = Set(testTopicName)
+
+    val latch = new CountDownLatch(consumerSet.size)
+
+    def createShareConsumerThread[K,V](consumer: ShareConsumer[K,V], topic: 
String): Thread = {
+      new Thread {
+        override def run : Unit = {
+          consumer.subscribe(Collections.singleton(topic))
+          try {
+            while (true) {
+              consumer.poll(JDuration.ofSeconds(5))
+              if (latch.getCount > 0L)
+                latch.countDown()
+              consumer.commitSync()
+            }
+          } catch {
+            case _: InterruptException => // Suppress the output to stderr
+          }
+        }
+      }
+    }
+
+    val config = createConfig
+    client = Admin.create(config)
+    val producer = createProducer()
+    try {
+      // Verify that initially there are no share groups to list.
+      val list1 = client.listShareGroups()
+      assertEquals(0, list1.all().get().size())
+      assertEquals(0, list1.errors().get().size())
+      assertEquals(0, list1.valid().get().size())
+
+      client.createTopics(Collections.singleton(
+        new NewTopic(testTopicName, testNumPartitions, 1.toShort)
+      )).all().get()
+      waitForTopics(client, List(testTopicName), List())
+
+      producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+
+      try {
+        // Start consumers in a thread that will subscribe to a new group.
+        val consumerThreads = consumerSet.zip(topicSet).map(zipped => 
createShareConsumerThread(zipped._1, zipped._2))
+
+        try {
+          consumerThreads.foreach(_.start())
+          assertTrue(latch.await(30000, TimeUnit.MILLISECONDS))
+
+          // Test that we can list the new group.
+          TestUtils.waitUntilTrue(() => {
+            client.listShareGroups.all.get.stream().filter(group =>
+              group.groupId == testGroupId &&
+                group.state.get == ShareGroupState.STABLE).count() == 1
+          }, s"Expected to be able to list $testGroupId")
+
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListShareGroupsOptions().inStates(Collections.singleton(ShareGroupState.STABLE))
+            client.listShareGroups(options).all.get.stream().filter(group =>
+              group.groupId == testGroupId &&
+                group.state.get == ShareGroupState.STABLE).count() == 1
+          }, s"Expected to be able to list $testGroupId in state Stable")
+
+          TestUtils.waitUntilTrue(() => {
+            val options = new 
ListShareGroupsOptions().inStates(Collections.singleton(ShareGroupState.EMPTY))
+            client.listShareGroups(options).all.get.stream().filter(_.groupId 
== testGroupId).count() == 0
+          }, s"Expected to find zero groups")
+
+          val describeWithFakeGroupResult = 
client.describeShareGroups(util.Arrays.asList(testGroupId, fakeGroupId),
+            new DescribeShareGroupsOptions().includeAuthorizedOperations(true))
+          assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
+
+          // Test that we can get information about the test share group.
+          
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId))
+          assertEquals(2, describeWithFakeGroupResult.describedGroups().size())
+          var testGroupDescription = 
describeWithFakeGroupResult.describedGroups().get(testGroupId).get()
+
+          assertEquals(testGroupId, testGroupDescription.groupId())
+          assertEquals(consumerSet.size, testGroupDescription.members().size())
+          val members = testGroupDescription.members()
+          members.forEach(member => assertEquals(testClientId, 
member.clientId()))
+          val topicPartitionsByTopic = 
members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic())
+          topicSet.foreach { topic =>
+            val topicPartitions = topicPartitionsByTopic.getOrElse(topic, 
List.empty)
+            assertEquals(testNumPartitions, topicPartitions.size)
+          }
+
+          val expectedOperations = 
AclEntry.supportedOperations(ResourceType.GROUP)
+          assertEquals(expectedOperations, 
testGroupDescription.authorizedOperations())
+
+          // Test that the fake group is listed as dead.
+          
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(fakeGroupId))
+          val fakeGroupDescription = 
describeWithFakeGroupResult.describedGroups().get(fakeGroupId).get()
+
+          assertEquals(fakeGroupId, fakeGroupDescription.groupId())
+          assertEquals(0, fakeGroupDescription.members().size())
+          assertEquals(ShareGroupState.DEAD, fakeGroupDescription.state())
+          assertNull(fakeGroupDescription.authorizedOperations())
+
+          // Test that all() returns 2 results
+          assertEquals(2, describeWithFakeGroupResult.all().get().size())
+
+          val describeTestGroupResult = 
client.describeShareGroups(Collections.singleton(testGroupId),
+            new DescribeShareGroupsOptions().includeAuthorizedOperations(true))
+          assertEquals(1, describeTestGroupResult.all().get().size())
+          assertEquals(1, describeTestGroupResult.describedGroups().size())
+
+          testGroupDescription = 
describeTestGroupResult.describedGroups().get(testGroupId).get()
+
+          assertEquals(testGroupId, testGroupDescription.groupId)
+          assertEquals(consumerSet.size, testGroupDescription.members().size())
+
+          // Describing a share group using describeConsumerGroups reports it 
as a DEAD consumer group
+          // in the same way as a non-existent group
+          val describeConsumerGroupResult = 
client.describeConsumerGroups(Collections.singleton(testGroupId),
+            new 
DescribeConsumerGroupsOptions().includeAuthorizedOperations(true))
+          assertEquals(1, describeConsumerGroupResult.all().get().size())
+
+          val deadConsumerGroupDescription = 
describeConsumerGroupResult.describedGroups().get(testGroupId).get()
+          assertEquals(testGroupId, deadConsumerGroupDescription.groupId())
+          assertEquals(0, deadConsumerGroupDescription.members().size())
+          assertEquals("", deadConsumerGroupDescription.partitionAssignor())
+          assertEquals(ConsumerGroupState.DEAD, 
deadConsumerGroupDescription.state())
+          assertEquals(expectedOperations, 
deadConsumerGroupDescription.authorizedOperations())
+        } finally {
+          consumerThreads.foreach {
+            case consumerThread =>
+              consumerThread.interrupt()
+              consumerThread.join()
+          }
+        }
+      } finally {
+        consumerSet.foreach(consumer => Utils.closeQuietly(consumer, 
"consumer"))

Review Comment:
   Any reason why we can't move this line to the `finally` block in line 2100 
and get rid of this extra `try`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to