Author: norman Date: Thu May 6 18:11:29 2010 New Revision: 941836 URL: http://svn.apache.org/viewvc?rev=941836&view=rev Log: Add reusable generic implementation code. This is based on NETTY (PROTOCOLS-3)
Added: james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractSession.java - copied, changed from r941826, james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractNettySession.java Removed: james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractNettySession.java james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/ProtocolHandlerChainImpl.java Modified: james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractAsyncServer.java james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelPipelineFactory.java james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelUpstreamHandler.java Modified: james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractAsyncServer.java URL: http://svn.apache.org/viewvc/james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractAsyncServer.java?rev=941836&r1=941835&r2=941836&view=diff ============================================================================== --- james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractAsyncServer.java (original) +++ james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractAsyncServer.java Thu May 6 18:11:29 2010 @@ -18,28 +18,9 @@ ****************************************************************/ package org.apache.james.socket.netty; -import java.io.FileInputStream; -import java.net.InetAddress; import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.security.KeyStore; import java.util.concurrent.Executors; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; -import javax.annotation.Resource; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import 
org.apache.commons.configuration.HierarchicalConfiguration; -import org.apache.commons.logging.Log; -import org.apache.james.api.dnsservice.DNSService; -import org.apache.james.lifecycle.Configurable; -import org.apache.james.lifecycle.LogEnabled; -import org.apache.james.services.FileSystem; -import org.apache.james.services.MailServer; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; @@ -48,327 +29,61 @@ import org.jboss.netty.channel.socket.ni * Abstract base class for Servers which want to use async io * */ -public abstract class AbstractAsyncServer implements LogEnabled, Configurable{ - /** - * The default value for the connection backlog. - */ - private static final int DEFAULT_BACKLOG = 200; - - /** - * The default value for the connection timeout. - */ - private static final int DEFAULT_TIMEOUT = 5* 60; +public abstract class AbstractAsyncServer { - /** - * The name of the parameter defining the connection timeout. - */ - private static final String TIMEOUT_NAME = "connectiontimeout"; - /** - * The name of the parameter defining the connection backlog. - */ - private static final String BACKLOG_NAME = "connectionBacklog"; + protected int connPerIP = 0; - /** - * The name of the parameter defining the service hello name. - */ - public static final String HELLO_NAME = "helloName"; - - private FileSystem fileSystem; - - /** - * The internal mail server service. 
- */ - private MailServer mailServer; - - private Log logger; - - private DNSService dns; - - private boolean enabled; - - protected int connPerIP; + protected int connectionLimit = 0; - private boolean useStartTLS; - private boolean useSSL; - - - protected int connectionLimit; - - private String helloName; - - private String keystore; - - private String secret; - - private int backlog; + private int backlog = 250; - private InetAddress bindTo; - private int port; - private int timeout; + private int timeout = 120; - private SSLContext context; + private ServerBootstrap bootstrap; - private ServerBootstrap bootstrap; + private boolean started; - @Resource(name="dnsserver") - public final void setDNSService(DNSService dns) { - this.dns = dns; - } - - @Resource(name="filesystem") - public final void setFileSystem(FileSystem filesystem) { - this.fileSystem = filesystem; - } - - @Resource(name="James") - public final void setMailServer(MailServer mailServer) { - this.mailServer = mailServer; + private String ip; + + public AbstractAsyncServer(String ip, int port) { + this.ip = ip; + this.port = port; } - - /* - * (non-Javadoc) - * @see org.apache.james.lifecycle.LogEnabled#setLog(org.apache.commons.logging.Log) + + /** + * Start the server + * */ - public final void setLog(Log logger) { - this.logger = logger; - } + public synchronized final void start() { + if (started) + throw new IllegalStateException("Server running allready"); - /* - * (non-Javadoc) - * @see org.apache.james.lifecycle.Configurable#configure(org.apache.commons.configuration.HierarchicalConfiguration) - */ - public final void configure(HierarchicalConfiguration config) throws ConfigurationException{ - - Configuration handlerConfiguration = ((HierarchicalConfiguration)config).configurationAt("handler"); - - enabled = config.getBoolean("[...@enabled]", true); - - final Log logger = getLogger(); - if (!enabled) { - logger.info(getServiceType() + " disabled by configuration"); - return; - } - - - /* - 
boolean streamdump=handlerConfiguration.getChild("streamdump").getAttributeAsBoolean("enabled", false); - String streamdumpDir=streamdump ? handlerConfiguration.getChild("streamdump").getAttribute("directory", null) : null; - setStreamDumpDir(streamdumpDir); - */ - - port = config.getInt("port",getDefaultPort()); - - - - StringBuilder infoBuffer; - - - try { - final String bindAddress = config.getString("bind",null); - if( null != bindAddress ) { - bindTo = InetAddress.getByName(bindAddress); - infoBuffer = - new StringBuilder(64) - .append(getServiceType()) - .append(" bound to: ") - .append(bindTo); - logger.info(infoBuffer.toString()); - } - } - catch( final UnknownHostException unhe ) { - throw new ConfigurationException( "Malformed bind parameter in configuration of service " + getServiceType(), unhe ); - } - - configureHelloName(handlerConfiguration); - - timeout = handlerConfiguration.getInt(TIMEOUT_NAME,DEFAULT_TIMEOUT); - - infoBuffer = - new StringBuilder(64) - .append(getServiceType()) - .append(" handler connection timeout is: ") - .append(timeout); - logger.info(infoBuffer.toString()); - - backlog = config.getInt(BACKLOG_NAME,DEFAULT_BACKLOG); - - infoBuffer = - new StringBuilder(64) - .append(getServiceType()) - .append(" connection backlog is: ") - .append(backlog); - logger.info(infoBuffer.toString()); - - - String connectionLimitString = config.getString("connectionLimit",null); - if (connectionLimitString != null) { - try { - connectionLimit = new Integer(connectionLimitString); - } catch (NumberFormatException nfe) { - logger.error("Connection limit value is not properly formatted.", nfe); - } - if (connectionLimit < 0) { - logger.error("Connection limit value cannot be less than zero."); - throw new ConfigurationException("Connection limit value cannot be less than zero."); - } else if (connectionLimit > 0){ - infoBuffer = new StringBuilder(128) - .append(getServiceType()) - .append(" will allow a maximum of ") - .append(connectionLimitString) - 
.append(" connections."); - logger.info(infoBuffer.toString()); - } - } - - String connectionLimitPerIP = handlerConfiguration.getString("connectionLimitPerIP",null); - if (connectionLimitPerIP != null) { - try { - connPerIP = new Integer(connectionLimitPerIP).intValue(); - } catch (NumberFormatException nfe) { - logger.error("Connection limit per IP value is not properly formatted.", nfe); - } - if (connPerIP < 0) { - logger.error("Connection limit per IP value cannot be less than zero."); - throw new ConfigurationException("Connection limit value cannot be less than zero."); - } else if (connPerIP > 0){ - infoBuffer = new StringBuilder(128) - .append(getServiceType()) - .append(" will allow a maximum of ") - .append(connPerIP) - .append(" per IP connections for " +getServiceType()); - logger.info(infoBuffer.toString()); - } - } - - - useStartTLS = config.getBoolean("tl...@starttls]", false); - useSSL = config.getBoolean("tl...@sockettls]", false); - - if (useSSL && useStartTLS) throw new ConfigurationException("startTLS is only supported when using plain sockets"); - - if (useStartTLS || useSSL) { - keystore = config.getString("tls.keystore", null); - if (keystore == null) { - throw new ConfigurationException("keystore needs to get configured"); - } - secret = config.getString("tls.secret",""); - } - - doConfigure(config); + bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); + // Configure the pipeline factory. + bootstrap.setPipelineFactory(createPipelineFactory()); - } - - - @PostConstruct - public final void init() throws Exception { - if (isEnabled()) { - preInit(); - buildSSLContext(); - - bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), - Executors.newCachedThreadPool())); - // Configure the pipeline factory. - bootstrap.setPipelineFactory(createPipelineFactory()); - - // Bind and start to accept incoming connections. 
- bootstrap.setOption("backlog",backlog); - bootstrap.setOption("reuseAddress",true); - - //acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, timeout ); - //acceptor.setHandler(createIoHandler()); - bootstrap.bind(new InetSocketAddress(bindTo,port)); - } - } + // Bind and start to accept incoming connections. + bootstrap.setOption("backlog", backlog); + bootstrap.setOption("reuseAddress", true); + + bootstrap.bind(new InetSocketAddress(ip, port)); + started = true; - @PreDestroy - public final void destroy() { - getLogger().info("Dispose " + getServiceType()); - - bootstrap.releaseExternalResources(); - } - - - /** - * This method is called on init of the Server. Subclasses should override this method to init stuff - * - * @throws Exception - */ - protected void preInit() throws Exception { - // override me - } - - protected void doConfigure(HierarchicalConfiguration config) throws ConfigurationException { - // override me } /** - * Return the DNSService - * - * @return dns + * Stop the server */ - protected DNSService getDNSService() { - return dns; + public synchronized final void stop() { + bootstrap.releaseExternalResources(); + started = false; } - /** - * Return the MailServer - * - * @return mailServer - */ - protected MailServer getMailServer() { - return mailServer; - } - /** - * Return the FileSystem - * - * @return fileSystem - */ - protected FileSystem getFileSystem() { - return fileSystem; - } - - /** - * Configure the helloName for the given Configuration - * - * @param handlerConfiguration - */ - private void configureHelloName(Configuration handlerConfiguration) { - StringBuilder infoBuffer; - String hostName = null; - try { - hostName = dns.getHostName(dns.getLocalHost()); - } catch (UnknownHostException ue) { - hostName = "localhost"; - } - - infoBuffer = - new StringBuilder(64) - .append(getServiceType()) - .append(" is running on: ") - .append(hostName); - getLogger().info(infoBuffer.toString()); - - boolean autodetect = 
handlerConfiguration.getBoolean(HELLO_NAME + "/[...@autodetect]", true); - if (autodetect) { - helloName = hostName; - } else { - // Should we use the defaultdomain here ? - helloName = handlerConfiguration.getString(HELLO_NAME + "/localhost"); - } - - infoBuffer = - new StringBuilder(64) - .append(getServiceType()) - .append(" handler hello name is: ") - .append(helloName); - getLogger().info(infoBuffer.toString()); - } /** * Return the port this server will listen on @@ -379,110 +94,50 @@ public abstract class AbstractAsyncServe return port; } - /** - * Return the logger - * - * @return logger - */ - protected Log getLogger() { - return logger; - } - /** - * Return if the server is enabled by the configuration - * - * @return enabled - */ - public boolean isEnabled() { - return enabled; - } /** - * Return helloName for this server + * Create ChannelPipelineFactory to use by this Server implementation * - * @return helloName + * @return factory */ - public String getHelloName() { - return helloName; - } - - - /** - * Return if startTLS is supported by this server - * - * @return startTlsSupported - */ - protected boolean isStartTLSSupported() { - return useStartTLS; - } + protected abstract ChannelPipelineFactory createPipelineFactory(); /** - * Return if the socket is using SSL - * - * @return useSSL - */ - protected boolean isSSLSocket() { - return useSSL; - } - - /** - * Build the SSLEngine + * Set the read/write timeout for the server. This will throw a {...@link IllegalStateException} if the + * server is running. 
* - * @throws Exception + * @param timeout */ - - private void buildSSLContext() throws Exception { - if (useStartTLS) { - String algorithm = "SunX509"; - KeyStore ks = KeyStore.getInstance("JKS"); - ks.load(new FileInputStream(fileSystem.getFile(keystore)), secret.toCharArray()); - - // Set up key manager factory to use our key store - KeyManagerFactory kmf = KeyManagerFactory.getInstance(algorithm); - kmf.init(ks, secret.toCharArray()); - - // Initialize the SSLContext to work with our key managers. - context = SSLContext.getInstance("TLS"); - context.init(kmf.getKeyManagers(), null, null); - - - } + public synchronized void setTimeout(int timeout) { + if (started) throw new IllegalStateException("Can only be set when the server is not running"); + this.timeout = timeout; } - - - /** - * Createh IoHandler to use by this Server implementation - * - * @return ioHandler - */ - protected abstract ChannelPipelineFactory createPipelineFactory(); /** - * Return the SslContextFactory which was created for this service. - * - * @return contextFactory + * Set the Backlog for the socket. This will throw a {@link IllegalStateException} if the server is running. + * @param backlog */ + public synchronized void setBacklog(int backlog) { + if (started) throw new IllegalStateException("Can only be set when the server is not running"); + this.backlog = backlog; + } /** - * Return the default port which will get used for this server if non is specify in the configuration + * Return the backlog for the socket * - * @return port + * @return backlog */ - protected abstract int getDefaultPort(); + public int getBacklog() { + return backlog; + } /** - * Return textual representation of the service this server provide - * - * @return serviceType + * Return the read/write timeout for the socket. 
+ * @return */ - protected abstract String getServiceType(); - - protected int getTimeout() { + public int getTimeout() { return timeout; } - - protected SSLContext getSSLContext() { - return context; - } } Modified: james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelPipelineFactory.java URL: http://svn.apache.org/viewvc/james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelPipelineFactory.java?rev=941836&r1=941835&r2=941836&view=diff ============================================================================== --- james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelPipelineFactory.java (original) +++ james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelPipelineFactory.java Thu May 6 18:11:29 2010 @@ -32,7 +32,6 @@ import org.jboss.netty.handler.connectio import org.jboss.netty.handler.connection.ConnectionPerIpLimitUpstreamHandler; import org.jboss.netty.handler.stream.ChunkedWriteHandler; import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timer; /** * Abstract base class for {@link ChannelPipelineFactory} implementations @@ -42,13 +41,19 @@ import org.jboss.netty.util.Timer; public abstract class AbstractChannelPipelineFactory implements ChannelPipelineFactory{ public final static int MAX_LINE_LENGTH = 8192; - private final Timer timer = new HashedWheelTimer(); private final ConnectionLimitUpstreamHandler connectionLimitHandler; private final ConnectionPerIpLimitUpstreamHandler connectionPerIpLimitHandler; + private TimeoutHandler timeoutHandler; + + public AbstractChannelPipelineFactory(int timeout, int maxConnections, int maxConnectsPerIp) { + timeoutHandler = new TimeoutHandler(new HashedWheelTimer(), timeout, timeout, 0); + connectionLimitHandler = new ConnectionLimitUpstreamHandler(maxConnections); + connectionPerIpLimitHandler = new ConnectionPerIpLimitUpstreamHandler(maxConnectsPerIp); + } + 
public AbstractChannelPipelineFactory() { - connectionLimitHandler = new ConnectionLimitUpstreamHandler(getMaxConnections()); - connectionPerIpLimitHandler = new ConnectionPerIpLimitUpstreamHandler(getMaxConnectionsPerIP()); + this(120, 0, 0); } /* * (non-Javadoc) @@ -70,7 +75,7 @@ public abstract class AbstractChannelPip pipeline.addLast("encoderResponse", createEncoder()); pipeline.addLast("streamer", new ChunkedWriteHandler()); - pipeline.addLast("timeoutHandler", new TimeoutHandler(timer, 120, 120, 0)); + pipeline.addLast("timeoutHandler", timeoutHandler); pipeline.addLast("coreHandler", createHandler()); @@ -91,27 +96,6 @@ public abstract class AbstractChannelPip */ protected abstract OneToOneEncoder createEncoder(); - /** - * Return the timeout in seconds - * - * @return timeout - */ - protected abstract int getTimeout(); - - - /** - * Return the max connections - * - * @return max connections - */ - protected abstract int getMaxConnections(); - - /** - * Return the max connections per ip - * - * @return max connections per ip - */ - protected abstract int getMaxConnectionsPerIP(); } Modified: james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelUpstreamHandler.java URL: http://svn.apache.org/viewvc/james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelUpstreamHandler.java?rev=941836&r1=941835&r2=941836&view=diff ============================================================================== --- james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelUpstreamHandler.java (original) +++ james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractChannelUpstreamHandler.java Thu May 6 18:11:29 2010 @@ -61,6 +61,9 @@ public abstract class AbstractChannelUps + /** + * Call the {@link ConnectHandler} instances which are stored in the {@link ProtocolHandlerChain} + */ @SuppressWarnings("unchecked") @Override public void 
channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { @@ -76,6 +79,9 @@ public abstract class AbstractChannelUps + /** + * Call the {@link LineHandler} + */ @SuppressWarnings("unchecked") @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { @@ -111,7 +117,12 @@ public abstract class AbstractChannelUps } } - private void cleanup(Channel channel) { + /** + * Cleanup the channel + * + * @param channel + */ + protected void cleanup(Channel channel) { ProtocolSession session = (ProtocolSession) attributes.get(channel); if (session != null) { session.resetState(); Copied: james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractSession.java (from r941826, james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractNettySession.java) URL: http://svn.apache.org/viewvc/james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractSession.java?p2=james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractSession.java&p1=james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractNettySession.java&r1=941826&r2=941836&rev=941836&view=diff ============================================================================== --- james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractNettySession.java (original) +++ james/protocols/trunk/impl/src/main/java/org/apache/james/socket/netty/AbstractSession.java Thu May 6 18:11:29 2010 @@ -38,7 +38,7 @@ import org.jboss.netty.handler.stream.Ch * * */ -public abstract class AbstractNettySession implements TLSSupportedSession { +public abstract class AbstractSession implements TLSSupportedSession { protected ChannelHandlerContext handlerContext; protected InetSocketAddress socketAddress; @@ -46,14 +46,14 @@ public abstract class AbstractNettySessi protected SSLEngine engine; protected String user; - public AbstractNettySession(Log logger, 
ChannelHandlerContext handlerContext, SSLEngine engine) { + public AbstractSession(Log logger, ChannelHandlerContext handlerContext, SSLEngine engine) { this.handlerContext = handlerContext; this.socketAddress = (InetSocketAddress) handlerContext.getChannel().getRemoteAddress(); this.logger = logger; this.engine = engine; } - public AbstractNettySession(Log logger, ChannelHandlerContext handlerContext) { + public AbstractSession(Log logger, ChannelHandlerContext handlerContext) { this(logger, handlerContext, null); } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org