jsancio commented on a change in pull request #11790:
URL: https://github.com/apache/kafka/pull/11790#discussion_r813357222



##########
File path: 
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##########
@@ -168,18 +198,41 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
    * Wait until the leader is elected and the metadata is propagated to all 
brokers.
    * Return the leader for each partition.
    */
-  def createTopic(topic: String, partitionReplicaAssignment: 
collection.Map[Int, Seq[Int]]): scala.collection.immutable.Map[Int, Int] =
+  def createTopicWithAssignment(
+    topic: String,
+    partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
+    listenerName: ListenerName = listenerName
+  ): scala.collection.immutable.Map[Int, Int] =
     if (isKRaftTest()) {
-      TestUtils.createTopicWithAdmin(topic = topic,
-        replicaAssignment = partitionReplicaAssignment,
-        brokers = brokers)
+      resource(createAdminClient(brokers, listenerName)) { admin =>
+        TestUtils.createTopicWithAdmin(
+          admin = admin,
+          topic = topic,
+          replicaAssignment = partitionReplicaAssignment,
+          brokers = brokers
+        )
+      }
     } else {
-      TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment, 
servers)
+      TestUtils.createTopic(
+        zkClient,
+        topic,
+        partitionReplicaAssignment,
+        servers
+      )
     }
 
-  def deleteTopic(topic: String): Unit = {
+  def deleteTopic(
+    topic: String,
+    listenerName: ListenerName = listenerName
+  ): Unit = {
     if (isKRaftTest()) {
-      TestUtils.deleteTopicWithAdmin(topic, brokers)
+

Review comment:
       Extra newline.

##########
File path: core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
##########
@@ -58,7 +59,10 @@ abstract class BaseProducerSendTest extends 
KafkaServerTestHarness {
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    consumer = 
TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers), 
securityProtocol = SecurityProtocol.PLAINTEXT)
+    consumer = TestUtils.createConsumer(bootstrapServers(
+      listenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)),
+      securityProtocol = SecurityProtocol.PLAINTEXT
+    )

Review comment:
       I think the formatting is incorrect here. Should it be the following?
   
   ```suggestion
       consumer = TestUtils.createConsumer(
         bootstrapServers(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)),
         securityProtocol = SecurityProtocol.PLAINTEXT
       )
   ```

##########
File path: 
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -204,7 +204,7 @@ class BrokerMetadataListener(
       batch.records().forEach { messageAndVersion =>
         if (isTraceEnabled) {
           trace("Metadata batch %d: processing [%d/%d]: 
%s.".format(batch.lastOffset, index + 1,
-            batch.records().size(), messageAndVersion.message().toString()))
+            batch.records().size(), messageAndVersion.message()))

Review comment:
       I didn't notice this earlier, but maybe we can just fix this to use 
Scala's `s""` interpolation instead of `format`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to