apoorvmittal10 commented on code in PR #19640:
URL: https://github.com/apache/kafka/pull/19640#discussion_r2074025568


##########
core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala:
##########
@@ -913,6 +902,20 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
     assertEquals(expectedResponseData.results.asScala.toSet, 
deleteGroupsResponse.data.results.asScala.toSet)
   }
 
+  protected def createSocket(): Socket = {
+    IntegrationTestUtils.connect(
+      cluster.anyBrokerSocketServer(),
+      cluster.clientListener()
+    )
+  }
+
+  protected def createSocket(destination: Int): Socket = {

Review Comment:
   Shouldn't it be `connect`?



##########
core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala:
##########
@@ -59,11 +59,15 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
       new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 1))
     )
 
+    val socket: Socket = createSocket()
+
     val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, 
Seq.empty, Map.empty)
-    val shareFetchResponse = 
connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest)
+    val shareFetchResponse = 
sendAndReceiveFromExistingSocket[ShareFetchResponse](shareFetchRequest, socket)
 
     assertEquals(Errors.UNSUPPORTED_VERSION.code, 
shareFetchResponse.data.errorCode)
     assertEquals(0, shareFetchResponse.data.acquisitionLockTimeoutMs)
+
+    socket.close()

Review Comment:
   Is there a way we can have a check in `tearDown` that all open sockets are 
closed from the tests?



##########
core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala:
##########
@@ -913,6 +902,20 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
     assertEquals(expectedResponseData.results.asScala.toSet, 
deleteGroupsResponse.data.results.asScala.toSet)
   }
 
+  protected def createSocket(): Socket = {

Review Comment:
   Shouldn't it be `connectAny`?



##########
core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala:
##########
@@ -934,20 +937,10 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
     )
   }
 
-  protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse](
+  protected def sendAndReceiveFromExistingSocket[T <: AbstractResponse](
     request: AbstractRequest,
-    destination: Int
-  )(implicit classTag: ClassTag[T]): T = {
-    val socket = IntegrationTestUtils.connect(brokerSocketServer(destination), 
cluster.clientListener())
-    openSockets += socket
-    IntegrationTestUtils.sendAndReceive[T](request, socket)
-  }
-
-  protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse](
-    request: AbstractRequest
+    socket: Socket
   )(implicit classTag: ClassTag[T]): T = {
-    val socket = IntegrationTestUtils.connect(cluster.anyBrokerSocketServer(), 
cluster.clientListener())
-    openSockets += socket
     IntegrationTestUtils.sendAndReceive[T](request, socket)
   }

Review Comment:
   Why do you need this method and can't use 
`IntegrationTestUtils.sendAndReceive[T](request, socket)` directly?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to