Author: norman
Date: Sun Sep 18 11:27:27 2011
New Revision: 1172255
URL: http://svn.apache.org/viewvc?rev=1172255&view=rev
Log:
Merge AbstractChannelUpstreamHandler with BasicChannelUpstreamHandler as there
is no need to have both
Removed:
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/AbstractChannelUpstreamHandler.java
Modified:
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/BasicChannelUpstreamHandler.java
Modified:
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/BasicChannelUpstreamHandler.java
URL:
http://svn.apache.org/viewvc/james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/BasicChannelUpstreamHandler.java?rev=1172255&r1=1172254&r2=1172255&view=diff
==============================================================================
---
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/BasicChannelUpstreamHandler.java
(original)
+++
james/protocols/trunk/impl/src/main/java/org/apache/james/protocols/impl/BasicChannelUpstreamHandler.java
Sun Sep 18 11:27:27 2011
@@ -18,22 +18,33 @@
****************************************************************/
package org.apache.james.protocols.impl;
+import java.util.LinkedList;
+import java.util.List;
+
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
+import org.apache.james.protocols.api.ConnectHandler;
+import org.apache.james.protocols.api.ConnectHandlerResultHandler;
+import org.apache.james.protocols.api.DisconnectHandler;
+import org.apache.james.protocols.api.LineHandler;
+import org.apache.james.protocols.api.LineHandlerResultHandler;
import org.apache.james.protocols.api.ProtocolHandlerChain;
import org.apache.james.protocols.api.ProtocolSession;
import org.apache.james.protocols.api.ProtocolSessionFactory;
import org.apache.james.protocols.api.Response;
-import org.apache.james.protocols.impl.AbstractChannelUpstreamHandler;
import org.apache.james.protocols.impl.NettyProtocolTransport;
+import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.slf4j.Logger;
@@ -41,25 +52,145 @@ import org.slf4j.Logger;
* {@link ChannelUpstreamHandler} which is used by the SMTPServer and other
line based protocols
*/
@Sharable
-public class BasicChannelUpstreamHandler extends
AbstractChannelUpstreamHandler {
+public class BasicChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
protected final Logger logger;
protected final SSLContext context;
protected String[] enabledCipherSuites;
protected ProtocolSessionFactory sessionFactory;
+ protected ProtocolHandlerChain chain;
/**
 * Creates a handler without SSL settings by delegating to the five-argument
 * constructor with <code>null</code> for both the {@link SSLContext} and the
 * enabled cipher suites.
 *
 * @param chain the {@link ProtocolHandlerChain} handed to the delegate constructor
 * @param sessionFactory the {@link ProtocolSessionFactory} handed to the delegate constructor
 * @param logger the {@link Logger} handed to the delegate constructor
 */
public BasicChannelUpstreamHandler(ProtocolHandlerChain chain,
ProtocolSessionFactory sessionFactory, Logger logger) {
this(chain, sessionFactory, logger, null, null);
}
/**
 * Creates a handler and stores every argument in the field of the same name.
 * With this revision the chain is kept in the local <code>chain</code> field
 * rather than being passed to a superclass constructor.
 *
 * @param chain the {@link ProtocolHandlerChain} the handlers are looked up from
 * @param sessionFactory stored as-is; presumably used when creating sessions - confirm in createSession
 * @param logger stored as-is
 * @param context the {@link SSLContext}; may be <code>null</code> (the three-argument constructor passes <code>null</code>)
 * @param enabledCipherSuites stored as-is; may be <code>null</code> (the three-argument constructor passes <code>null</code>)
 */
public BasicChannelUpstreamHandler(ProtocolHandlerChain chain,
ProtocolSessionFactory sessionFactory, Logger logger, SSLContext context,
String[] enabledCipherSuites) {
- super(chain);
+ this.chain = chain;
this.sessionFactory = sessionFactory;
this.logger = logger;
this.context = context;
this.enabledCipherSuites = enabledCipherSuites;
}
+
+    /**
+     * Creates the {@link ProtocolSession} for the bound channel and stores it as the
+     * attachment of the {@link ChannelHandlerContext} before the event is handed to
+     * the superclass.
+     */
+    @Override
+    public void channelBound(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        final ProtocolSession boundSession = createSession(ctx);
+        ctx.setAttachment(boundSession);
+        super.channelBound(ctx, e);
+    }
+
+
+
+    /**
+     * Call the {@link ConnectHandler} instances which are stored in the {@link ProtocolHandlerChain}.
+     *
+     * Each {@link ConnectHandler} is timed and its result is passed through every
+     * {@link ConnectHandlerResultHandler}, each of which may override the disconnect
+     * decision. As soon as the final decision for a handler is to disconnect, the
+     * channel is disconnected and the remaining {@link ConnectHandler}s are skipped.
+     * The event is always forwarded to the superclass afterwards.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        List<ConnectHandler> connectHandlers = chain.getHandlers(ConnectHandler.class);
+        List<ConnectHandlerResultHandler> resultHandlers = chain.getHandlers(ConnectHandlerResultHandler.class);
+        ProtocolSession session = (ProtocolSession) ctx.getAttachment();
+        session.getLogger().info("Connection established from " + session.getRemoteHost() + " (" + session.getRemoteIPAddress()+ ")");
+        if (connectHandlers != null) {
+            // iterate instead of calling get(i): the chain hands out linked lists,
+            // for which indexed access is linear per call
+            for (ConnectHandler cHandler : connectHandlers) {
+                long start = System.currentTimeMillis();
+                boolean disconnect = cHandler.onConnect(session);
+                long executionTime = System.currentTimeMillis() - start;
+
+                // guarded the same way as the connect handlers above
+                if (resultHandlers != null) {
+                    for (ConnectHandlerResultHandler rHandler : resultHandlers) {
+                        disconnect = rHandler.onResponse(session, disconnect, executionTime, cHandler);
+                    }
+                }
+                if (disconnect) {
+                    ctx.getChannel().disconnect();
+                    break;
+                }
+            }
+        }
+        super.channelConnected(ctx, e);
+    }
+
+
+
+    /**
+     * Call the {@link DisconnectHandler} instances which are stored in the
+     * {@link ProtocolHandlerChain}, passing each the {@link ProtocolSession} attached
+     * to the context, then forward the event to the superclass.
+     */
+    @Override
+    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        final List<DisconnectHandler> disconnectHandlers = chain.getHandlers(DisconnectHandler.class);
+        final ProtocolSession attached = (ProtocolSession) ctx.getAttachment();
+        if (disconnectHandlers != null) {
+            for (DisconnectHandler handler : disconnectHandlers) {
+                handler.onDisconnect(attached);
+            }
+        }
+        super.channelDisconnected(ctx, e);
+    }
+
+
+    /**
+     * Call the {@link LineHandler}.
+     *
+     * Only the last {@link LineHandler} of the {@link ProtocolHandlerChain} is invoked.
+     * It receives a copy of exactly the readable bytes of the received
+     * {@link ChannelBuffer}. Its result is passed through every
+     * {@link LineHandlerResultHandler}, each of which may override the disconnect
+     * decision. The event is always forwarded to the superclass afterwards.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+        ProtocolSession pSession = (ProtocolSession) ctx.getAttachment();
+        LinkedList<LineHandler> lineHandlers = chain.getHandlers(LineHandler.class);
+        LinkedList<LineHandlerResultHandler> resultHandlers = chain.getHandlers(LineHandlerResultHandler.class);
+
+        if (lineHandlers != null && !lineHandlers.isEmpty()) {
+
+            ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+
+            // Copy only the readable region. buf.array() would expose the whole backing
+            // array (ignoring arrayOffset and readerIndex), and capacity() may be larger
+            // than the number of bytes that were actually received.
+            // getBytes(int, byte[]) leaves the reader index untouched, so handlers further
+            // upstream still see the unmodified buffer.
+            byte[] line = new byte[buf.readableBytes()];
+            buf.getBytes(buf.readerIndex(), line);
+
+            LineHandler lHandler = lineHandlers.getLast();
+            long start = System.currentTimeMillis();
+            boolean disconnect = lHandler.onLine(pSession, line);
+            long executionTime = System.currentTimeMillis() - start;
+
+            if (resultHandlers != null) {
+                for (LineHandlerResultHandler rHandler : resultHandlers) {
+                    disconnect = rHandler.onResponse(pSession, disconnect, executionTime, lHandler);
+                }
+            }
+            if (disconnect) {
+                ctx.getChannel().disconnect();
+            }
+
+        }
+
+        super.messageReceived(ctx, e);
+    }
+
+
+    /**
+     * Logs the closed connection when a {@link ProtocolSession} is attached to the
+     * context, runs {@link #cleanup(ChannelHandlerContext)} and forwards the event to
+     * the superclass.
+     */
+    @Override
+    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        final ProtocolSession closing = (ProtocolSession) ctx.getAttachment();
+        if (closing != null) {
+            closing.getLogger().info("Connection closed for " + closing.getRemoteHost() + " (" + closing.getRemoteIPAddress()+ ")");
+        }
+
+        cleanup(ctx);
+        super.channelClosed(ctx, e);
+    }
+
+    /**
+     * Cleanup the channel by resetting the state of the {@link ProtocolSession}
+     * attached to the given context. Does nothing when no session is attached.
+     * The attachment itself is not removed from the context.
+     *
+     * @param ctx the context whose attached session gets reset
+     */
+    protected void cleanup(ChannelHandlerContext ctx) {
+        ProtocolSession session = (ProtocolSession) ctx.getAttachment();
+        if (session != null) {
+            session.resetState();
+        }
+    }
+
+
+
protected ProtocolSession createSession(ChannelHandlerContext ctx) throws
Exception {
SSLEngine engine = null;
if (context != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]