chia7712 commented on code in PR #20002: URL: https://github.com/apache/kafka/pull/20002#discussion_r2160018390
########## tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java: ########## @@ -506,7 +507,9 @@ public void testUpdateBrokerConfigNotAffectedByInvalidConfig() { "--entity-type", "brokers", "--entity-default")))); kafka.utils.TestUtils.waitUntilTrue( - () -> cluster.brokerSocketServers().stream().allMatch(broker -> broker.config().getInt("log.cleaner.threads") == 2), + () -> cluster.brokers().values().stream() Review Comment: ```java () -> cluster.brokers().values().stream() .map(KafkaBroker::config) .allMatch(config -> config.getInt("log.cleaner.threads") == 2), ``` ########## core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala: ########## @@ -182,9 +182,9 @@ class ProducerIntegrationTest { private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = { // Request enough PIDs from each broker to ensure each broker generates two blocks - val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker => { + val ids = clusterInstance.brokers().values().stream().flatMap( broker => { IntStream.range(0, 1001).parallel().mapToObj( _ => - nextProducerId(broker, clusterInstance.clientListener()) + nextProducerId(broker.socketServer, clusterInstance.clientListener()) Review Comment: Could you please change the arguments of `nextProducerId` to "port"? 
That would simplify the method, and then we wouldn't need to use `SocketServer`. ########## tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java: ########## @@ -444,7 +445,7 @@ public void testUpdatePerBrokerConfigInKRaftThenShouldFail() { @ClusterTest public void testUpdateInvalidBrokerConfigs() { updateAndCheckInvalidBrokerConfig(Optional.empty()); - updateAndCheckInvalidBrokerConfig(Optional.of(cluster.anyBrokerSocketServer().config().brokerId() + "")); + updateAndCheckInvalidBrokerConfig(cluster.brokers().values().stream().map(broker -> broker.socketServer().config().brokerId() + "").findFirst()); Review Comment: ```java updateAndCheckInvalidBrokerConfig(Optional.of(String.valueOf((cluster.brokers().entrySet().iterator().next().getKey())))); ``` ########## core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala: ########## @@ -38,9 +38,9 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { def sendApiVersionsRequest(request: ApiVersionsRequest, listenerName: ListenerName): ApiVersionsResponse = { Review Comment: Could you please consider adding `controllerBoundPorts` to `ClusterInstance`? ```java default List<Integer> controllerBoundPorts() { return controllers().values().stream() .map(ControllerServer::socketServer) .map(s -> s.boundPort(controllerListenerName())) .toList(); } ``` Then `sendApiVersionsRequest` could be removed, as users could use `IntegrationTestUtils.connectAndReceive` directly. ```java val apiVersionsResponse = IntegrationTestUtils.connectAndReceive(apiVersionsRequest, cluster.controllerBoundPorts().get(0)) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org