He-Pin commented on code in PR #486:
URL: https://github.com/apache/incubator-pekko/pull/486#discussion_r1280722867
##########
multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala:
##########
@@ -94,44 +91,75 @@ private[pekko] case object Client extends Role
*/
private[pekko] case object Server extends Role
+/**
+ * INTERNAL API.
+ */
+private[pekko] trait RemoteConnection {
+ def channel: Channel
+ def shutdown(): Unit
+}
+
/**
* INTERNAL API.
*/
private[pekko] object RemoteConnection {
- def apply(role: Role, sockaddr: InetSocketAddress, poolSize: Int, handler:
ChannelUpstreamHandler): Channel = {
+ def apply(
+ role: Role,
+ sockaddr: InetSocketAddress,
+ poolSize: Int,
+ handler: ChannelInboundHandler,
+ log: LoggingAdapter): RemoteConnection = {
role match {
case Client =>
- val socketfactory =
- new NioClientSocketChannelFactory(Executors.newCachedThreadPool,
Executors.newCachedThreadPool, poolSize)
- val bootstrap = new ClientBootstrap(socketfactory)
- bootstrap.setPipelineFactory(new TestConductorPipelineFactory(handler))
- bootstrap.setOption("tcpNoDelay", true)
- bootstrap.connect(sockaddr).getChannel
+ val bootstrap = new Bootstrap()
+ val eventLoopGroup = new NioEventLoopGroup(poolSize)
+ val clientChannel = bootstrap
+ .group(eventLoopGroup)
+ .channel(classOf[NioSocketChannel])
+ .handler(new TestConductorPipelineFactory(handler))
+ .option[java.lang.Boolean](ChannelOption.TCP_NODELAY, true)
+ .connect(sockaddr)
+ .sync()
+ .channel()
+ log.info("client connected to server addr:{} from local addr:{}",
sockaddr, clientChannel.localAddress)
+ new RemoteConnection {
+ override def channel: Channel = clientChannel
+ override def shutdown(): Unit = {
+ clientChannel.close().sync()
+ eventLoopGroup.shutdownGracefully()
+ eventLoopGroup.terminationFuture().sync()
+ }
+ }
+
case Server =>
- val socketfactory =
- new NioServerSocketChannelFactory(Executors.newCachedThreadPool,
Executors.newCachedThreadPool, poolSize)
- val bootstrap = new ServerBootstrap(socketfactory)
- bootstrap.setPipelineFactory(new TestConductorPipelineFactory(handler))
- bootstrap.setOption("reuseAddress", !Helpers.isWindows)
- bootstrap.setOption("child.tcpNoDelay", true)
- bootstrap.bind(sockaddr)
+ val bootstrap = new ServerBootstrap()
+ val parentEventLoopGroup = new NioEventLoopGroup(poolSize)
+ val childEventLoopGroup = new NioEventLoopGroup(poolSize)
+ val serverChannel = bootstrap
+ .group(parentEventLoopGroup, childEventLoopGroup)
+ .channel(classOf[NioServerSocketChannel])
+ .childHandler(new TestConductorPipelineFactory(handler))
+ .option[java.lang.Boolean](ChannelOption.SO_REUSEADDR,
!Helpers.isWindows)
+ .childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY, true)
+ .bind(sockaddr)
+ .sync()
+ .channel()
+ log.info("server bind to addr:{}", sockaddr)
Review Comment:
I add some log here to saw what happend on @mdedetrich 's box.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]