chia7712 commented on code in PR #16658:
URL: https://github.com/apache/kafka/pull/16658#discussion_r1709884524


##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2961,6 +2961,36 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     testAppendConfig(props, "0:0", "1:1,0:0")
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("quorum=kraft"))
+  def testListClientMetricsResources(ignored: String): Unit = {
+    client = createAdminClient
+    def newTopic = new NewTopic(topic, partition, 0.toShort)
+    client.createTopics(Collections.singleton(newTopic))
+    assertTrue(client.listClientMetricsResources().all().get().isEmpty)
+    def name = "name"
+    def configResource = new 
ConfigResource(ConfigResource.Type.CLIENT_METRICS, name)
+    val configEntry = new ConfigEntry("interval.ms", "111")
+    def configOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET)
+    client.incrementalAlterConfigs(Collections.singletonMap(configResource, 
Collections.singletonList(configOp))).all().get()
+    def result = client.listClientMetricsResources().all().get()
+    def expected = Collections.singletonList(new 
ClientMetricsResourceListing(name))
+    assertEquals(new util.HashSet(expected), new util.HashSet(result))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("quorum=kraft"))
+  @Timeout(30)
+  def testListClientMetricsResourcesTimeoutMs(ignored: String): Unit = {
+    client = createInvalidAdminClient()
+    try {
+      def timeoutOption = new ListClientMetricsResourcesOptions().timeoutMs(0)

Review Comment:
   ditto



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -2599,6 +2599,33 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     testAppendConfig(props, "0:0", "1:1,0:0")
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("quorum=kraft"))
+  def testListClientMetricsResources(ignored: String): Unit = {

Review Comment:
   ok, that can work.



##########
core/src/test/scala/integration/kafka/admin/AdminFenceProducersIntegrationTest.scala:
##########
@@ -107,6 +108,37 @@ class AdminFenceProducersIntegrationTest extends 
IntegrationTestHarness {
     assertThrows(classOf[ProducerFencedException], () => 
producer.commitTransaction())
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testFenceProducerTimeoutMs(quorum: String): Unit = {
+    producer.initTransactions()
+    producer.beginTransaction()
+    producer.send(record).get()
+    producer.commitTransaction()
+    def timeoutOption = new FenceProducersOptions()
+    try {
+      adminClient.fenceProducers(Collections.singletonList(txnId), 
timeoutOption.timeoutMs(0)).all().get()

Review Comment:
   this will be flaky. It can be completed if the request is executed 
immediately. Please create a `Admin` with invalid bootstrap and then pass a 
request having timeout=0. In that scenario, the request should return timeout 
exception. see 
https://github.com/apache/kafka/blob/0cbc5e083a936025a85f127102dc1032f6cf4fd9/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala#L159
 for example



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to