chia7712 commented on code in PR #20002:
URL: https://github.com/apache/kafka/pull/20002#discussion_r2160018390


##########
tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java:
##########
@@ -506,7 +507,9 @@ public void testUpdateBrokerConfigNotAffectedByInvalidConfig() {
                             "--entity-type", "brokers",
                             "--entity-default"))));
             kafka.utils.TestUtils.waitUntilTrue(
-                    () -> cluster.brokerSocketServers().stream().allMatch(broker -> broker.config().getInt("log.cleaner.threads") == 2),
+                    () -> cluster.brokers().values().stream()

Review Comment:
   ```java
                       () -> cluster.brokers().values().stream()
                           .map(KafkaBroker::config)
                           .allMatch(config -> config.getInt("log.cleaner.threads") == 2),
   ```



##########
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala:
##########
@@ -182,9 +182,9 @@ class ProducerIntegrationTest {
 
   private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = {
    // Request enough PIDs from each broker to ensure each broker generates two blocks
-    val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker => {
+    val ids = clusterInstance.brokers().values().stream().flatMap( broker => {
       IntStream.range(0, 1001).parallel().mapToObj( _ =>
-        nextProducerId(broker, clusterInstance.clientListener())
+        nextProducerId(broker.socketServer, clusterInstance.clientListener())

Review Comment:
   Could you please change the arguments of `nextProducerId` to take a port? That would simplify the method, and then we wouldn't need to use `SocketServer`.
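   A rough sketch of what the port-based helper might look like (illustrative only; the request construction, the `boundPort` call, and the `connectAndReceive` overload used here are assumptions, and the real helper may differ):
   ```scala
   // Hypothetical simplification: take a port instead of a SocketServer, so the
   // helper can rely on IntegrationTestUtils.connectAndReceive(request, port).
   private def nextProducerId(port: Int): Long = {
     val request = new InitProducerIdRequest.Builder(
       new InitProducerIdRequestData()
         .setTransactionalId(null)
         .setTransactionTimeoutMs(10)).build()
     val response = IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request, port)
     response.data.producerId
   }

   // Call-site sketch, assuming KafkaBroker exposes boundPort(listenerName):
   // nextProducerId(broker.boundPort(clusterInstance.clientListener()))
   ```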



##########
tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java:
##########
@@ -444,7 +445,7 @@ public void testUpdatePerBrokerConfigInKRaftThenShouldFail() {
     @ClusterTest
     public void testUpdateInvalidBrokerConfigs() {
         updateAndCheckInvalidBrokerConfig(Optional.empty());
-        updateAndCheckInvalidBrokerConfig(Optional.of(cluster.anyBrokerSocketServer().config().brokerId() + ""));
+        updateAndCheckInvalidBrokerConfig(cluster.brokers().values().stream().map(broker -> broker.socketServer().config().brokerId() + "").findFirst());

Review Comment:
   ```java
   updateAndCheckInvalidBrokerConfig(Optional.of(String.valueOf(cluster.brokers().entrySet().iterator().next().getKey())));
   ```



##########
core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala:
##########
@@ -38,9 +38,9 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
 
   def sendApiVersionsRequest(request: ApiVersionsRequest, listenerName: ListenerName): ApiVersionsResponse = {

Review Comment:
   Could you please consider adding `controllerBoundPorts` to `ClusterInstance`?
   
   ```java
       default List<Integer> controllerBoundPorts() {
           return controllers().values().stream()
                   .map(ControllerServer::socketServer)
                   .map(s -> s.boundPort(controllerListenerName()))
                   .toList();
       }
   ```
   
   then `sendApiVersionsRequest` could be removed, as users could straightforwardly use `IntegrationTestUtils.connectAndReceive`.
   ```scala
       val apiVersionsResponse = IntegrationTestUtils.connectAndReceive(apiVersionsRequest, cluster.controllerBoundPorts().get(0))
   ```
   


