divijvaidya commented on code in PR #13572: URL: https://github.com/apache/kafka/pull/13572#discussion_r1170213078
########## core/src/test/scala/unit/kafka/network/SocketServerTest.scala: ########## @@ -1893,6 +1893,33 @@ class SocketServerTest { }, false) } + @Test + def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = { + val acceptor = server.dataPlaneAcceptor(listener) + val channel = acceptor.get.serverChannel + verifySocketUsesReuseAddress(channel) + } + + @Test + def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = { + shutdownServerAndMetrics(server) + val testProps = new Properties + testProps ++= props + testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0") Review Comment: please add a comment here. We are using 0 so that OS can choose to associated any available port here. ########## core/src/test/scala/unit/kafka/network/SocketServerTest.scala: ########## @@ -1893,6 +1893,33 @@ class SocketServerTest { }, false) } + @Test + def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = { + val acceptor = server.dataPlaneAcceptor(listener) + val channel = acceptor.get.serverChannel + verifySocketUsesReuseAddress(channel) + } + + @Test + def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = { + shutdownServerAndMetrics(server) + val testProps = new Properties + testProps ++= props + testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0") + testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROL_PLANE:PLAINTEXT") + testProps.put("control.plane.listener.name", "CONTROL_PLANE") + val config = KafkaConfig.fromProps(testProps) + val testServer = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager) + val channel = testServer.controlPlaneAcceptorOpt.get.serverChannel + verifySocketUsesReuseAddress(channel) + shutdownServerAndMetrics(testServer) Review Comment: perhaps do this in try/finally so that the resources are cleaned even if the test fails with an exception. ########## core/src/test/scala/unit/kafka/network/SocketServerTest.scala: ########## @@ -1893,6 +1893,33 @@ class SocketServerTest { }, false) } + @Test + def testDataPlaneAcceptingSocketUsesReuseAddress(): Unit = { + val acceptor = server.dataPlaneAcceptor(listener) + val channel = acceptor.get.serverChannel + verifySocketUsesReuseAddress(channel) + } + + @Test + def testControlPlaneAcceptingSocketUsesReuseAddress(): Unit = { + shutdownServerAndMetrics(server) + val testProps = new Properties + testProps ++= props + testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROL_PLANE://localhost:0") + testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROL_PLANE:PLAINTEXT") + testProps.put("control.plane.listener.name", "CONTROL_PLANE") + val config = KafkaConfig.fromProps(testProps) + val testServer = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager) + val channel = testServer.controlPlaneAcceptorOpt.get.serverChannel Review Comment: since we are starting up two listeners, could we verify reuse address for both please. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org