dengziming commented on code in PR #12171:
URL: https://github.com/apache/kafka/pull/12171#discussion_r874414621


##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1380,19 +1426,18 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
 
     /** Changes the <i>preferred</i> leader without changing the 
<i>current</i> leader. */
-    def changePreferredLeader(newAssignment: Seq[Int]) = {
+    def changePreferredLeader(newAssignment: Seq[Int]): Unit = {
       val preferred = newAssignment.head
-      val prior1 = zkClient.getLeaderForPartition(partition1).get
-      val prior2 = zkClient.getLeaderForPartition(partition2).get
-
-      var m = Map.empty[TopicPartition, Seq[Int]]
+      val prior1 = TestUtils.findCurrentLeader(client, partition1).get

Review Comment:
   Similarly, we can use ` 
brokers.head.metadataCache.getPartitionLeaderEndpoint(partition1.topic, 
partition1.partition(), listenerName).get.id()`



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,9 +93,11 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     client.close() // double close has no effect

Review Comment:
   testClose should also support KRaft.



##########
core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala:
##########
@@ -66,8 +69,9 @@ abstract class BaseAdminIntegrationTest extends 
IntegrationTestHarness with Logg
     super.tearDown()
   }
 
-  @Test
-  def testCreateDeleteTopics(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateDeleteTopics(quorum: String): Unit = {

Review Comment:
   Currently we can't convert this because `SslAdminIntegrationTest` will fail



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -184,40 +192,52 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val results = client.describeTopics(Seq(nonExistingTopic, 
existingTopic).asJava).topicNameValues()
     assertEquals(existingTopic, results.get(existingTopic).get.name)
     assertThrows(classOf[ExecutionException], () => 
results.get(nonExistingTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException]
-    assertEquals(None, zkClient.getTopicPartitionCount(nonExistingTopic))
+
+    if (!isKRaftTest()) {
+      assertEquals(None, zkClient.getTopicPartitionCount(nonExistingTopic))
+    }
   }
 
-  @Test
-  def testDescribeTopicsWithIds(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDescribeTopicsWithIds(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     val existingTopic = "existing-topic"
     client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1, 
1.toShort)).asJava).all.get()
     waitForTopics(client, Seq(existingTopic), List())
-    val existingTopicId = 
zkClient.getTopicIdsForTopics(Set(existingTopic)).values.head
-
+    val existingTopicId = TestUtils.describeTopicByName(client, 
existingTopic).topicId

Review Comment:
   We can get topicId from metadataCache to avoid sending a request, 
`brokers.head.metadataCache.getTopicId(existingTopic)`



##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1359,19 +1394,30 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
-  @Test
-  def testElectPreferredLeaders(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testElectPreferredLeaders(quorum: String): Unit = {
     client = Admin.create(createConfig)
 
     val prefer0 = Seq(0, 1, 2)
     val prefer1 = Seq(1, 2, 0)
     val prefer2 = Seq(2, 0, 1)
 
     val partition1 = new TopicPartition("elect-preferred-leaders-topic-1", 0)
-    TestUtils.createTopic(zkClient, partition1.topic, Map[Int, 
Seq[Int]](partition1.partition -> prefer0), servers)
+    TestUtils.createTopicWithAdmin(

Review Comment:
   We can directly use `createTopicWithAssignment(partition1.topic, Map[Int, 
Seq[Int]](partition1.partition -> prefer0))`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to