chia7712 commented on code in PR #19776:
URL: https://github.com/apache/kafka/pull/19776#discussion_r2116362068


##########
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala:
##########
@@ -205,11 +205,12 @@ class ProducerIntegrationTest {
         .setProducerId(RecordBatch.NO_PRODUCER_ID)
         .setTransactionalId(null)
         .setTransactionTimeoutMs(10)
-      val request = new InitProducerIdRequest.Builder(data).build()
+    val request = new InitProducerIdRequest.Builder(data).build()

Review Comment:
   ```scala
         val request = new InitProducerIdRequest.Builder(data).build()
         val port = broker.boundPort(listener)
         response = 
IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request, port)
         shouldRetry = response.data.errorCode == 
Errors.COORDINATOR_LOAD_IN_PROGRESS.code
   ```



##########
core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala:
##########
@@ -79,10 +80,13 @@ class AllocateProducerIdsRequestTest(cluster: 
ClusterInstance) {
     controllerSocketServer: SocketServer,
     request: AllocateProducerIdsRequest
   ): AllocateProducerIdsResponse = {
+
+    val listenerName = cluster.controllerListenerName
+    val port = controllerSocketServer.boundPort(listenerName)
+
     IntegrationTestUtils.connectAndReceive[AllocateProducerIdsResponse](

Review Comment:
   ```scala
       IntegrationTestUtils.connectAndReceive[AllocateProducerIdsResponse](
         request,
         controllerSocketServer.boundPort(cluster.controllerListenerName)
       )
   ```



##########
core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala:
##########
@@ -41,12 +42,18 @@ abstract class AbstractApiVersionsRequestTest(cluster: 
ClusterInstance) {
     } else {
       cluster.brokerSocketServers().asScala.head
     }
-    IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, 
socket, listenerName)
+
+    val port = socket.boundPort(listenerName)
+
+    IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, port)
   }
 
   def sendUnsupportedApiVersionRequest(request: ApiVersionsRequest): 
ApiVersionsResponse = {
     val overrideHeader = 
IntegrationTestUtils.nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue)
-    val socket = 
IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, 
cluster.clientListener())
+    val socketServer = cluster.brokerSocketServers().asScala.head

Review Comment:
   With that helper, we can use `val socket = IntegrationTestUtils.connect(cluster.boundPorts().asScala.head)` to streamline the code, right?



##########
core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala:
##########
@@ -41,12 +42,18 @@ abstract class AbstractApiVersionsRequestTest(cluster: 
ClusterInstance) {
     } else {
       cluster.brokerSocketServers().asScala.head
     }
-    IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, 
socket, listenerName)
+
+    val port = socket.boundPort(listenerName)

Review Comment:
   ```scala
   IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, 
socket.boundPort(listenerName))
   ```



##########
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##########
@@ -492,12 +492,16 @@ class KRaftClusterTest {
   }
 
   private def sendDescribeClusterRequestToBoundPort(destination: SocketServer,
-                                                    listenerName: 
ListenerName): DescribeClusterResponse =
-    connectAndReceive[DescribeClusterResponse](
-      request = new DescribeClusterRequest.Builder(new 
DescribeClusterRequestData()).build(),
-      destination = destination,
-      listenerName = listenerName
-    )
+                                                    listenerName: 
ListenerName): DescribeClusterResponse = {
+
+      val port = destination.boundPort(listenerName)

Review Comment:
   ```scala
         connectAndReceive[DescribeClusterResponse](new 
DescribeClusterRequest.Builder(new DescribeClusterRequestData()).build(),
           destination.boundPort(listenerName))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to