glennosss commented on a change in pull request #886:
URL: https://github.com/apache/james-project/pull/886#discussion_r830616117
##########
File path:
protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractAsyncServer.java
##########
@@ -85,54 +88,61 @@ public synchronized void bind() throws Exception {
throw new RuntimeException("Please specify at least on
socketaddress to which the server should get bound!");
}
- bootstrap = new ServerBootstrap(createSocketChannelFactory());
- ChannelPipelineFactory factory = createPipelineFactory(channels);
-
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ bootstrap.channel(NioServerSocketChannel.class);
+
+ bossGroup = new NioEventLoopGroup();
+ workerGroup = new NioEventLoopGroup(ioWorker);
+
+ bootstrap.group(bossGroup, workerGroup);
+
+ ChannelInitializer<SocketChannel> factory =
createPipelineFactory(channels);
+
// Configure the pipeline factory.
- bootstrap.setPipelineFactory(factory);
- configureBootstrap(bootstrap);
+ bootstrap.childHandler(factory);
for (InetSocketAddress address : addresses) {
- channels.add(bootstrap.bind(address));
+ Channel channel = bootstrap.bind(address).sync().channel();
+ channels.add(channel);
}
- started = true;
+ configureBootstrap(bootstrap);
+
+ started = true;
}
/**
* Configure the bootstrap before it get bound
*/
protected void configureBootstrap(ServerBootstrap bootstrap) {
// Bind and start to accept incoming connections.
- bootstrap.setOption("backlog", backlog);
- bootstrap.setOption("reuseAddress", true);
- bootstrap.setOption("child.tcpNoDelay", true);
- }
-
- protected ServerSocketChannelFactory createSocketChannelFactory() {
- return new NioServerSocketChannelFactory(createBossExecutor(),
createWorkerExecutor(), ioWorker);
+ bootstrap.option(ChannelOption.SO_BACKLOG, backlog);
+ bootstrap.option(ChannelOption.SO_REUSEADDR, true);
+ bootstrap.option(ChannelOption.TCP_NODELAY, true);
Review comment:
Should be `bootstrap.childOption(ChannelOption.TCP_NODELAY, true);`
##########
File path:
protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractAsyncServer.java
##########
@@ -85,54 +88,61 @@ public synchronized void bind() throws Exception {
throw new RuntimeException("Please specify at least on
socketaddress to which the server should get bound!");
}
- bootstrap = new ServerBootstrap(createSocketChannelFactory());
- ChannelPipelineFactory factory = createPipelineFactory(channels);
-
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ bootstrap.channel(NioServerSocketChannel.class);
+
+ bossGroup = new NioEventLoopGroup();
+ workerGroup = new NioEventLoopGroup(ioWorker);
+
+ bootstrap.group(bossGroup, workerGroup);
+
+ ChannelInitializer<SocketChannel> factory =
createPipelineFactory(channels);
+
// Configure the pipeline factory.
- bootstrap.setPipelineFactory(factory);
- configureBootstrap(bootstrap);
+ bootstrap.childHandler(factory);
for (InetSocketAddress address : addresses) {
- channels.add(bootstrap.bind(address));
+ Channel channel = bootstrap.bind(address).sync().channel();
+ channels.add(channel);
}
- started = true;
+ configureBootstrap(bootstrap);
+
+ started = true;
}
/**
* Configure the bootstrap before it get bound
*/
protected void configureBootstrap(ServerBootstrap bootstrap) {
// Bind and start to accept incoming connections.
- bootstrap.setOption("backlog", backlog);
- bootstrap.setOption("reuseAddress", true);
- bootstrap.setOption("child.tcpNoDelay", true);
- }
-
- protected ServerSocketChannelFactory createSocketChannelFactory() {
- return new NioServerSocketChannelFactory(createBossExecutor(),
createWorkerExecutor(), ioWorker);
+ bootstrap.option(ChannelOption.SO_BACKLOG, backlog);
+ bootstrap.option(ChannelOption.SO_REUSEADDR, true);
+ bootstrap.option(ChannelOption.TCP_NODELAY, true);
}
-
@Override
public synchronized void unbind() {
if (started == false) {
return;
}
- ChannelPipelineFactory factory = bootstrap.getPipelineFactory();
- if (factory instanceof ExternalResourceReleasable) {
- ((ExternalResourceReleasable) factory).releaseExternalResources();
+
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully();
+ }
+
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
}
- channels.close().awaitUninterruptibly();
- bootstrap.releaseExternalResources();
+
started = false;
}
@Override
public synchronized List<InetSocketAddress> getListenAddresses() {
ImmutableList.Builder<InetSocketAddress> builder =
ImmutableList.builder();
for (Channel channel : ImmutableList.copyOf(channels.iterator())) {
Review comment:
This line can just be;
```
for (Channel channel : channels) {
```
There are no advantages to creating an immutable copy of this iterator.
##########
File path:
protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java
##########
@@ -40,26 +41,27 @@
import org.apache.james.protocols.api.handler.ProtocolHandlerChain;
import org.apache.james.protocols.api.handler.ProtocolHandlerResultHandler;
import org.apache.james.util.MDCBuilder;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandler.Sharable;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ChannelUpstreamHandler;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.util.AttributeKey;
+
+
/**
- * {@link ChannelUpstreamHandler} which is used by the SMTPServer and other
line based protocols
+ * {@link ChannelInboundHandlerAdapter} which is used by the SMTPServer and
other line based protocols
*/
@Sharable
-public class BasicChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
+public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter {
Review comment:
Class should technically be called `BasicChannelInboundHandler` now.
When moving from Netty 3 to 4, should rename all classes/etc;
- Upstream --> Inbound
- Downstream --> Outbound
A similar comment can be made about all the other classes in this package.
##########
File path:
protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractChannelPipelineFactory.java
##########
@@ -18,47 +18,46 @@
****************************************************************/
package org.apache.james.protocols.netty;
-import static org.jboss.netty.channel.Channels.pipeline;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelUpstreamHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.handler.execution.ExecutionHandler;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.jboss.netty.util.HashedWheelTimer;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.stream.ChunkedWriteHandler;
/**
- * Abstract base class for {@link ChannelPipelineFactory} implementations
+ * Abstract base class for {@link ChannelInitializer} implementations
*/
-public abstract class AbstractChannelPipelineFactory implements
ChannelPipelineFactory {
[email protected]
+public abstract class AbstractChannelPipelineFactory<C extends SocketChannel>
extends ChannelInitializer<C> {
public static final int MAX_LINE_LENGTH = 8192;
protected final ConnectionLimitUpstreamHandler connectionLimitHandler;
protected final ConnectionPerIpLimitUpstreamHandler
connectionPerIpLimitHandler;
- private final HashedWheelTimer timer;
private final ChannelGroupHandler groupHandler;
private final int timeout;
- private final ExecutionHandler eHandler;
private final ChannelHandlerFactory frameHandlerFactory;
-
+
+ public AbstractChannelPipelineFactory(ChannelGroup channels,
+ ChannelHandlerFactory
frameHandlerFactory) {
+ this(0, 0, 0, channels, frameHandlerFactory);
+ }
+
public AbstractChannelPipelineFactory(int timeout, int maxConnections, int
maxConnectsPerIp, ChannelGroup channels,
- ExecutionHandler eHandler,
ChannelHandlerFactory frameHandlerFactory,
- HashedWheelTimer hashedWheelTimer) {
+ ChannelHandlerFactory
frameHandlerFactory) {
this.connectionLimitHandler = new
ConnectionLimitUpstreamHandler(maxConnections);
this.connectionPerIpLimitHandler = new
ConnectionPerIpLimitUpstreamHandler(maxConnectsPerIp);
Review comment:
I found when using NettyServer maxConnections/maxConnectsPerIp where
never set. Rather than adding redundant handlers that don't do anything could
go;
```
this.connectionLimitHandler = maxConnections > 0 ? new
ConnectionLimitInboundHandler(maxConnections) : null;
this.connectionPerIpLimitHandler = maxConnectionsPerIp > 0 ? new
ConnectionPerIpLimitInboundHandler(maxConnectionsPerIp) : null;
```
Then below
```
addLastIfNotNull(pipeline,
HandlerConstants.CONNECTION_LIMIT_HANDLER, connectionLimitHandler);
addLastIfNotNull(pipeline,
HandlerConstants.CONNECTION_PER_IP_LIMIT_HANDLER, connectionPerIpLimitHandler);
```
And add method;
```
private static void addLastIfNotNull(ChannelPipeline pipeline, String
name, ChannelHandler handler) {
if (handler != null) {
pipeline.addLast(name, handler);
}
}
```
This may seem largely unimportant - but it's important and best to construct
the simplest possible pipeline - a little extra work in setup will make
debugging and such easier later and avoid redundant work in the pipeline.
##########
File path:
protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractAsyncServer.java
##########
@@ -85,54 +88,61 @@ public synchronized void bind() throws Exception {
throw new RuntimeException("Please specify at least on
socketaddress to which the server should get bound!");
}
- bootstrap = new ServerBootstrap(createSocketChannelFactory());
- ChannelPipelineFactory factory = createPipelineFactory(channels);
-
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ bootstrap.channel(NioServerSocketChannel.class);
+
+ bossGroup = new NioEventLoopGroup();
+ workerGroup = new NioEventLoopGroup(ioWorker);
+
+ bootstrap.group(bossGroup, workerGroup);
+
+ ChannelInitializer<SocketChannel> factory =
createPipelineFactory(channels);
+
// Configure the pipeline factory.
- bootstrap.setPipelineFactory(factory);
- configureBootstrap(bootstrap);
+ bootstrap.childHandler(factory);
for (InetSocketAddress address : addresses) {
- channels.add(bootstrap.bind(address));
+ Channel channel = bootstrap.bind(address).sync().channel();
+ channels.add(channel);
}
- started = true;
+ configureBootstrap(bootstrap);
+
+ started = true;
}
/**
* Configure the bootstrap before it get bound
*/
protected void configureBootstrap(ServerBootstrap bootstrap) {
// Bind and start to accept incoming connections.
- bootstrap.setOption("backlog", backlog);
- bootstrap.setOption("reuseAddress", true);
- bootstrap.setOption("child.tcpNoDelay", true);
- }
-
- protected ServerSocketChannelFactory createSocketChannelFactory() {
- return new NioServerSocketChannelFactory(createBossExecutor(),
createWorkerExecutor(), ioWorker);
+ bootstrap.option(ChannelOption.SO_BACKLOG, backlog);
+ bootstrap.option(ChannelOption.SO_REUSEADDR, true);
+ bootstrap.option(ChannelOption.TCP_NODELAY, true);
}
-
@Override
public synchronized void unbind() {
if (started == false) {
return;
}
- ChannelPipelineFactory factory = bootstrap.getPipelineFactory();
- if (factory instanceof ExternalResourceReleasable) {
- ((ExternalResourceReleasable) factory).releaseExternalResources();
+
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully();
+ }
+
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
Review comment:
Bad code - should either wait for promises to finish (i.e. blocking
code, not recommended) or return promise.
Example blocking code; (not recommended)
```
EventLoopGroup workerGroup = this.workerGroup;
if (workerGroup != null) {
this.workerGroup = null;
workerGroup.shutdownGracefully().syncUninterruptibly();
}
EventLoopGroup bossGroup = this.bossGroup;
if (bossGroup != null) {
this.bossGroup = null;
bossGroup.shutdownGracefully().syncUninterruptibly();
}
```
Alternatively and better - `unbind()` should return a promise with all the
combined promises so that the caller can `sync()` if required.
##########
File path:
protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractAsyncServer.java
##########
@@ -85,54 +88,61 @@ public synchronized void bind() throws Exception {
throw new RuntimeException("Please specify at least on
socketaddress to which the server should get bound!");
}
- bootstrap = new ServerBootstrap(createSocketChannelFactory());
- ChannelPipelineFactory factory = createPipelineFactory(channels);
-
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ bootstrap.channel(NioServerSocketChannel.class);
+
+ bossGroup = new NioEventLoopGroup();
+ workerGroup = new NioEventLoopGroup(ioWorker);
+
+ bootstrap.group(bossGroup, workerGroup);
+
+ ChannelInitializer<SocketChannel> factory =
createPipelineFactory(channels);
+
// Configure the pipeline factory.
- bootstrap.setPipelineFactory(factory);
- configureBootstrap(bootstrap);
+ bootstrap.childHandler(factory);
for (InetSocketAddress address : addresses) {
- channels.add(bootstrap.bind(address));
+ Channel channel = bootstrap.bind(address).sync().channel();
+ channels.add(channel);
}
- started = true;
+ configureBootstrap(bootstrap);
+
+ started = true;
}
/**
* Configure the bootstrap before it get bound
*/
protected void configureBootstrap(ServerBootstrap bootstrap) {
// Bind and start to accept incoming connections.
- bootstrap.setOption("backlog", backlog);
- bootstrap.setOption("reuseAddress", true);
- bootstrap.setOption("child.tcpNoDelay", true);
- }
-
- protected ServerSocketChannelFactory createSocketChannelFactory() {
- return new NioServerSocketChannelFactory(createBossExecutor(),
createWorkerExecutor(), ioWorker);
+ bootstrap.option(ChannelOption.SO_BACKLOG, backlog);
+ bootstrap.option(ChannelOption.SO_REUSEADDR, true);
+ bootstrap.option(ChannelOption.TCP_NODELAY, true);
}
-
@Override
public synchronized void unbind() {
if (started == false) {
return;
}
- ChannelPipelineFactory factory = bootstrap.getPipelineFactory();
Review comment:
Forgot to close the channel groups!
```
ChannelGroup channels = this.channels;
if (channels != null) {
this.channels = channels;
channels.close().syncUninterruptibly();
}
```
##########
File path:
protocols/netty/src/main/java/org/apache/james/protocols/netty/AbstractAsyncServer.java
##########
@@ -48,11 +50,12 @@
private volatile int timeout = 120;
- private ServerBootstrap bootstrap;
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
private volatile boolean started;
- private final ChannelGroup channels = new DefaultChannelGroup();
+ private final ChannelGroup channels = new
DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
Review comment:
'GlobalEventExecutor' states - please note it is not scalable to
schedule large number of tasks to this executor; use a dedicated executor.
Projects like Reactor Netty use `ImmediateEventExecutor.INSTANCE` so I'd
potentially recommend changing to;
```
private final ChannelGroup channels = new
DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
````
This will then just execute actions against the channel group on the current
thread.
I'm not 100% on this - but I'm fairly confident using 'GlobalEventExecutor'
is potentially bad.
##########
File path:
protocols/netty/src/main/java/org/apache/james/protocols/netty/ChannelGroupHandler.java
##########
@@ -19,31 +19,22 @@
package org.apache.james.protocols.netty;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.group.ChannelGroup;
/**
* Add channels to the channel group after the channel was opened.
*
* This handler is thread-safe and thus can be shared across pipelines
*/
-public final class ChannelGroupHandler extends SimpleChannelUpstreamHandler {
[email protected]
+public final class ChannelGroupHandler extends ChannelInboundHandlerAdapter {
Review comment:
Delete this class! Doesn't do anything in Netty 4!
##########
File path:
protocols/netty/src/main/java/org/apache/james/protocols/netty/TimeoutHandler.java
##########
@@ -18,26 +18,27 @@
****************************************************************/
package org.apache.james.protocols.netty;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.timeout.IdleState;
-import org.jboss.netty.handler.timeout.IdleStateHandler;
-import org.jboss.netty.util.Timer;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
/**
* {@link IdleStateHandler} implementation which disconnect the {@link
Channel} after a configured
* idle timeout. Be aware that this handle is not thread safe so it can't be
shared across pipelines
*/
[email protected]
public class TimeoutHandler extends IdleStateHandler {
Review comment:
Redundant implementation - just use the Netty provided implementation -
`io.netty.handler.timeout.ReadTimeoutHandler` - and delete this class!
##########
File path:
protocols/netty/src/main/java/org/apache/james/protocols/netty/AllButStartTlsLineBasedChannelHandler.java
##########
@@ -23,44 +23,49 @@
import java.util.Locale;
import org.apache.james.protocols.api.CommandDetectionSession;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.handler.codec.frame.LineBasedFrameDecoder;
import com.google.common.base.CharMatcher;
import com.google.common.base.Splitter;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.LineBasedFrameDecoder;
+import io.netty.util.AttributeKey;
+
+
public class AllButStartTlsLineBasedChannelHandler extends
LineBasedFrameDecoder {
private static final Boolean FAIL_FAST = true;
private final ChannelPipeline pipeline;
private final String pattern;
+ private static final AttributeKey<CommandDetectionSession>
sessionAttributeKey =
+ AttributeKey.valueOf("session");
Review comment:
Same key name as `BasicChannelUpstreamHandler` - consider calling
something different to avoid any potential issues if added to same channel.
Most netty code I've seen would use
`CommandDetectionSession.class.getSimpleName()` or
`CommandDetectionSession.class.getName()` as the key.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]