jsancio commented on a change in pull request #11790:
URL: https://github.com/apache/kafka/pull/11790#discussion_r813357222
##########
File path:
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##########
@@ -168,18 +198,41 @@ abstract class KafkaServerTestHarness extends
QuorumTestHarness {
* Wait until the leader is elected and the metadata is propagated to all
brokers.
* Return the leader for each partition.
*/
- def createTopic(topic: String, partitionReplicaAssignment:
collection.Map[Int, Seq[Int]]): scala.collection.immutable.Map[Int, Int] =
+ def createTopicWithAssignment(
+ topic: String,
+ partitionReplicaAssignment: collection.Map[Int, Seq[Int]],
+ listenerName: ListenerName = listenerName
+ ): scala.collection.immutable.Map[Int, Int] =
if (isKRaftTest()) {
- TestUtils.createTopicWithAdmin(topic = topic,
- replicaAssignment = partitionReplicaAssignment,
- brokers = brokers)
+ resource(createAdminClient(brokers, listenerName)) { admin =>
+ TestUtils.createTopicWithAdmin(
+ admin = admin,
+ topic = topic,
+ replicaAssignment = partitionReplicaAssignment,
+ brokers = brokers
+ )
+ }
} else {
- TestUtils.createTopic(zkClient, topic, partitionReplicaAssignment,
servers)
+ TestUtils.createTopic(
+ zkClient,
+ topic,
+ partitionReplicaAssignment,
+ servers
+ )
}
- def deleteTopic(topic: String): Unit = {
+ def deleteTopic(
+ topic: String,
+ listenerName: ListenerName = listenerName
+ ): Unit = {
if (isKRaftTest()) {
- TestUtils.deleteTopicWithAdmin(topic, brokers)
+
Review comment:
Extra newline.
##########
File path: core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
##########
@@ -58,7 +59,10 @@ abstract class BaseProducerSendTest extends
KafkaServerTestHarness {
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
- consumer =
TestUtils.createConsumer(TestUtils.getBrokerListStrFromServers(servers),
securityProtocol = SecurityProtocol.PLAINTEXT)
+ consumer = TestUtils.createConsumer(bootstrapServers(
+ listenerName =
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)),
+ securityProtocol = SecurityProtocol.PLAINTEXT
+ )
Review comment:
I think the formatting is incorrect here. Should it be this instead?
```suggestion
consumer = TestUtils.createConsumer(
bootstrapServers(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)),
securityProtocol = SecurityProtocol.PLAINTEXT
)
```
##########
File path:
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -204,7 +204,7 @@ class BrokerMetadataListener(
batch.records().forEach { messageAndVersion =>
if (isTraceEnabled) {
trace("Metadata batch %d: processing [%d/%d]:
%s.".format(batch.lastOffset, index + 1,
- batch.records().size(), messageAndVersion.message().toString()))
+ batch.records().size(), messageAndVersion.message()))
Review comment:
I didn't notice this earlier, but maybe we can just fix this to use
Scala's `s""` interpolation instead of `format`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]