frankvicky commented on code in PR #19093:
URL: https://github.com/apache/kafka/pull/19093#discussion_r2025093889
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1901,28 +1933,77 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}, s"Expected to be able to list $testGroupId in group type
$groupType")
TestUtils.waitUntilTrue(() => {
- val options = new
ListConsumerGroupsOptions().withTypes(Set(groupType).asJava)
+ val options = new ListConsumerGroupsOptions()
+ .withTypes(Set(groupType).asJava)
.inGroupStates(Set(GroupState.STABLE).asJava)
val matching =
client.listConsumerGroups(options).all.get.asScala.filter(group =>
- group.groupId == testGroupId &&
- group.groupState.get == GroupState.STABLE)
+ group.groupId == testGroupId && group.groupState.get ==
GroupState.STABLE)
matching.size == 1
}, s"Expected to be able to list $testGroupId in group type $groupType
and state Stable")
TestUtils.waitUntilTrue(() => {
val options = new
ListConsumerGroupsOptions().inGroupStates(Set(GroupState.STABLE).asJava)
val matching =
client.listConsumerGroups(options).all.get.asScala.filter(group =>
- group.groupId == testGroupId &&
- group.groupState.get == GroupState.STABLE)
+ group.groupId == testGroupId && group.groupState.get ==
GroupState.STABLE)
matching.size == 1
}, s"Expected to be able to list $testGroupId in state Stable")
TestUtils.waitUntilTrue(() => {
val options = new
ListConsumerGroupsOptions().inGroupStates(Set(GroupState.EMPTY).asJava)
- val matching =
client.listConsumerGroups(options).all.get.asScala.filter(
- _.groupId == testGroupId)
+ val matching =
client.listConsumerGroups(options).all.get.asScala.filter(_.groupId ==
testGroupId)
matching.isEmpty
- }, s"Expected to find zero groups")
+ }, "Expected to find zero groups")
+ } finally {
+ backgroundConsumers.close()
+ }
+ } finally {
+ Utils.closeQuietly(client, "adminClient")
+ }
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+ @MethodSource(Array("getTestGroupProtocolParametersAll"))
+ def testDescribeGroups(groupProtocol: String): Unit = {
+ val config = createConfig
+ client = Admin.create(config)
+ try {
+ assertConsumerGroupsIsClean()
+
+ val testTopicName = "test_topic"
+ val testTopicName1 = testTopicName + "1"
+ val testTopicName2 = testTopicName + "2"
+ val testNumPartitions = 2
+ prepareTopics(List(testTopicName, testTopicName1, testTopicName2),
testNumPartitions)
+
+ val producer = createProducer()
+ try {
+ producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+ } finally {
+ Utils.closeQuietly(producer, "producer")
+ }
Review Comment:
I added it to de-flake the assertion on groupEpoch.
I'm not sure why an empty topic leads to the groupEpoch changing.
I'm still investigating.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]