dengziming commented on code in PR #12171:
URL: https://github.com/apache/kafka/pull/12171#discussion_r874414621
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1380,19 +1426,18 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
/** Changes the <i>preferred</i> leader without changing the
<i>current</i> leader. */
- def changePreferredLeader(newAssignment: Seq[Int]) = {
+ def changePreferredLeader(newAssignment: Seq[Int]): Unit = {
val preferred = newAssignment.head
- val prior1 = zkClient.getLeaderForPartition(partition1).get
- val prior2 = zkClient.getLeaderForPartition(partition2).get
-
- var m = Map.empty[TopicPartition, Seq[Int]]
+ val prior1 = TestUtils.findCurrentLeader(client, partition1).get
Review Comment:
Similarly, we can use
`brokers.head.metadataCache.getPartitionLeaderEndpoint(partition1.topic,
partition1.partition(), listenerName).get.id()`.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -94,9 +93,11 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
client.close() // double close has no effect
Review Comment:
testClose should also support KRaft.
##########
core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala:
##########
@@ -66,8 +69,9 @@ abstract class BaseAdminIntegrationTest extends
IntegrationTestHarness with Logg
super.tearDown()
}
- @Test
- def testCreateDeleteTopics(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateDeleteTopics(quorum: String): Unit = {
Review Comment:
Currently we can't convert this test to run with KRaft because `SslAdminIntegrationTest` will fail.
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -184,40 +192,52 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
val results = client.describeTopics(Seq(nonExistingTopic,
existingTopic).asJava).topicNameValues()
assertEquals(existingTopic, results.get(existingTopic).get.name)
assertThrows(classOf[ExecutionException], () =>
results.get(nonExistingTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException]
- assertEquals(None, zkClient.getTopicPartitionCount(nonExistingTopic))
+
+ if (!isKRaftTest()) {
+ assertEquals(None, zkClient.getTopicPartitionCount(nonExistingTopic))
+ }
}
- @Test
- def testDescribeTopicsWithIds(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDescribeTopicsWithIds(quorum: String): Unit = {
client = Admin.create(createConfig)
val existingTopic = "existing-topic"
client.createTopics(Seq(existingTopic).map(new NewTopic(_, 1,
1.toShort)).asJava).all.get()
waitForTopics(client, Seq(existingTopic), List())
- val existingTopicId =
zkClient.getTopicIdsForTopics(Set(existingTopic)).values.head
-
+ val existingTopicId = TestUtils.describeTopicByName(client,
existingTopic).topicId
Review Comment:
We can get the topicId from metadataCache to avoid sending a request:
`brokers.head.metadataCache.getTopicId(existingTopic)`
##########
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##########
@@ -1359,19 +1394,30 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
- @Test
- def testElectPreferredLeaders(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testElectPreferredLeaders(quorum: String): Unit = {
client = Admin.create(createConfig)
val prefer0 = Seq(0, 1, 2)
val prefer1 = Seq(1, 2, 0)
val prefer2 = Seq(2, 0, 1)
val partition1 = new TopicPartition("elect-preferred-leaders-topic-1", 0)
- TestUtils.createTopic(zkClient, partition1.topic, Map[Int,
Seq[Int]](partition1.partition -> prefer0), servers)
+ TestUtils.createTopicWithAdmin(
Review Comment:
We can directly use `createTopicWithAssignment(partition1.topic, Map[Int,
Seq[Int]](partition1.partition -> prefer0))`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]