I'm new to MINA and have been working on a small project for a few weeks now. My application is fairly simple: I'm sending commands to a server via POJO messages. In some cases the server or the client will send a file instead of a POJO. I'm using a StreamWriteFilter together with a ProtocolCodecFilter that uses the object-serialization codec. I can send objects back and forth fairly successfully, but I can't send large files, and sometimes even small files fail to be sent.
Here is the exception I get when sending files: 00:46:09] pool-4-thread-4 WARN [] [] [client.ClientIoSessionHandler] - Exception: class org.apache.mina.filter.codec.ProtocolDecoderException State: RECV_FILE_WAIT [00:46:09] pool-4-thread-4 ERROR [] [] [client.ClientIoSessionHandler] - Throwable: org.apache.mina.filter.codec.ProtocolDecoderException: org.apache.mina.core.buffer.BufferDataException: dataLength: 1347093252 Any ideas why am I getting this exception? How should I go about having the ObjectSerializationCodec handle the POJOs while StreamWriteFilter handles the files? And here are the relevant sections of code: Client.java package client; public class Client extends SwingWorker<Void, Void>/*Thread*/ { private final static Logger LOGGER = LoggerFactory.getLogger(Client.class); private String hostname = "127.0.0.1"; private int port = 8877; private String username = null; private String password = null; private String computername; private FileCmdQueue filecmdqueue = null; private File homedir = null; private TrayIcon trayicon = null; private static final long CONNECT_TIMEOUT = 15 *1000L; private static final int CONNECT_RETRIES = 3; private Boolean flag = true; //Send a correct hello or a failed hello on connect private IoSession session; public Client(String hostname, int port, String username, String pwd) { this.hostname = hostname; this.password = pwd; this.username = username; this.computername = null; if (port < 1024 || port > 65535) { LOGGER.error("Invalid port number: " + port); this.port = 8877; } else this.port = port; try{ computername=InetAddress.getLocalHost().getHostName(); }catch (Exception e){ LOGGER.error("Exception caught = "+ e.getMessage()); computername = "unknown"; } } public void Connect(Boolean flag) throws InterruptedException { if (session == null || !session.isConnected()) { NioSocketConnector connector = new NioSocketConnector(); connector.setConnectTimeoutMillis(CONNECT_TIMEOUT); connector.getFilterChain().addLast("executor", new 
ExecutorFilter()); connector.getFilterChain().addLast("logger", new LoggingFilter()); CodecFactory cf = new CodecFactory(); cf.setDecoderMaxObjectSize(BUFFER_SIZE); connector.getFilterChain().addLast("codec", new CodecFilter(cf)); /* ObjectSerializationCodecFactory oscf = new ObjectSerializationCodecFactory(); oscf.setDecoderMaxObjectSize(BUFFER_SIZE); connector.getFilterChain().addLast("codec", new CodecFilter(oscf)); */ StreamWriteFilter swf = new StreamWriteFilter(); swf.setWriteBufferSize(BUFFER_SIZE); connector.getFilterChain().addLast("file", swf); connector.setHandler(new ClientIoSessionHandler(flag, this)); // connector.getSessionConfig().setMaxReadBufferSize(BUFFER_SIZE); // connector.getSessionConfig().setReadBufferSize(BUFFER_SIZE); for (int i=0;i< CONNECT_RETRIES ; i++ ) { try { ConnectFuture future = connector.connect(new InetSocketAddress(this.hostname, this.port)); future.awaitUninterruptibly(); session = future.getSession(); break; } catch (RuntimeIoException e) { LOGGER.error("Failed to connect.", e); Thread.sleep(5000); } } if (session != null) { session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 600); session.setAttribute(HOME_DIR, homedir); session.getCloseFuture().awaitUninterruptibly(); } connector.dispose(); } } public void disConnect() { if (session != null) { session.close(true); if (trayicon != null) trayicon.setToolTip("Status: Disconnected"); } } public Boolean sendMessage(FileCmd.CmdType cmd, Object in) { if (session.getAttribute(SESSION_STATE) == SessionState.IDLE) { if (cmd == FileCmd.CmdType.LIST) { session.setAttribute(SESSION_STATE, SessionState.WAITING); RequestDirectoryTreeMessage myRequest = new RequestDirectoryTreeMessage(); myRequest.setNonce((Long)session.getAttribute(SESSION_NONCE)); session.write(myRequest); return true; } else if (cmd == FileCmd.CmdType.DOWNLOAD) { File myfile = (File) in; DownloadFileMessage myrequest = new DownloadFileMessage(); myrequest.setRequest(myfile); 
myrequest.setNonce((Long)session.getAttribute(SESSION_NONCE)); session.setAttribute(CURRENT_FILE, myfile); session.setAttribute(SESSION_STATE,SessionState.RECV_FILE_WAIT); session.write(myrequest); return true; } else if (cmd == FileCmd.CmdType.UPLOAD) { File myfile = (File) in; session.setAttribute(SESSION_STATE, SessionState.SEND_FILE); UploadFileMessage myupload = new UploadFileMessage(); myupload.setRequest(myfile); myupload.setNonce((Long)session.getAttribute(SESSION_NONCE)); session.setAttribute(CURRENT_FILE, myfile); session.write(myupload); return true; } else if (cmd == FileCmd.CmdType.R_CREATE) { File myfile = (File) in; session.setAttribute(SESSION_STATE, SessionState.WAITING); CreateDirectoryMessage myrequest = new CreateDirectoryMessage(); myrequest.setRequest(myfile); myrequest.setNonce((Long)session.getAttribute(SESSION_NONCE)); session.setAttribute(CURRENT_FILE, myfile); session.write(myrequest); return true; } else if (cmd == FileCmd.CmdType.DELETE) { File myfile = (File) in; session.setAttribute(SESSION_STATE, SessionState.WAITING); DeleteFileMessage myrequest = new DeleteFileMessage(); myrequest.setRequest(myfile); myrequest.setNonce((Long)session.getAttribute(SESSION_NONCE)); session.setAttribute(CURRENT_FILE, myfile); session.write(myrequest); return true; } else return false; } else return false; } public FileDirTree getTreeMap() { if (session.containsAttribute(FILEDIR_TREE)) return (FileDirTree) session.getAttribute(FILEDIR_TREE); else return null; } public void setTreeMap(FileDirTree intree) { if (session.containsAttribute(FILEDIR_TREE) && intree != null) session.setAttribute(FILEDIR_TREE, intree); else session.removeAttribute(FILEDIR_TREE); } // @Override protected Void doInBackground() { try { //TODO: Thread locked... 
need to move to multi-threaded model this.Connect(flag); return null; } catch (InterruptedException f) { LOGGER.error("Connection Interrupted", f); return null; } } } ----------------------------------------------------------------- ClientIoSessionHandler.java public class ClientIoSessionHandler extends IoHandlerAdapter { private final Client parent; private Boolean flag = true; //Sends a correct announcement or not private final static Logger LOGGER = LoggerFactory.getLogger(ClientIoSessionHandler.class); public ClientIoSessionHandler(Boolean flag, Client parent) { this.flag = flag; this.parent = parent; } @Override public void sessionOpened(IoSession session) { // ClientAnnounceMessage msg = new ClientAnnounceMessage(65530, compname.length(), compname, (String)session.getAttribute(USERNAME), (String)session.getAttribute(PASSWORD)); String method = "sessionOpened"; session.setAttribute(MSG_COUNTER, 0L); ClientAnnounceMessage msg = new ClientAnnounceMessage(65530, parent.getComputername().length(), parent.getComputername(), parent.getUsername(), "test123"); session.setAttribute(SESSION_STATE, SessionState.WAIT_INIT); session.setAttribute(USERNAME, parent.getHostname()); // session.setAttribute(PASSWORD, parent.getPassword()); session.setAttribute(PASSWORD, "test123"); session.setAttribute(CLIENT_CODE, 65530); session.setAttribute(CLIENT_NAME, parent.getHostname()); if (parent.getTrayicon() != null) parent.getTrayicon().setToolTip("Status: Connected"); if (flag) session.write(msg); else session.write("Hello There"); LOGGER.info("MSG Counter: " + session.getAttribute(MSG_COUNTER) + " Method: " + method + " State: " + session.getAttribute(SESSION_STATE)); } @Override public void messageReceived(IoSession session, Object message) { String method = "messageReceived"; if (session.getAttribute(SESSION_STATE) == SessionState.WAIT_INIT) { if (message instanceof ServerStdResponseMessage) { loggerHelper(session, method, message); ServerStdResponseMessage myMsg = 
(ServerStdResponseMessage)message; if (myMsg.getResult() == 1) { session.setAttribute(SESSION_NONCE, myMsg.getNonce()); session.setAttribute(SESSION_STATE, SessionState.IDLE); LOGGER.info("Connection with the server established. Result: " + myMsg.getResult()); for (int i = 0; i < 5; i++) { if (parent.sendMessage(FileCmd.CmdType.LIST, null)) { break; } } } else { loggerHelper(session, method, message); LOGGER.error("Couldn't initialize connection with server. Error: " + myMsg.getResult()); session.close(true); } } } else if (session.getAttribute(SESSION_STATE) == SessionState.WAITING) { if (message instanceof FileDirTree) { loggerHelper(session, method, message); FileDirTree myTree = (FileDirTree)message; session.setAttribute(FILEDIR_TREE, myTree); session.setAttribute(SESSION_STATE, SessionState.IDLE); LOGGER.info("Received Directory Tree"); } else if (message instanceof ServerStdResponseMessage) { loggerHelper(session, method, message); ServerStdResponseMessage myMsg = (ServerStdResponseMessage)message; session.setAttribute(SESSION_NONCE, myMsg.getNonce()); session.setAttribute(SESSION_STATE, SessionState.IDLE); if (myMsg.getResult() == 1) LOGGER.info("We got an OK response from the server. Result: " + myMsg.getResult()); else loggerHelper(session, method, message); LOGGER.warn("Not the response we were expecting. 
Result: " + myMsg.getResult()); } } else if (session.getAttribute(SESSION_STATE) == SessionState.RECV_FILE) { File myfile = (File)session.getAttribute(CURRENT_FILE); if (message instanceof IoBuffer) { // if (message instanceof FileInputStream) { loggerHelper(session, method, message); IoBuffer fbuff = (IoBuffer)message; InputStream frecv = fbuff.asInputStream(); FileOutputStream fout; try { if (session.containsAttribute(FILE_OUT)) fout = (FileOutputStream)session.getAttribute(FILE_OUT); else fout = new FileOutputStream(new File(((File)session.getAttribute(HOME_DIR)).getPath().concat(myfile.getPath())), false); int i = 0; while ((i = frecv.read()) != -1) fout.write(i); frecv.close(); fout.close(); } catch (FileNotFoundException e) { LOGGER.error("Couldn't write to file: " + myfile.getName() + " Exception: " + e.getMessage()); session.removeAttribute(CURRENT_FILE); session.removeAttribute(FILE_OUT); session.setAttribute(SESSION_STATE, SessionState.IDLE); } catch (IOException e) { LOGGER.error("Couldn't write to file: " + myfile.getName() + " Exception: " + e.getMessage()); session.removeAttribute(CURRENT_FILE); session.removeAttribute(FILE_OUT); session.setAttribute(SESSION_STATE, SessionState.IDLE); } } else if (message instanceof ServerStdResponseMessage) { loggerHelper(session, method, message); ServerStdResponseMessage myMsg = (ServerStdResponseMessage)message; session.setAttribute(SESSION_NONCE, myMsg.getNonce()); LOGGER.error("Expecting file: " + myfile.getPath() + " got standard response message instead - Result: " + myMsg.getResult()); session.removeAttribute(FILE_OUT); session.removeAttribute(CURRENT_FILE); session.setAttribute(SESSION_STATE, SessionState.IDLE); } else { if (session.containsAttribute(FILE_OUT)) { loggerHelper(session, method, message); FileOutputStream fout; fout = (FileOutputStream)session.getAttribute(FILE_OUT); try { fout.close(); } catch (IOException e) { LOGGER.error("Couldn't close file: " + 
session.getAttribute(CURRENT_FILE).toString()); } session.removeAttribute(CURRENT_FILE); session.removeAttribute(FILE_OUT); session.setAttribute(SESSION_STATE, SessionState.IDLE); LOGGER.info("Didn't get an IOBuffer, so we are assuming end of transmission State: " + session.getAttribute(SESSION_STATE)); } else { loggerHelper(session, method, message); LOGGER.error("Didn't receive the actual file or response message " + myfile.getName() + " Message: " + message.getClass().toString()); session.removeAttribute(CURRENT_FILE); session.setAttribute(SESSION_STATE, SessionState.IDLE); } } } else if (session.getAttribute(SESSION_STATE) == SessionState.SEND_FILE) { if (message instanceof ServerStdResponseMessage) { loggerHelper(session, method, message); ServerStdResponseMessage myMsg = (ServerStdResponseMessage)message; session.setAttribute(SESSION_NONCE, myMsg.getNonce()); if (myMsg.getResult() == 1) { FileInputStream fsend = null; try { fsend = new FileInputStream(new File(testfile2)); session.setAttribute(FILE_STREAM, fsend); session.write(fsend); } catch (FileNotFoundException e) { session.removeAttribute(CURRENT_FILE); session.setAttribute(SESSION_STATE, SessionState.IDLE); } } else { session.removeAttribute("current_file"); session.setAttribute(SESSION_STATE, SessionState.IDLE); LOGGER.error("Received a negative upload response from server. Result: " + myMsg.getResult()); } } else { loggerHelper(session, method, message); } } else if (session.getAttribute(SESSION_STATE) == SessionState.IDLE) { if (message instanceof ServerStdResponseMessage) { loggerHelper(session, method, message); ServerStdResponseMessage myMsg = (ServerStdResponseMessage)message; session.setAttribute(SESSION_NONCE, myMsg.getNonce()); session.setAttribute(SESSION_STATE, SessionState.IDLE); LOGGER.error("Received response without a request. 
Result: " + myMsg.getResult()); } else { loggerHelper(session, method, message); } } //This should never happen else { loggerHelper(session, method, message); } } @Override public void messageSent(IoSession session, Object message) { String method = "messageSent"; //Streams are not closed by the file filter automatically, so we must close them here if (session.getAttribute(SESSION_STATE) == SessionState.SEND_FILE && session.containsAttribute(FILE_STREAM)) { loggerHelper(session, method, message); FileInputStream fsend; fsend = (FileInputStream)session.getAttribute(FILE_STREAM); try { fsend.close(); session.setAttribute(SESSION_STATE, SessionState.IDLE); } catch (IOException e) { LOGGER.error("Unable to close stream: " + e.getMessage()); session.setAttribute(SESSION_STATE, SessionState.IDLE); } } else if (session.getAttribute(SESSION_STATE) == SessionState.RECV_FILE_WAIT) { loggerHelper(session, method, message); session.setAttribute(SESSION_STATE, SessionState.RECV_FILE); } else { loggerHelper(session, method, message); } } @Override public void sessionIdle(IoSession session, IdleStatus status) { LOGGER.info("Session idle... 
disconnecting."); if (parent.getTrayicon() != null) parent.getTrayicon().setToolTip("Status: Disconnected"); session.close(true); } @Override public void exceptionCaught(IoSession session, Throwable cause) { //If this is a NotSerializableException we should continue and hope for the best ;-) if (cause.getClass() == ProtocolDecoderException.class || cause.getClass() == ProtocolEncoderException.class) { LOGGER.warn("Exception: " + cause.getClass().toString() + " State: " + session.getAttribute(SESSION_STATE)); LOGGER.error("Throwable: ", cause); } else { //Let's close the connection since we can't recover // LOGGER.error("Unrecoverable Exception on Session: " + session.getAttribute(CLIENT_NAME), cause); LOGGER.error("Unrecoverable Exception on Session: " + session.getAttribute(CLIENT_NAME) + " Exception: " + cause.getClass().toString()); LOGGER.error("Stack: " + cause.getMessage()); LOGGER.error("Throwable", cause); if (parent.getTrayicon() != null) parent.getTrayicon().setToolTip("Status: Disconnected"); session.close(true); } } private void loggerHelper(IoSession session, String method, Object message) { session.setAttribute(MSG_COUNTER, (Long)session.getAttribute(MSG_COUNTER) + 1); LOGGER.info("MSG Counter: " + session.getAttribute(MSG_COUNTER) + " Method: " + method + " State: " + session.getAttribute(SESSION_STATE) + " Message Class: " + message.getClass().toString()); } } -------------------------------------------------------- Server.java public class Server { private final static Logger LOGGER = LoggerFactory.getLogger(Server.class); private final static UserStore userstore = new UserStore(); public Server() { super(); } public static void main(String[] args) throws Throwable { //Default Port and Address int SERVER_PORT = 8877; String SERVER_ADDRESS = "127.0.0.1"; NioSocketAcceptor acceptor = new NioSocketAcceptor(); //Let's keep events from firing at the wrong time or simultaneously acceptor.getFilterChain().addLast("executor", new ExecutorFilter()); //We 
want to instert a logging filter acceptor.getFilterChain().addLast("logger", new LoggingFilter()); CodecFactory cf = new CodecFactory(); cf.setDecoderMaxObjectSize(BUFFER_SIZE); acceptor.getFilterChain().addLast("codec", new CodecFilter(cf)); StreamWriteFilter swf = new StreamWriteFilter(); swf.setWriteBufferSize(BUFFER_SIZE); acceptor.getFilterChain().addLast("file", swf); acceptor.setHandler(new ServerIoSessionHandler()); // acceptor.getSessionConfig().setMaxReadBufferSize(BUFFER_SIZE); // acceptor.getSessionConfig().setReadBufferSize(BUFFER_SIZE); //Did we get non-default host and port arguments if (!(args.length < 2 || args.length > 2)) { //Assign the port number try { SERVER_PORT = Integer.parseInt(args[1]); } catch (NumberFormatException e) { LOGGER.error(e.getMessage()); } //Only allow ports in the user range if (SERVER_PORT < 1024 || SERVER_PORT > 65535) { LOGGER.error("Invalid port number: " + SERVER_PORT); System.exit(1); } //TODO: Validate the host address SERVER_ADDRESS = args[0]; } //Let's bind to our address and port number. Give it 5 tries... 
for (int i=0; i<5; i++) try { acceptor.bind(new InetSocketAddress(SERVER_ADDRESS, SERVER_PORT)); break; } catch (IOException ioe) { // TODO: Add catch code LOGGER.error("Server was unable to bind to port: " + SERVER_PORT + " Error: " + ioe.getMessage()); Thread.sleep(1000); } if (acceptor.isActive()) //We are done LOGGER.info("Server started on address and port: " + SERVER_ADDRESS + ":" + SERVER_PORT); else { LOGGER.info("Server could not start"); System.exit(1); } } } ---------------------------------------------------------- ServerIoSessionHandler.java public class ServerIoSessionHandler extends IoHandlerAdapter { private final static Logger LOGGER = LoggerFactory.getLogger(ServerIoSessionHandler.class); @Override public void sessionOpened(IoSession session) { String method = "sessionOpened"; session.setAttribute(MSG_COUNTER, 0L); //Set our idle time to 600 seconds session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 600); //Our local directory tree //TODO: This should be getting loaded from storage and not created on connect FileDirTree localdt = new FileDirTree(); session.setAttribute(HOME_DIR, "/users/admin/Downloads"); try { localdt.setRootDir(new File(session.getAttribute(HOME_DIR).toString())); session.setAttribute(FILEDIR_TREE, localdt); } catch (InvalidParameterException e) { LOGGER.error("Invalid File() - Closing Session", e); session.close(true); } catch (InvalidObjectException e) { LOGGER.error("Invalid FileDirTree.setRoot() - Closing Session", e); session.close(true); } //Let's keep a session nonce session.setAttribute(SESSION_NONCE, generateNonce()); //Initialize our state session.setAttribute(SESSION_STATE, SessionState.WAIT_INIT); LOGGER.info("MSG Counter: " + session.getAttribute(MSG_COUNTER) + " Method: " + method + " State: " + session.getAttribute(SESSION_STATE)); } @Override public void messageReceived(IoSession session, Object message) { String method = "messageReceived"; //TODO: Add busy state with appropriate client response if 
(session.getAttribute(SESSION_STATE) == SessionState.WAIT_INIT) { if (message instanceof ClientAnnounceMessage) { loggerHelper(session, method, message); ClientAnnounceMessage myClient = (ClientAnnounceMessage) message; session.setAttribute(CLIENT_CODE, myClient.getClient_codes()); session.setAttribute(CLIENT_NAME, myClient.getClient_name()); session.setAttribute(USERNAME, myClient.getUsername()); if (true) { sendStdResponseMessage(session, true, null); session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD); } else { LOGGER.info("Unable to authenticate user: " + session.getAttribute(USERNAME)); session.close(true); } } else { loggerHelper(session, method, message); sendStdResponseMessage(session, false, "Expected ClientAnnounceMessage, got: " + message.getClass().toString()); } } else if (session.getAttribute(SESSION_STATE) == SessionState.WAIT_CMD) { if (message instanceof RequestDirectoryTreeMessage) { loggerHelper(session, method, message); session.write(session.getAttribute(FILEDIR_TREE)); } else if (message instanceof DownloadFileMessage) { loggerHelper(session, method, message); session.setAttribute(SESSION_STATE, SessionState.SEND_FILE); DownloadFileMessage filerequest = (DownloadFileMessage) message; FileDirTree localdt = (FileDirTree) session.getAttribute(FILEDIR_TREE); if (localdt.getLocalFile(filerequest.getRequest()) != null) { FileInputStream fsend = null; try { fsend = new FileInputStream(localdt.getLocalFile(filerequest.getRequest())); session.setAttribute(FILE_STREAM, fsend); // WriteFuture future = session.write(fsend); // future.awaitUninterruptibly(); session.write(fsend); } catch (FileNotFoundException e) { sendStdResponseMessage(session, false, "Unable to send file: " + filerequest.getRequest().getName() + "\nException: " + e.getMessage()); session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD); } } else { sendStdResponseMessage(session, false, "File not found in local tree"); session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD); 
} } else if (message instanceof UploadFileMessage) { loggerHelper(session, method, message); UploadFileMessage uploadfile = (UploadFileMessage) message; if (uploadfile.getRequest() != null && uploadfile.getRequest() instanceof File) { session.setAttribute(CURRENT_FILE, uploadfile.getRequest()); session.setAttribute(SESSION_STATE, SessionState.RECV_FILE); sendStdResponseMessage(session, true, null); else { sendStdResponseMessage(session, false, "Invalid file upload message: " + uploadfile.toString()); } } else if (message instanceof CreateDirectoryMessage) { loggerHelper(session, method, message); CreateDirectoryMessage createdir = (CreateDirectoryMessage) message; if (createdir.getRequest() != null && createdir.getRequest() instanceof File) { session.setAttribute(CURRENT_FILE, createdir.getRequest()); session.setAttribute(SESSION_STATE, SessionState.BUSY); File f = (File) createdir.getRequest(); if (f.exists()) sendStdResponseMessage(session, false, "File already exists: " + createdir.toString()); else if (!f.mkdirs()) { sendStdResponseMessage(session, false, "Unable to create directory: " + createdir.toString()); } else { sendStdResponseMessage(session, true, "Created directory: " + createdir.toString()); } session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD); } else { sendStdResponseMessage(session, false, "Invalid request payload: " + createdir.toString()); session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD); } } else if (message instanceof DeleteFileMessage) { loggerHelper(session, method, message); DeleteFileMessage deletefile = (DeleteFileMessage) message; if (deletefile.getRequest() != null && deletefile.getRequest() instanceof File) { session.setAttribute(CURRENT_FILE, deletefile.getRequest()); session.setAttribute(SESSION_STATE, SessionState.BUSY); File f = (File) deletefile.getRequest(); if (!f.exists()) sendStdResponseMessage(session, false, "File doesn't exist: " + deletefile.toString()); else if (f.isDirectory() && f.listFiles() != null) { 
sendStdResponseMessage(session, false, "Directory is not empty: " + deletefile.toString()); } else if (f.delete()) { sendStdResponseMessage(session, true, "Deleted: " + deletefile.toString()); } else sendStdResponseMessage(session, false, "Could not delete: " + deletefile.toString()); session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD); } else { sendStdResponseMessage(session, false, "Invalid request payload: " + deletefile.toString()); session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD); } } else { loggerHelper(session, method, message); sendStdResponseMessage(session, false, "Command not recognized. CMD: " + message.getClass().toString()); } } else if (session.getAttribute(SESSION_STATE) == SessionState.RECV_FILE) { File myfile = (File)session.getAttribute(CURRENT_FILE); if (message instanceof IoBuffer) { loggerHelper(session, method, message); IoBuffer fbuff = (IoBuffer)message; InputStream frecv = fbuff.asInputStream(); FileOutputStream fout; try { fout = new FileOutputStream(new File("/users/admin/filetest-srv.txt"), false); int i = 0; while ((i = frecv.read()) != -1) fout.write(i); frecv.close(); fout.close(); session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD); } catch (FileNotFoundException e) { LOGGER.error("Couldn't write to file: " + myfile.getName() + " Exception: " + e.getMessage()); session.removeAttribute(CURRENT_FILE); session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD); } catch (IOException e) { LOGGER.error("Couldn't write to file: " + myfile.getName() + " Exception: " + e.getMessage()); session.removeAttribute(CURRENT_FILE); session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD); } } else { loggerHelper(session, method, message); LOGGER.error("Didn't receive the actual file or response message " + myfile.getName() + " Message: " + message.getClass().toString()); session.removeAttribute(CURRENT_FILE); session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD); } } //This should never happen else { 
loggerHelper(session, method, message); sendStdResponseMessage(session, false, "Invalid session state: " + session.getAttribute(SESSION_STATE).toString()); } } @Override public void messageSent(IoSession session, Object message) { String method = "messageSent"; //Streams are not closed by the file filter automatically, so we must close them here if(session.getAttribute(SESSION_STATE) == SessionState.SEND_FILE && session.containsAttribute(FILE_STREAM)) { loggerHelper(session, method, message); FileInputStream fsend; fsend = (FileInputStream) session.getAttribute(FILE_STREAM); try { fsend.close(); LOGGER.info("Closed Stream"); } catch (IOException e) { LOGGER.error("Unable to close stream: " + e.getMessage()); } session.removeAttribute(FILE_STREAM); session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD); } else { loggerHelper(session, method, message); } } //Generate a nonce private long generateNonce() { SecureRandom rnd = new SecureRandom(); return rnd.nextLong(); } //Have we been idel beyond our timeout? Then disconnect @Override public void sessionIdle(IoSession session, IdleStatus status) { LOGGER.info("Session idle... 
disconnecting."); session.close(true); } //Log any unrecoverable exceptions and close the connection @Override public void exceptionCaught(IoSession session, Throwable cause) { //If this is a NotSerializableException we should continue and hope for the best ;-) if (cause.getClass() == ProtocolDecoderException.class || cause.getClass() == ProtocolEncoderException.class) LOGGER.warn("Exception: " + cause.getClass().toString() + " State: " + session.getAttribute(SESSION_STATE)); else { //Let's close the connection since we can't recover // LOGGER.error("Unrecoverable Exception on Session: " + session.getAttribute(CLIENT_NAME), cause); LOGGER.error("Unrecoverable Exception on Session: " + session.getAttribute(CLIENT_NAME) + " Exception: " + cause.getClass().toString()); session.close(true); } } //Helper method to send standard server response messages private void sendStdResponseMessage(IoSession session, Boolean flag, String msg) { if (session != null) { ServerStdResponseMessage ssrm; ssrm = new ServerStdResponseMessage(); ssrm.setNonce((Long)session.getAttribute(SESSION_NONCE) + 1); if (flag) ssrm.setResult((short) 1); else ssrm.setResult((short) 0); session.write(ssrm); } if (msg != null) LOGGER.error(msg); } private void loggerHelper(IoSession session, String method, Object message) { session.setAttribute(MSG_COUNTER, (Long)session.getAttribute(MSG_COUNTER) + 1); LOGGER.info("MSG Counter: " + session.getAttribute(MSG_COUNTER) + " Method: " + method + " State: " + session.getAttribute(SESSION_STATE) + " Message Class: " + message.getClass().toString()); } } -------------------------------------------------- CodecFactory.java public class CodecFactory extends ObjectSerializationCodecFactory { private final CodecEncoder encoder; private final CodecDecoder decoder; public CodecFactory() { this(Thread.currentThread().getContextClassLoader()); } public CodecFactory(ClassLoader classLoader) { encoder = new CodecEncoder(); decoder = new CodecDecoder(classLoader); } 
@Override
    public ProtocolEncoder getEncoder(IoSession session) {
        return encoder;
    }

    @Override
    public ProtocolDecoder getDecoder(IoSession session) {
        return decoder;
    }

    @Override
    public int getEncoderMaxObjectSize() {
        return encoder.getMaxObjectSize();
    }

    @Override
    public void setEncoderMaxObjectSize(int maxObjectSize) {
        encoder.setMaxObjectSize(maxObjectSize);
    }

    @Override
    public int getDecoderMaxObjectSize() {
        return decoder.getMaxObjectSize();
    }

    @Override
    public void setDecoderMaxObjectSize(int maxObjectSize) {
        decoder.setMaxObjectSize(maxObjectSize);
    }
}

-----------------------------------
CodecFilter.java

/**
 * Codec filter that lets {@link FileInputStream} writes skip the encoder, so that a
 * filter later in the chain can handle the stream.
 */
public class CodecFilter extends ProtocolCodecFilter {

    public CodecFilter(Class<? extends ProtocolEncoder> class1, Class<? extends ProtocolDecoder> class2) {
        super(class1, class2);
    }

    public CodecFilter(ProtocolEncoder protocolEncoder, ProtocolDecoder protocolDecoder) {
        super(protocolEncoder, protocolDecoder);
    }

    public CodecFilter(ProtocolCodecFactory protocolCodecFactory) {
        super(protocolCodecFactory);
    }

    /**
     * Forwards stream writes untouched and encodes every other message.
     */
    @Override
    public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest)
            throws Exception {
        if (writeRequest.getMessage() instanceof FileInputStream) {
            nextFilter.filterWrite(session, writeRequest);
        } else {
            super.filterWrite(nextFilter, session, writeRequest);
        }
    }
}

---------------------------------
CodecDecoder.java

/**
 * Object-serialization decoder that passes raw bytes through while the session is
 * receiving a file.
 */
public class CodecDecoder extends ObjectSerializationDecoder {

    public CodecDecoder() {
        this(Thread.currentThread().getContextClassLoader());
    }

    /**
     * @param classLoader loader used to resolve decoded classes
     * @throws IllegalArgumentException if {@code classLoader} is {@code null}
     */
    public CodecDecoder(ClassLoader classLoader) {
        // Hand the loader to the superclass; previously it was only stored in an unused
        // field here and the superclass silently used its own default.
        super(requireClassLoader(classLoader));
    }

    private static ClassLoader requireClassLoader(ClassLoader classLoader) {
        if (classLoader == null) {
            throw new IllegalArgumentException("classloader");
        }
        return classLoader;
    }

    /**
     * Decodes serialized objects, except in the RECV_FILE state, where the buffered bytes
     * are emitted unchanged as an {@link IoBuffer} message.
     *
     * NOTE(review): the handlers store the state under the SESSION_STATE constant; this
     * check only works if that constant equals "state" - confirm.
     */
    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
            throws Exception {
        if (session.getAttribute("state") == SessionState.RECV_FILE) {
            // Returning false here would only ask for more data: the file bytes would pile
            // up in the cumulative buffer and never reach the handler, which expects them
            // as IoBuffer messages.
            if (!in.hasRemaining()) {
                return false;
            }
            IoBuffer chunk = IoBuffer.allocate(in.remaining());
            chunk.put(in);
            chunk.flip();
            out.write(chunk);
            return true;
        }
        // Every other state, including SEND_FILE, still receives serialized objects
        // (the upload acknowledgement arrives while the client is in SEND_FILE).
        return super.doDecode(session, in, out);
    }
}

----------------------------------
CodecEncoder.java

/**
 * Object-serialization encoder for protocol messages.
 *
 * It no longer drops messages while the session is in a file-transfer state: stream
 * writes never reach the encoder because {@link CodecFilter#filterWrite} forwards them
 * first, so the old check could only discard ordinary protocol messages.
 */
public class CodecEncoder extends ObjectSerializationEncoder {

    public CodecEncoder() {
        super();
    }
}