chia7712 commented on code in PR #16658: URL: https://github.com/apache/kafka/pull/16658#discussion_r1709884524
########## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ########## @@ -2961,6 +2961,36 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { testAppendConfig(props, "0:0", "1:1,0:0") } + @ParameterizedTest + @ValueSource(strings = Array("quorum=kraft")) + def testListClientMetricsResources(ignored: String): Unit = { + client = createAdminClient + def newTopic = new NewTopic(topic, partition, 0.toShort) + client.createTopics(Collections.singleton(newTopic)) + assertTrue(client.listClientMetricsResources().all().get().isEmpty) + def name = "name" + def configResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, name) + val configEntry = new ConfigEntry("interval.ms", "111") + def configOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET) + client.incrementalAlterConfigs(Collections.singletonMap(configResource, Collections.singletonList(configOp))).all().get() + def result = client.listClientMetricsResources().all().get() + def expected = Collections.singletonList(new ClientMetricsResourceListing(name)) + assertEquals(new util.HashSet(expected), new util.HashSet(result)) + } + + @ParameterizedTest + @ValueSource(strings = Array("quorum=kraft")) + @Timeout(30) + def testListClientMetricsResourcesTimeoutMs(ignored: String): Unit = { + client = createInvalidAdminClient() + try { + def timeoutOption = new ListClientMetricsResourcesOptions().timeoutMs(0) Review Comment: ditto ########## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ########## @@ -2599,6 +2599,33 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { testAppendConfig(props, "0:0", "1:1,0:0") } + @ParameterizedTest + @ValueSource(strings = Array("quorum=kraft")) + def testListClientMetricsResources(ignored: String): Unit = { Review Comment: ok, that can work. 
########## core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala: ########## @@ -107,6 +108,37 @@ class AdminFenceProducersIntegrationTest extends IntegrationTestHarness { assertThrows(classOf[ProducerFencedException], () => producer.commitTransaction()) } + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testFenceProducerTimeoutMs(quorum: String): Unit = { + producer.initTransactions() + producer.beginTransaction() + producer.send(record).get() + producer.commitTransaction() + def timeoutOption = new FenceProducersOptions() + try { + adminClient.fenceProducers(Collections.singletonList(txnId), timeoutOption.timeoutMs(0)).all().get() Review Comment: This will be flaky: the call can complete successfully if the request is executed immediately. Please create an `Admin` with an invalid bootstrap server and then send a request with timeout=0. In that scenario, the request should fail with a timeout exception. See https://github.com/apache/kafka/blob/0cbc5e083a936025a85f127102dc1032f6cf4fd9/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala#L159 for an example. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org