apoorvmittal10 commented on code in PR #19640: URL: https://github.com/apache/kafka/pull/19640#discussion_r2074025568
########## core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala: ########## @@ -913,6 +902,20 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { assertEquals(expectedResponseData.results.asScala.toSet, deleteGroupsResponse.data.results.asScala.toSet) } + protected def createSocket(): Socket = { + IntegrationTestUtils.connect( + cluster.anyBrokerSocketServer(), + cluster.clientListener() + ) + } + + protected def createSocket(destination: Int): Socket = { Review Comment: Shouldn't this method be named `connect`? ########## core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala: ########## @@ -59,11 +59,15 @@ class ShareFetchAcknowledgeRequestTest(cluster: ClusterInstance) extends GroupCo new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 1)) ) + val socket: Socket = createSocket() + val shareFetchRequest = createShareFetchRequest(groupId, metadata, send, Seq.empty, Map.empty) - val shareFetchResponse = connectAndReceiveWithoutClosingSocket[ShareFetchResponse](shareFetchRequest) + val shareFetchResponse = sendAndReceiveFromExistingSocket[ShareFetchResponse](shareFetchRequest, socket) assertEquals(Errors.UNSUPPORTED_VERSION.code, shareFetchResponse.data.errorCode) assertEquals(0, shareFetchResponse.data.acquisitionLockTimeoutMs) + + socket.close() Review Comment: Is there a way to add a check in `tearDown` that verifies all sockets opened by the tests have been closed? ########## core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala: ########## @@ -913,6 +902,20 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { assertEquals(expectedResponseData.results.asScala.toSet, deleteGroupsResponse.data.results.asScala.toSet) } + protected def createSocket(): Socket = { Review Comment: Shouldn't this method be named `connectAny`? 
########## core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala: ########## @@ -934,20 +937,10 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { ) } - protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse]( + protected def sendAndReceiveFromExistingSocket[T <: AbstractResponse]( request: AbstractRequest, - destination: Int - )(implicit classTag: ClassTag[T]): T = { - val socket = IntegrationTestUtils.connect(brokerSocketServer(destination), cluster.clientListener()) - openSockets += socket - IntegrationTestUtils.sendAndReceive[T](request, socket) - } - - protected def connectAndReceiveWithoutClosingSocket[T <: AbstractResponse]( - request: AbstractRequest + socket: Socket )(implicit classTag: ClassTag[T]): T = { - val socket = IntegrationTestUtils.connect(cluster.anyBrokerSocketServer(), cluster.clientListener()) - openSockets += socket IntegrationTestUtils.sendAndReceive[T](request, socket) } Review Comment: Why is this method needed? Can't callers use `IntegrationTestUtils.sendAndReceive[T](request, socket)` directly? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org