I've been able to correct the issues I had... MINA sure needs more documentation ;-)
These classes demonstrate how to bypass a filter based on session state and sending/receiving files. Not elegant, but I should be able to go back to my project now and finish it. Thanks for all the help! Client.java public class Client { private final static Logger LOGGER = LoggerFactory.getLogger(Client.class); // private static final int BUFFER_SIZE = 33554432; //32 MB public static final int BUFFER_SIZE = 16777216; //16 MB // private static final int BUFFER_SIZE = 8388608; //8 MB // private static final int BUFFER_SIZE = 4194304; //4 MB private String hostname = "127.0.0.1"; private int port = 8877; private File filename; private static final long CONNECT_TIMEOUT = 15 * 1000L; private static final int CONNECT_RETRIES = 3; private IoSession session; private int cmd = 0; public Client(String hostname, int port, File infile) { this.hostname = hostname; if (port < 1024 || port > 65535) { LOGGER.error("Invalid port number: " + port); this.port = 8877; } else this.port = port; this.filename = infile; } public void connect() throws InterruptedException { NioSocketConnector connector = new NioSocketConnector(); connector.setConnectTimeoutMillis(CONNECT_TIMEOUT); //Let's keep events from firing at the wrong time or simultaneously connector.getFilterChain().addLast("executor", new ExecutorFilter()); //Out logging filter connector.getFilterChain().addLast("logger", new LoggingFilter()); //Initial attributes filter SessionAttributeInitializingFilter saif = new SessionAttributeInitializingFilter(); saif.setAttribute("state", SessionState.SEND_FILE); connector.getFilterChain().addLast("init", saif); //INFO: Moved file filter after protocol codec CodecFactory cf = new CodecFactory(); cf.setDecoderMaxObjectSize(BUFFER_SIZE); connector.getFilterChain().addLast("codec", new CodecFilter(cf)); //INFO: Updated buffer to 8MB StreamWriteFilter swf = new StreamWriteFilter(); swf.setWriteBufferSize(BUFFER_SIZE); connector.getFilterChain().addLast("file", swf); // ObjectSerializationCodecFactory oscf = new ObjectSerializationCodecFactory(); // oscf.setDecoderMaxObjectSize(BUFFER_SIZE); // connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(oscf)); connector.setHandler(new ClientIoSessionHandler(this)); // connector.getSessionConfig().setMaxReadBufferSize(BUFFER_SIZE); // connector.getSessionConfig().setReadBufferSize(BUFFER_SIZE); for (int i = 0; i < CONNECT_RETRIES; i++) { try { ConnectFuture future = connector.connect(new InetSocketAddress(this.hostname, this.port)); future.awaitUninterruptibly(); session = future.getSession(); break; } catch (RuntimeIoException e) { LOGGER.error("Failed to connect.", e); Thread.sleep(5000); } } if (session != null) { session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 600); // session.setAttribute(HOME_DIR, homedir); session.getCloseFuture().awaitUninterruptibly(); } connector.dispose(); } public static void main(String[] args) { if (args.length != 4) { System.out.println("Usage: java client hostname port [up/down] filename"); System.exit(1); } Client client = new Client(args[0], Integer.parseInt(args[1]), new File(args[3])); if (args[2].equalsIgnoreCase("up")) client.cmd = 1; else client.cmd = 2; try { client.connect(); } catch (InterruptedException e) { LOGGER.error("Interrupted Exception: ", e); } int i = 0; while (client.session == null && i < 15) { try { System.out.println("Waiting for session..."); Thread.sleep(1000); } catch (InterruptedException e) { System.out.println("Couldn't go to sleep..."); } i++; } } public int getCmd() { return cmd; } public File getFilename() { return filename; } } ------------------------------ ClientIOSessionHandler.java public class ClientIoSessionHandler extends IoHandlerAdapter { private final static Logger LOGGER = LoggerFactory.getLogger(ClientIoSessionHandler.class); public static final String SESSION_NONCE = "nonce"; public static final String SESSION_STATE = "state"; public static final String CLIENT_CODE = "client_code"; public static final String CLIENT_NAME = "client_name"; public static final String FILEDIR_TREE = "filedir_tree"; public static final String USERNAME = "username"; public static final String PASSWORD = "password"; public static final String CURRENT_FILE = "current_file"; public static final String FILE_STREAM = "file_stream"; public static final String FILE_OUT = "file_out"; public static final String MSG_COUNTER = "message_counter"; public static final String HOME_DIR = "home_dir"; public static final String NEW_ATTRIBUTE = "new_attribute"; public static final String FILE_REGION = "file_region"; public static final String FILE_POSITION = "file_position"; public static final String FILE_SIZE = "file_size"; private final Client parent; public ClientIoSessionHandler(Client parent) { this.parent = parent; } @Override public void sessionOpened(IoSession session) { String method = "sessionOpened"; session.setAttribute(MSG_COUNTER, 0L); //Set our home directory to something useful session.setAttribute(HOME_DIR, new File("/")); if (parent.getCmd() == 1) { session.setAttribute(SESSION_STATE, SessionState.SEND_FILE); if (parent.getFilename() != null && parent.getFilename().exists()) { session.setAttribute(CURRENT_FILE, parent.getFilename()); FileInputStream fsend = null; try { fsend = new FileInputStream((File)session.getAttribute(CURRENT_FILE)); DefaultFileRegion dfr = new DefaultFileRegion(fsend.getChannel()); session.setAttribute(FILE_STREAM, fsend); session.setAttribute(FILE_REGION, dfr); IoBuffer tbuff = (new SimpleBufferAllocator()).allocate(8, false); tbuff.putLong(dfr.getRemainingBytes()); tbuff.flip(); session.write(tbuff); } catch (FileNotFoundException e) { LOGGER.error("File not found exception. Unable to send file."); session.close(true); } catch (IOException e) { LOGGER.error("IOException in FileRegion. Unable to send file."); session.close(true); } } else { LOGGER.error("File not found. Unable to send file."); session.close(true); } } else if (parent.getCmd() == 2) { session.setAttribute(SESSION_STATE, SessionState.RECV_FILE); session.setAttribute(CURRENT_FILE, parent.getFilename()); session.setAttribute(FILE_POSITION, 0L); } else { LOGGER.error("Invalid parent command"); session.close(true); } LOGGER.info("MSG Counter: " + session.getAttribute(MSG_COUNTER) + " Method: " + method + " State: " + session.getAttribute(SESSION_STATE)); } @Override public void messageReceived(IoSession session, Object message) { String method = "messageReceived"; //TODO: Add appropriate code to deal with server busy states if (session.getAttribute(SESSION_STATE) == SessionState.RECV_FILE) { File myfile = (File)session.getAttribute(CURRENT_FILE); if (message instanceof IoBuffer) { loggerHelper(session, method, message); IoBuffer fbuff = (IoBuffer)message; if (!session.containsAttribute(FILE_SIZE)) { long fsize = fbuff.getLong(); session.setAttribute(FILE_SIZE, (Long)fsize); } InputStream frecv = fbuff.asInputStream(); FileOutputStream fout; // LOGGER.info("Bytes remaining: " + frecv.getRemainingBytes() + "Filename: ", frecv.getFilename()); long position = 0L; try { if (session.containsAttribute(FILE_OUT)) { fout = (FileOutputStream)session.getAttribute(FILE_OUT); } else { fout = new FileOutputStream(new File(((File)session.getAttribute(HOME_DIR)).getPath().concat(File.separator + myfile.getPath())), false); session.setAttribute(FILE_OUT, fout); } if (session.containsAttribute(FILE_POSITION)) { position = (Long)session.getAttribute(FILE_POSITION); } else { position = 0L; } //TODO: Very slow reading/writing - Fix with ByteArray reading/writing instead /* int i = 0; //TODO: We are closing the stream too fast. Fix while ((i = frecv.read()) != -1) { fout.write(i); position++; } */ position = position + frecv.available(); byte[] readAll = new byte[frecv.available()]; frecv.read(readAll); fout.write(readAll); frecv.close(); session.setAttribute(FILE_POSITION, position); long tsize = (Long)session.getAttribute(FILE_SIZE); // if ((Long)session.getAttribute(FILE_SIZE) == (Long)session.getAttribute(FILE_POSITION)) { if (tsize == position) { fout.close(); session.close(false); } } catch (FileNotFoundException e) { LOGGER.error("Couldn't write to file: " + myfile.getName() + " Exception: " + e.getMessage()); session.removeAttribute(CURRENT_FILE); session.removeAttribute(FILE_OUT); session.setAttribute(SESSION_STATE, SessionState.IDLE); } catch (IOException e) { LOGGER.error("Couldn't write to file: " + myfile.getName() + " Exception: " + e.getMessage()); session.removeAttribute(CURRENT_FILE); session.removeAttribute(FILE_OUT); session.setAttribute(SESSION_STATE, SessionState.IDLE); } } else { LOGGER.error("In RECV_FILE state and no IoBuffer received"); loggerHelper(session, method, message); session.close(false); } } else if (session.getAttribute(SESSION_STATE) == SessionState.SEND_FILE) { LOGGER.error("In SEND_FILE state, so we should not receive a message"); loggerHelper(session, method, message); session.close(false); } else if (session.getAttribute(SESSION_STATE) == SessionState.IDLE) { LOGGER.error("In IDLE state, so we should not receive a message"); loggerHelper(session, method, message); session.close(false); } //This should never happen else { LOGGER.error("Received a message with no valid STATE"); loggerHelper(session, method, message); session.close(false); } } @Override public void messageSent(IoSession session, Object message) { String method = "messageSent"; //Streams are not closed by the file filter automatically, so we must close them here if(session.getAttribute(SESSION_STATE) == SessionState.SEND_FILE && session.containsAttribute(FILE_REGION)) { loggerHelper(session, method, message); session.write(session.getAttribute(FILE_REGION)); session.removeAttribute(FILE_REGION); } else if(session.getAttribute(SESSION_STATE) == SessionState.SEND_FILE && session.containsAttribute(FILE_STREAM)) { loggerHelper(session, method, message); FileInputStream fsend; fsend = (FileInputStream) session.getAttribute(FILE_STREAM); try { fsend.close(); LOGGER.info("Closed Stream"); } catch (IOException e) { LOGGER.error("Unable to close stream: " + e.getMessage()); } session.removeAttribute(FILE_STREAM); session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD); } else { loggerHelper(session, method, message); } } @Override public void sessionIdle(IoSession session, IdleStatus status) { LOGGER.info("Session idle... disconnecting."); session.close(true); } @Override public void exceptionCaught(IoSession session, Throwable cause) { //Let's close the connection since we can't recover // LOGGER.error("Unrecoverable Exception on Session: " + session.getAttribute(CLIENT_NAME), cause); LOGGER.error("Unrecoverable Exception on Session: " + session.getId() + " Exception: " + cause.getClass().toString()); LOGGER.error("Stack: " + cause.getMessage()); LOGGER.error("Throwable", cause); session.close(true); } private void loggerHelper(IoSession session, String method, Object message) { session.setAttribute(MSG_COUNTER, (Long)session.getAttribute(MSG_COUNTER) + 1); LOGGER.info("MSG Counter: " + session.getAttribute(MSG_COUNTER) + " Method: " + method + " State: " + session.getAttribute(SESSION_STATE) + " Message Class: " + message.getClass().toString()); } } ------------------- Server.java public class Server { public Server() { super(); } private final static Logger LOGGER = LoggerFactory.getLogger(Server.class); // private static final int BUFFER_SIZE = 33554432; //32 MB public static final int BUFFER_SIZE = 16777216; //16 MB // private static final int BUFFER_SIZE = 8388608; //8 MB // private static final int BUFFER_SIZE = 4194304; //4 MB //Default Port and Address private int SERVER_PORT = 8877; private String SERVER_ADDRESS = "127.0.0.1"; private File filename = null;; private int cmd = 0; public void start() throws InterruptedException { NioSocketAcceptor acceptor = new NioSocketAcceptor(); //Let's keep events from firing at the wrong time or simultaneously acceptor.getFilterChain().addLast("executor", new ExecutorFilter()); //We want to instert a logging filter acceptor.getFilterChain().addLast("logger", new LoggingFilter()); //Initial attributes filter SessionAttributeInitializingFilter saif = new SessionAttributeInitializingFilter(); saif.setAttribute("state", SessionState.SEND_FILE); acceptor.getFilterChain().addLast("init", saif); //INFO: Moved file filter after protocol codec //Let's use object codec serialization until we develop our own codec //INFO: Moved file filter after protocol codec CodecFactory cf = new CodecFactory(); cf.setDecoderMaxObjectSize(BUFFER_SIZE); acceptor.getFilterChain().addLast("codec", new CodecFilter(cf)); //Let's add an filter to deal with File I/O //INFO: Updated buffer to 8MB StreamWriteFilter swf = new StreamWriteFilter(); swf.setWriteBufferSize(BUFFER_SIZE); acceptor.getFilterChain().addLast("file", swf); // ObjectSerializationCodecFactory oscf = new ObjectSerializationCodecFactory(); // oscf.setDecoderMaxObjectSize(BUFFER_SIZE); // acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(oscf)); //Let's set our main IoSession Handler here acceptor.setHandler(new ServerIoSessionHandler(this)); // acceptor.getSessionConfig().setMaxReadBufferSize(BUFFER_SIZE); // acceptor.getSessionConfig().setReadBufferSize(BUFFER_SIZE); //Let's bind to our address and port number. Give it 5 tries... for (int i=0; i<5; i++) try { acceptor.bind(new InetSocketAddress(SERVER_ADDRESS, SERVER_PORT)); break; } catch (IOException ioe) { // TODO: Add catch code LOGGER.error("Server was unable to bind to port: " + SERVER_PORT + " Error: " + ioe.getMessage()); Thread.sleep(1000); } if (acceptor.isActive()) //We are done LOGGER.info("Server started on address and port: " + SERVER_ADDRESS + ":" + SERVER_PORT); else { LOGGER.info("Server could not start"); System.exit(1); } } public static void main(String[] args) { Server server = new Server(); //Did we get non-default host and port arguments if (args.length == 4) { //Assign the port number try { server.SERVER_PORT = Integer.parseInt(args[1]); } catch (NumberFormatException e) { LOGGER.error(e.getMessage()); } //Only allow ports in the user range if (server.SERVER_PORT < 1024 || server.SERVER_PORT > 65535) { LOGGER.error("Invalid port number: " + server.SERVER_PORT); System.exit(1); } //TODO: Validate the host address server.SERVER_ADDRESS = args[0]; if (args[2].equalsIgnoreCase("up")) server.cmd = 1; else server.cmd = 2; try { server.start(); } catch (InterruptedException e) { LOGGER.error("Interrupted Exception: ", e); } server.filename = new File(args[3]); } else { System.out.println("Usage: java server hostname port [up/down] filename"); System.exit(1); } } public int getCmd() { return cmd; } public File getFilename() { return filename; } } -------------------------- ServerIoSessionHandler.java public class ServerIoSessionHandler extends IoHandlerAdapter { private final static Logger LOGGER = LoggerFactory.getLogger(ServerIoSessionHandler.class); public static final String SESSION_NONCE = "nonce"; public static final String SESSION_STATE = "state"; public static final String CLIENT_CODE = "client_code"; public static final String CLIENT_NAME = "client_name"; public static final String FILEDIR_TREE = "filedir_tree"; public static final String USERNAME = "username"; public static final String PASSWORD = "password"; public static final String CURRENT_FILE = "current_file"; public static final String FILE_STREAM = "file_stream"; public static final String FILE_OUT = "file_out"; public static final String MSG_COUNTER = "message_counter"; public static final String HOME_DIR = "home_dir"; public static final String NEW_ATTRIBUTE = "new_attribute"; public static final String FILE_REGION = "file_region"; public static final String FILE_POSITION = "file_position"; public static final String FILE_SIZE = "file_size"; private final Server parent; public ServerIoSessionHandler(Server parent) { this.parent = parent; } @Override public void sessionOpened(IoSession session) { String method = "sessionOpened"; session.setAttribute(MSG_COUNTER, 0L); //Set our home directory to something useful session.setAttribute(HOME_DIR, new File("/")); //Set our idle time to 600 seconds session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 600); // //Initialize our state // session.setAttribute(SESSION_STATE, SessionState.WAIT_INIT); // if (parent.getCmd() == 1) { session.setAttribute(SESSION_STATE, SessionState.SEND_FILE); if (parent.getFilename() != null && parent.getFilename().exists()) { session.setAttribute(CURRENT_FILE, parent.getFilename()); FileInputStream fsend = null; try { fsend = new FileInputStream((File)session.getAttribute(CURRENT_FILE)); DefaultFileRegion dfr = new DefaultFileRegion(fsend.getChannel()); session.setAttribute(FILE_STREAM, fsend); session.setAttribute(FILE_REGION, dfr); IoBuffer tbuff = (new SimpleBufferAllocator()).allocate(8, false); tbuff.putLong(dfr.getRemainingBytes()); tbuff.flip(); session.write(tbuff); } catch (FileNotFoundException e) { LOGGER.error("File not found exception. Unable to send file."); session.close(true); } catch (IOException e) { LOGGER.error("IOException in FileRegion. Unable to send file."); session.close(true); } } else { LOGGER.error("File not found. Unable to send file."); session.close(true); } } else if (parent.getCmd() == 2) { session.setAttribute(SESSION_STATE, SessionState.RECV_FILE); session.setAttribute(CURRENT_FILE, parent.getFilename()); session.setAttribute(FILE_POSITION, 0L); } else { LOGGER.error("Invalid parent command"); session.close(true); } LOGGER.info("MSG Counter: " + session.getAttribute(MSG_COUNTER) + " Method: " + method + " State: " + session.getAttribute(SESSION_STATE)); } @Override public void messageReceived(IoSession session, Object message) { String method = "messageReceived"; //TODO: Add appropriate code to deal with server busy states if (session.getAttribute(SESSION_STATE) == SessionState.RECV_FILE) { File myfile = (File)session.getAttribute(CURRENT_FILE); if (message instanceof IoBuffer) { loggerHelper(session, method, message); IoBuffer fbuff = (IoBuffer)message; if (!session.containsAttribute(FILE_SIZE)) { long fsize = fbuff.getLong(); session.setAttribute(FILE_SIZE, (Long)fsize); } InputStream frecv = fbuff.asInputStream(); FileOutputStream fout; // LOGGER.info("Bytes remaining: " + frecv.getRemainingBytes() + "Filename: ", frecv.getFilename()); long position = 0L; try { if (session.containsAttribute(FILE_OUT)) { fout = (FileOutputStream)session.getAttribute(FILE_OUT); } else { fout = new FileOutputStream(new File(((File)session.getAttribute(HOME_DIR)).getPath().concat(File.separator + myfile.getPath())), false); session.setAttribute(FILE_OUT, fout); } if (session.containsAttribute(FILE_POSITION)) { position = (Long)session.getAttribute(FILE_POSITION); } else { position = 0L; } int i = 0; //TODO: We are closing the stream too fast. Fix //TODO: Very slow reading/writing - Fix with ByteArray reading/writing instead /* int i = 0; //TODO: We are closing the stream too fast. Fix while ((i = frecv.read()) != -1) { fout.write(i); position++; } */ position = position + frecv.available(); byte[] readAll = new byte[frecv.available()]; frecv.read(readAll); fout.write(readAll); frecv.close(); session.setAttribute(FILE_POSITION, position); long tsize = (Long)session.getAttribute(FILE_SIZE); // if ((Long)session.getAttribute(FILE_SIZE) == (Long)session.getAttribute(FILE_POSITION)) { if (tsize == position) { fout.close(); session.close(false); } } catch (FileNotFoundException e) { LOGGER.error("Couldn't write to file: " + myfile.getName() + " Exception: " + e.getMessage()); session.removeAttribute(CURRENT_FILE); session.removeAttribute(FILE_OUT); session.setAttribute(SESSION_STATE, SessionState.IDLE); } catch (IOException e) { LOGGER.error("Couldn't write to file: " + myfile.getName() + " Exception: " + e.getMessage()); session.removeAttribute(CURRENT_FILE); session.removeAttribute(FILE_OUT); session.setAttribute(SESSION_STATE, SessionState.IDLE); } } else { LOGGER.error("In RECV_FILE state and no IoBuffer received"); loggerHelper(session, method, message); session.close(false); } } else if (session.getAttribute(SESSION_STATE) == SessionState.SEND_FILE) { LOGGER.error("In SEND_FILE state, so we should not receive a message"); loggerHelper(session, method, message); session.close(false); } else if (session.getAttribute(SESSION_STATE) == SessionState.IDLE) { LOGGER.error("In IDLE state, so we should not receive a message"); loggerHelper(session, method, message); session.close(false); } //This should never happen else { LOGGER.error("Received a message with no valid STATE"); loggerHelper(session, method, message); session.close(false); } } @Override public void messageSent(IoSession session, Object message) { String method = "messageSent"; //Streams are not closed by the file filter automatically, so we must close them here if(session.getAttribute(SESSION_STATE) == SessionState.SEND_FILE && session.containsAttribute(FILE_REGION)) { loggerHelper(session, method, message); session.write(session.getAttribute(FILE_REGION)); session.removeAttribute(FILE_REGION); } else if(session.getAttribute(SESSION_STATE) == SessionState.SEND_FILE && session.containsAttribute(FILE_STREAM)) { loggerHelper(session, method, message); FileInputStream fsend; fsend = (FileInputStream) session.getAttribute(FILE_STREAM); try { fsend.close(); LOGGER.info("Closed Stream"); } catch (IOException e) { LOGGER.error("Unable to close stream: " + e.getMessage()); } session.removeAttribute(FILE_STREAM); session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD); } else { loggerHelper(session, method, message); } } //Have we been idel beyond our timeout? Then disconnect @Override public void sessionIdle(IoSession session, IdleStatus status) { LOGGER.info("Session idle... disconnecting."); session.close(true); } //Log any unrecoverable exceptions and close the connection @Override public void exceptionCaught(IoSession session, Throwable cause) { //If this is a NotSerializableException we should continue and hope for the best ;-) if (cause.getClass() == ProtocolDecoderException.class || cause.getClass() == ProtocolEncoderException.class) LOGGER.warn("Exception: " + cause.getClass().toString() + " State: " + session.getAttribute(SESSION_STATE)); else { //Let's close the connection since we can't recover // LOGGER.error("Unrecoverable Exception on Session: " + session.getAttribute(CLIENT_NAME), cause); LOGGER.error("Unrecoverable Exception on Session: " + session.getAttribute(CLIENT_NAME) + " Exception: " + cause.getClass().toString()); session.close(true); } } private void loggerHelper(IoSession session, String method, Object message) { session.setAttribute(MSG_COUNTER, (Long)session.getAttribute(MSG_COUNTER) + 1); LOGGER.info("MSG Counter: " + session.getAttribute(MSG_COUNTER) + " Method: " + method + " State: " + session.getAttribute(SESSION_STATE) + " Message Class: " + message.getClass().toString()); } } ----------------------- CodecFactory.java public class CodecFactory extends ObjectSerializationCodecFactory { private final CodecEncoder encoder; private final CodecDecoder decoder; public CodecFactory() { this(Thread.currentThread().getContextClassLoader()); } public CodecFactory(ClassLoader classLoader) { encoder = new CodecEncoder(); decoder = new CodecDecoder(classLoader); } @Override public ProtocolEncoder getEncoder(IoSession session) { return encoder; } @Override public ProtocolDecoder getDecoder(IoSession session) { return decoder; } @Override public int getEncoderMaxObjectSize() { return encoder.getMaxObjectSize(); } @Override public void setEncoderMaxObjectSize(int maxObjectSize) { encoder.setMaxObjectSize(maxObjectSize); } @Override public int getDecoderMaxObjectSize() { return decoder.getMaxObjectSize(); } @Override public void setDecoderMaxObjectSize(int maxObjectSize) { decoder.setMaxObjectSize(maxObjectSize); } } ----------------------------- CodecFilter.java public class CodecFilter extends ProtocolCodecFilter { private final static Logger LOGGER = LoggerFactory.getLogger(CodecFilter.class); public CodecFilter(Class<? extends ProtocolEncoder> class1, Class<? extends ProtocolDecoder> class2) { super(class1, class2); } public CodecFilter(ProtocolEncoder protocolEncoder, ProtocolDecoder protocolDecoder) { super(protocolEncoder, protocolDecoder); } public CodecFilter(ProtocolCodecFactory protocolCodecFactory) { super(protocolCodecFactory); } @Override public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { if (session.getAttribute("state") == SessionState.RECV_FILE || session.getAttribute("state") == SessionState.SEND_FILE) { LOGGER.info("FilterWrite override successful. State: " + session.getAttribute("state") + " Filter: " + nextFilter.getClass().toString()); nextFilter.filterWrite(session, writeRequest); return; } else { LOGGER.info("SerializationCodec FilterWrite in use. State: " + session.getAttribute("state")); super.filterWrite(nextFilter, session, writeRequest); } } @Override public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { if (session.getAttribute("state") == SessionState.RECV_FILE || session.getAttribute("state") == SessionState.SEND_FILE) { LOGGER.info("MessageReceived override successful. State: " + session.getAttribute("state") + " Filter: " + nextFilter.getClass().toString()); nextFilter.messageReceived(session, message); return; } else { LOGGER.info("SerializationCodec MessageReceived in use. State: " + session.getAttribute("state")); super.messageReceived(nextFilter, session, message); } } @Override public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { if (session.getAttribute("state") == SessionState.RECV_FILE || session.getAttribute("state") == SessionState.SEND_FILE) { LOGGER.info("MessageSent override successful. State: " + session.getAttribute("state") + " Filter: " + nextFilter.getClass().toString()); nextFilter.messageSent(session, writeRequest); return; } else { LOGGER.info("SerializationCodec MessageSent in use. State: " + session.getAttribute("state")); super.messageSent(nextFilter, session, writeRequest); } } } ------------------------------ CodecEncoder.java public class CodecEncoder extends ObjectSerializationEncoder { private final static Logger LOGGER = LoggerFactory.getLogger(CodecEncoder.class); private int maxObjectSize = Integer.MAX_VALUE; public CodecEncoder() { super(); } @Override public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { if (session.getAttribute("state") == SessionState.RECV_FILE || session.getAttribute("state") == SessionState.SEND_FILE) { LOGGER.info("Encoder override successful"); return; } else LOGGER.info("Serialization Encoder in use. State: " + session.getAttribute("state")); super.encode(session, message, out); } } -------------------------- CodecDecoder.java public class CodecDecoder extends ObjectSerializationDecoder { private final static Logger LOGGER = LoggerFactory.getLogger(CodecDecoder.class); private final ClassLoader classLoader; private int maxObjectSize = 1048576; public CodecDecoder() { this(Thread.currentThread().getContextClassLoader()); } public CodecDecoder(ClassLoader classLoader) { if (classLoader == null) { throw new IllegalArgumentException("classloader"); } this.classLoader = classLoader; } /* @Override protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { if (session.getAttribute("state") == SessionState.RECV_FILE || session.getAttribute("state") == SessionState.SEND_FILE) { LOGGER.info("doDecoder Override"); out.write(in); return true; } else { LOGGER.info("Serialization doDecoder in use. State: " + session.getAttribute("state")); return super.doDecode(session, in, out); } } */ @Override public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { if (session.getAttribute("state") == SessionState.RECV_FILE || session.getAttribute("state") == SessionState.SEND_FILE) { LOGGER.info("Decoder Override"); return; } else { LOGGER.info("Serialization Decoder in use. State: " + session.getAttribute("state")); super.decode(session, in, out); } } }