mdedetrich commented on code in PR #539:
URL: https://github.com/apache/incubator-pekko/pull/539#discussion_r1285177184
##########
project/SbtMultiJvm.scala:
##########
@@ -167,10 +168,11 @@ object MultiJvmPlugin extends AutoPlugin {
// the first class wins just like a classpath
// just concatenate conflicting text files
assembly / assemblyMergeStrategy := {
- case n if n.endsWith(".class") => MergeStrategy.first
- case n if n.endsWith(".txt") => MergeStrategy.concat
- case n if n.endsWith("NOTICE") => MergeStrategy.concat
- case n => (assembly /
assemblyMergeStrategy).value.apply(n)
+ case n if n.endsWith(".class") => MergeStrategy.first
+ case n if n.endsWith(".txt") => MergeStrategy.concat
+ case n if n.endsWith("NOTICE") => MergeStrategy.concat
+ case n if n.endsWith("LICENSE") => MergeStrategy.concat
Review Comment:
Although this change was needed for this feature specifically, I think it
would be wiser to make this change in a separate PR because in general we
should concatenate all `LICENSE` statements.
##########
multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala:
##########
@@ -94,44 +95,88 @@ private[pekko] case object Client extends Role
*/
private[pekko] case object Server extends Role
+/**
+ * INTERNAL API.
+ */
+private[pekko] trait RemoteConnection {
+
+ /**
+ * The channel future associated with this connection.
+ */
+ def channelFuture: ChannelFuture
+
+ /**
+ * Shutdown the connection and release the resources.
+ */
+ def shutdown(): Unit
+}
+
/**
* INTERNAL API.
*/
private[pekko] object RemoteConnection {
- def apply(role: Role, sockaddr: InetSocketAddress, poolSize: Int, handler:
ChannelUpstreamHandler): Channel = {
+ def apply(
+ role: Role,
+ sockaddr: InetSocketAddress,
+ poolSize: Int,
+ handler: ChannelInboundHandler): RemoteConnection = {
role match {
case Client =>
- val socketfactory =
- new NioClientSocketChannelFactory(Executors.newCachedThreadPool,
Executors.newCachedThreadPool, poolSize)
- val bootstrap = new ClientBootstrap(socketfactory)
- bootstrap.setPipelineFactory(new TestConductorPipelineFactory(handler))
- bootstrap.setOption("tcpNoDelay", true)
- bootstrap.connect(sockaddr).getChannel
+ val bootstrap = new Bootstrap()
+ val eventLoopGroup = new NioEventLoopGroup(poolSize)
+ val cf = bootstrap
+ .group(eventLoopGroup)
+ .channel(classOf[NioSocketChannel])
+ .handler(new TestConductorPipelineFactory(handler))
+ .option[java.lang.Boolean](ChannelOption.TCP_NODELAY, true)
+ .option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)
+ .connect(sockaddr)
+ new RemoteConnection {
+ override def channelFuture: ChannelFuture = cf
+
+ override def shutdown(): Unit = {
+ try {
+ channelFuture.channel().close().sync()
+ eventLoopGroup.shutdownGracefully(1L, 1L, TimeUnit.SECONDS)
Review Comment:
Is it worth it to make these shutdown gracefully constants configurable in
typesafe config? As I understand these values are the underlying reasons why
the implementation initially didn't work on Mac/Linux and if so I think it
makes sense for users to be able to configure this without needing a new
release incase there are further issues in the future.
##########
cluster/src/multi-jvm/resources/logback-test.xml:
##########
@@ -0,0 +1,13 @@
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} -
%msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="info">
+ <appender-ref ref="STDOUT" />
+ </root>
+ <logger name="io.netty.util.Recycler" level="ERROR" />
Review Comment:
Is this something that was needed just because of netty4 or is it generally
helpful? If it is generally helpful I would do this change in a separate PR so
that we can backport it to Pekko 1.0.x
##########
multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala:
##########
@@ -94,44 +95,88 @@ private[pekko] case object Client extends Role
*/
private[pekko] case object Server extends Role
+/**
+ * INTERNAL API.
+ */
+private[pekko] trait RemoteConnection {
+
+ /**
+ * The channel future associated with this connection.
+ */
+ def channelFuture: ChannelFuture
+
+ /**
+ * Shutdown the connection and release the resources.
+ */
+ def shutdown(): Unit
+}
+
/**
* INTERNAL API.
*/
private[pekko] object RemoteConnection {
- def apply(role: Role, sockaddr: InetSocketAddress, poolSize: Int, handler:
ChannelUpstreamHandler): Channel = {
+ def apply(
+ role: Role,
+ sockaddr: InetSocketAddress,
+ poolSize: Int,
+ handler: ChannelInboundHandler): RemoteConnection = {
role match {
case Client =>
- val socketfactory =
- new NioClientSocketChannelFactory(Executors.newCachedThreadPool,
Executors.newCachedThreadPool, poolSize)
- val bootstrap = new ClientBootstrap(socketfactory)
- bootstrap.setPipelineFactory(new TestConductorPipelineFactory(handler))
- bootstrap.setOption("tcpNoDelay", true)
- bootstrap.connect(sockaddr).getChannel
+ val bootstrap = new Bootstrap()
+ val eventLoopGroup = new NioEventLoopGroup(poolSize)
+ val cf = bootstrap
+ .group(eventLoopGroup)
+ .channel(classOf[NioSocketChannel])
+ .handler(new TestConductorPipelineFactory(handler))
+ .option[java.lang.Boolean](ChannelOption.TCP_NODELAY, true)
+ .option[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)
+ .connect(sockaddr)
+ new RemoteConnection {
+ override def channelFuture: ChannelFuture = cf
+
+ override def shutdown(): Unit = {
+ try {
+ channelFuture.channel().close().sync()
+ eventLoopGroup.shutdownGracefully(1L, 1L, TimeUnit.SECONDS)
+ } catch {
+ case NonFatal(_) => // silence this one to not make tests look
like they failed, it's not really critical
+ }
+ }
+ }
+
case Server =>
- val socketfactory =
- new NioServerSocketChannelFactory(Executors.newCachedThreadPool,
Executors.newCachedThreadPool, poolSize)
- val bootstrap = new ServerBootstrap(socketfactory)
- bootstrap.setPipelineFactory(new TestConductorPipelineFactory(handler))
- bootstrap.setOption("reuseAddress", !Helpers.isWindows)
- bootstrap.setOption("child.tcpNoDelay", true)
- bootstrap.bind(sockaddr)
+ val bootstrap = new ServerBootstrap()
+ val parentEventLoopGroup = new NioEventLoopGroup(poolSize)
+ val childEventLoopGroup = new NioEventLoopGroup(poolSize)
+ val cf = bootstrap
+ .group(parentEventLoopGroup, childEventLoopGroup)
+ .channel(classOf[NioServerSocketChannel])
+ .childHandler(new TestConductorPipelineFactory(handler))
+ .option[java.lang.Boolean](ChannelOption.SO_REUSEADDR,
!Helpers.isWindows)
+ .option[java.lang.Integer](ChannelOption.SO_BACKLOG, 2048)
+ .childOption[java.lang.Boolean](ChannelOption.TCP_NODELAY, true)
+ .childOption[java.lang.Boolean](ChannelOption.SO_KEEPALIVE, true)
+ .bind(sockaddr)
+ .sync()
+
+ new RemoteConnection {
+ override def channelFuture: ChannelFuture = cf
+
+ override def shutdown(): Unit = {
+ try {
+ channelFuture.channel().close().sync()
+ parentEventLoopGroup.shutdownGracefully(1L, 1L, TimeUnit.SECONDS)
Review Comment:
Same as previous comment about configuration
##########
project/SbtMultiJvm.scala:
##########
@@ -372,11 +374,41 @@ object MultiJvmPlugin extends AutoPlugin {
val connectInput = input && index == 0
log.debug("Starting %s for %s".format(jvmName, testClass))
log.debug(" with JVM options: %s".format(allJvmOptions.mkString(" ")))
- (testClass, Jvm.startJvm(javaBin, allJvmOptions, runOptions,
jvmLogger, connectInput))
+ val testClass2Process = (testClass, Jvm.startJvm(javaBin,
allJvmOptions, runOptions, jvmLogger, connectInput))
+ if (index == 0) {
+ log.debug("%s for %s 's started as `Controller`, waiting before can
be connected for clients.".format(jvmName,
+ testClass))
+ val controllerHost = hosts.head
+ val serverPort: Int = Integer.getInteger("multinode.server-port",
4711)
+ waitingBeforeConnectable(controllerHost, serverPort,
TimeUnit.SECONDS.toMillis(20L))
+ }
+ testClass2Process
}
processExitCodes(name, processes, log)
}
+ private def waitingBeforeConnectable(host: String, port: Int,
timeoutInMillis: Long): Unit = {
+ val inetSocketAddress = new InetSocketAddress(host, port)
+ def telnet(addr: InetSocketAddress, timeout: Int): Boolean = {
+ val socket: Socket = new Socket()
+ try {
+ socket.connect(inetSocketAddress, timeout)
+ socket.isConnected
+ } catch {
+ case _: Exception => false
+ } finally {
+ socket.close()
+ }
+ }
+
+ val startTime = System.currentTimeMillis()
+ var connectivity = false
+ while (!connectivity && (System.currentTimeMillis() - startTime <
timeoutInMillis)) {
Review Comment:
Same with previous comment about constants.
##########
project/SbtMultiJvm.scala:
##########
@@ -372,11 +374,41 @@ object MultiJvmPlugin extends AutoPlugin {
val connectInput = input && index == 0
log.debug("Starting %s for %s".format(jvmName, testClass))
log.debug(" with JVM options: %s".format(allJvmOptions.mkString(" ")))
- (testClass, Jvm.startJvm(javaBin, allJvmOptions, runOptions,
jvmLogger, connectInput))
+ val testClass2Process = (testClass, Jvm.startJvm(javaBin,
allJvmOptions, runOptions, jvmLogger, connectInput))
+ if (index == 0) {
+ log.debug("%s for %s 's started as `Controller`, waiting before can
be connected for clients.".format(jvmName,
+ testClass))
+ val controllerHost = hosts.head
+ val serverPort: Int = Integer.getInteger("multinode.server-port",
4711)
Review Comment:
As with previous comments about magic constants should this be made
configurable (although in this specific case since we are dealing with test
classes in sbt project build it should be done by cli args as is done
elsewhere).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]