I'm new to MINA and have been working on a small project for a few weeks now. 
My application is fairly simple… I'm sending commands to a server via POJO 
messages. In some cases the server or the client will send a file instead of a 
POJO. I'm using the StreamWriteFilter and the ObjectSerializationCodec 
ProtocolCodecFilter. I can send objects back and forth fairly successfully, but 
I can't send large files and sometimes even small files fail to be sent.

Here is the exception I get when sending files:

00:46:09] pool-4-thread-4 WARN  [] [] [client.ClientIoSessionHandler] - 
Exception: class org.apache.mina.filter.codec.ProtocolDecoderException State: 
RECV_FILE_WAIT
[00:46:09] pool-4-thread-4 ERROR  [] [] [client.ClientIoSessionHandler] - 
Throwable:
org.apache.mina.filter.codec.ProtocolDecoderException: 
org.apache.mina.core.buffer.BufferDataException: dataLength: 1347093252


Any ideas why am I getting this exception? How should I go about having the 
ObjectSerializationCodec handle the POJOs while StreamWriteFilter  handles the 
files?


And here are the relevant sections of code:

Client.java

package client;

public class Client extends SwingWorker<Void, Void>/*Thread*/ {

    private final static Logger LOGGER = LoggerFactory.getLogger(Client.class);

    private String hostname = "127.0.0.1";
    private int port = 8877;
    private String username = null;
    private String password = null;
    private String computername;
    private FileCmdQueue filecmdqueue = null;
    private File homedir = null;
    private TrayIcon trayicon = null;

    private static final long CONNECT_TIMEOUT = 15 *1000L;
    private static final int CONNECT_RETRIES = 3;

    private Boolean flag = true;        //Send a correct hello or a failed 
hello on connect

    private IoSession session;

    public Client(String hostname, int port, String username, String pwd) {
        this.hostname = hostname;
        this.password = pwd;
        this.username = username;
        this.computername = null;

        if (port < 1024 || port > 65535) {
            LOGGER.error("Invalid port number: " + port);
            this.port = 8877;
        }
        else
            this.port = port;

        try{
            computername=InetAddress.getLocalHost().getHostName();
        }catch (Exception e){
            LOGGER.error("Exception caught = "+ e.getMessage());
            computername = "unknown";
        }

    }

    public void Connect(Boolean flag) throws InterruptedException {

        if (session == null || !session.isConnected()) {
            NioSocketConnector connector = new NioSocketConnector();
            connector.setConnectTimeoutMillis(CONNECT_TIMEOUT);

            connector.getFilterChain().addLast("executor", new 
ExecutorFilter());

            connector.getFilterChain().addLast("logger", new LoggingFilter());

            CodecFactory cf = new CodecFactory();
            cf.setDecoderMaxObjectSize(BUFFER_SIZE);
            connector.getFilterChain().addLast("codec", new CodecFilter(cf));

/*
            ObjectSerializationCodecFactory oscf = new 
ObjectSerializationCodecFactory();
            oscf.setDecoderMaxObjectSize(BUFFER_SIZE);
            connector.getFilterChain().addLast("codec", new CodecFilter(oscf));
*/
            StreamWriteFilter swf = new StreamWriteFilter();
            swf.setWriteBufferSize(BUFFER_SIZE);
            connector.getFilterChain().addLast("file", swf);
            connector.setHandler(new ClientIoSessionHandler(flag, this));

//            connector.getSessionConfig().setMaxReadBufferSize(BUFFER_SIZE);
//            connector.getSessionConfig().setReadBufferSize(BUFFER_SIZE);

            for (int i=0;i< CONNECT_RETRIES ; i++ ) {
                try {
                    ConnectFuture future = connector.connect(new 
InetSocketAddress(this.hostname, this.port));
                    future.awaitUninterruptibly();
                    session = future.getSession();
                    break;
                } catch (RuntimeIoException e) {
                    LOGGER.error("Failed to connect.", e);
                    Thread.sleep(5000);
                }
            }
                if (session != null) {
                    session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 600);
                    session.setAttribute(HOME_DIR, homedir);
                    session.getCloseFuture().awaitUninterruptibly();
                }
            connector.dispose();
        }
    }

    public void disConnect() {
        if (session != null) {
            session.close(true);
            if (trayicon != null)
                trayicon.setToolTip("Status: Disconnected");
        }
    }

    public Boolean sendMessage(FileCmd.CmdType cmd, Object in) {
        if (session.getAttribute(SESSION_STATE) == SessionState.IDLE) {
            if (cmd == FileCmd.CmdType.LIST) {
                session.setAttribute(SESSION_STATE, SessionState.WAITING);
                RequestDirectoryTreeMessage myRequest = new 
RequestDirectoryTreeMessage();
                myRequest.setNonce((Long)session.getAttribute(SESSION_NONCE));
                session.write(myRequest);
                return true;
            }
            else if (cmd == FileCmd.CmdType.DOWNLOAD) {
                File myfile = (File) in;
                DownloadFileMessage myrequest = new DownloadFileMessage();
                myrequest.setRequest(myfile);
                myrequest.setNonce((Long)session.getAttribute(SESSION_NONCE));
                session.setAttribute(CURRENT_FILE, myfile);
                session.setAttribute(SESSION_STATE,SessionState.RECV_FILE_WAIT);
                session.write(myrequest);
                return true;
            }
            else if (cmd == FileCmd.CmdType.UPLOAD) {
                File myfile = (File) in;
                session.setAttribute(SESSION_STATE, SessionState.SEND_FILE);
                UploadFileMessage myupload = new UploadFileMessage();
                myupload.setRequest(myfile);
                myupload.setNonce((Long)session.getAttribute(SESSION_NONCE));
                session.setAttribute(CURRENT_FILE, myfile);
                session.write(myupload);
                return true;
            }
            else if (cmd == FileCmd.CmdType.R_CREATE) {
                File myfile = (File) in;
                session.setAttribute(SESSION_STATE, SessionState.WAITING);
                CreateDirectoryMessage myrequest = new CreateDirectoryMessage();
                myrequest.setRequest(myfile);
                myrequest.setNonce((Long)session.getAttribute(SESSION_NONCE));
                session.setAttribute(CURRENT_FILE, myfile);
                session.write(myrequest);
                return true;
            }
            else if (cmd == FileCmd.CmdType.DELETE) {
                File myfile = (File) in;
                session.setAttribute(SESSION_STATE, SessionState.WAITING);
                DeleteFileMessage myrequest = new DeleteFileMessage();
                myrequest.setRequest(myfile);
                myrequest.setNonce((Long)session.getAttribute(SESSION_NONCE));
                session.setAttribute(CURRENT_FILE, myfile);
                session.write(myrequest);
                return true;
            }
            else
                return false;
        } else
            return false;
    }

    public FileDirTree getTreeMap() {
        if (session.containsAttribute(FILEDIR_TREE))
            return (FileDirTree) session.getAttribute(FILEDIR_TREE);
        else
            return null;
    }

    public void setTreeMap(FileDirTree intree) {
        if (session.containsAttribute(FILEDIR_TREE) && intree != null)
            session.setAttribute(FILEDIR_TREE, intree);
        else
            session.removeAttribute(FILEDIR_TREE);
    }

//    @Override
    protected Void doInBackground() {
        try {
            //TODO: Thread locked... need to move to multi-threaded model
            this.Connect(flag);
            return null;
        } catch (InterruptedException f) {
            LOGGER.error("Connection Interrupted", f);
            return null;
        }
    }
}

-----------------------------------------------------------------
ClientIoSessionHandler.java
public class ClientIoSessionHandler extends IoHandlerAdapter {

    private final Client parent;
    private Boolean flag = true; //Sends a correct announcement or not
    private final static Logger LOGGER = 
LoggerFactory.getLogger(ClientIoSessionHandler.class);
    public ClientIoSessionHandler(Boolean flag, Client parent) {
        this.flag = flag;
        this.parent = parent;
    }

    @Override
    public void sessionOpened(IoSession session) {
        //        ClientAnnounceMessage msg = new ClientAnnounceMessage(65530, 
compname.length(), compname, (String)session.getAttribute(USERNAME), 
(String)session.getAttribute(PASSWORD));
        String method = "sessionOpened";
        session.setAttribute(MSG_COUNTER, 0L);
        ClientAnnounceMessage msg =
            new ClientAnnounceMessage(65530, parent.getComputername().length(), 
parent.getComputername(),
                                      parent.getUsername(), "test123");
        session.setAttribute(SESSION_STATE, SessionState.WAIT_INIT);
        session.setAttribute(USERNAME, parent.getHostname());
        //        session.setAttribute(PASSWORD, parent.getPassword());
        session.setAttribute(PASSWORD, "test123");
        session.setAttribute(CLIENT_CODE, 65530);
        session.setAttribute(CLIENT_NAME, parent.getHostname());
        if (parent.getTrayicon() != null)
            parent.getTrayicon().setToolTip("Status: Connected");
        if (flag)
            session.write(msg);
        else
            session.write("Hello There");
        LOGGER.info("MSG Counter: " + session.getAttribute(MSG_COUNTER) + " 
Method: " + method  + " State:  " + session.getAttribute(SESSION_STATE));
    }

    @Override
    public void messageReceived(IoSession session, Object message) {

        String method = "messageReceived";
        if (session.getAttribute(SESSION_STATE) == SessionState.WAIT_INIT) {
            if (message instanceof ServerStdResponseMessage) {
                loggerHelper(session, method, message);
                ServerStdResponseMessage myMsg = 
(ServerStdResponseMessage)message;
                if (myMsg.getResult() == 1) {
                    session.setAttribute(SESSION_NONCE, myMsg.getNonce());
                    session.setAttribute(SESSION_STATE, SessionState.IDLE);
                    LOGGER.info("Connection with the server established. 
Result: " + myMsg.getResult());
                    for (int i = 0; i < 5; i++) {
                        if (parent.sendMessage(FileCmd.CmdType.LIST, null)) {
                            break;
                        }
                    }
                } else {
                    loggerHelper(session, method, message);
                    LOGGER.error("Couldn't initialize connection with server. 
Error: " + myMsg.getResult());
                    session.close(true);
                }
            }
        } else if (session.getAttribute(SESSION_STATE) == SessionState.WAITING) 
{
            if (message instanceof FileDirTree) {
                loggerHelper(session, method, message);
                FileDirTree myTree = (FileDirTree)message;
                session.setAttribute(FILEDIR_TREE, myTree);
                session.setAttribute(SESSION_STATE, SessionState.IDLE);
                LOGGER.info("Received Directory Tree");
            } else if (message instanceof ServerStdResponseMessage) {
                loggerHelper(session, method, message);
                ServerStdResponseMessage myMsg = 
(ServerStdResponseMessage)message;
                session.setAttribute(SESSION_NONCE, myMsg.getNonce());
                session.setAttribute(SESSION_STATE, SessionState.IDLE);
                if (myMsg.getResult() == 1)
                    LOGGER.info("We got an OK response from the server. Result: 
" + myMsg.getResult());
                else
                    loggerHelper(session, method, message);
                    LOGGER.warn("Not the response we were expecting. Result: " 
+ myMsg.getResult());
            }
        } else if (session.getAttribute(SESSION_STATE) == 
SessionState.RECV_FILE) {
            File myfile = (File)session.getAttribute(CURRENT_FILE);

             if (message instanceof IoBuffer) {
//            if (message instanceof FileInputStream) {
                loggerHelper(session, method, message);

                IoBuffer fbuff = (IoBuffer)message;
                InputStream frecv = fbuff.asInputStream();
                FileOutputStream fout;
                try {
                    if (session.containsAttribute(FILE_OUT))
                        fout = (FileOutputStream)session.getAttribute(FILE_OUT);
                    else
                        fout = new FileOutputStream(new 
File(((File)session.getAttribute(HOME_DIR)).getPath().concat(myfile.getPath())),
 false);
                    int i = 0;
                    while ((i = frecv.read()) != -1)
                        fout.write(i);
                    frecv.close();
                    fout.close();
                } catch (FileNotFoundException e) {
                    LOGGER.error("Couldn't write to file: " + myfile.getName() 
+ " Exception: " + e.getMessage());
                    session.removeAttribute(CURRENT_FILE);
                    session.removeAttribute(FILE_OUT);
                    session.setAttribute(SESSION_STATE, SessionState.IDLE);

                } catch (IOException e) {
                    LOGGER.error("Couldn't write to file: " + myfile.getName() 
+ " Exception: " + e.getMessage());
                    session.removeAttribute(CURRENT_FILE);
                    session.removeAttribute(FILE_OUT);
                    session.setAttribute(SESSION_STATE, SessionState.IDLE);

                }
            }
            else if (message instanceof ServerStdResponseMessage) {
                loggerHelper(session, method, message);

                ServerStdResponseMessage myMsg = 
(ServerStdResponseMessage)message;
                session.setAttribute(SESSION_NONCE, myMsg.getNonce());
                LOGGER.error("Expecting file: " + myfile.getPath() +
                             " got standard response message instead - Result: 
" + myMsg.getResult());
                session.removeAttribute(FILE_OUT);
                session.removeAttribute(CURRENT_FILE);
                session.setAttribute(SESSION_STATE, SessionState.IDLE);
            } else {
                 if (session.containsAttribute(FILE_OUT)) {
                     loggerHelper(session, method, message);

                     FileOutputStream fout;
                     fout = (FileOutputStream)session.getAttribute(FILE_OUT);
                    try {
                        fout.close();
                    } catch (IOException e) {
                        LOGGER.error("Couldn't close file: " + 
session.getAttribute(CURRENT_FILE).toString());
                    }
                    session.removeAttribute(CURRENT_FILE);
                    session.removeAttribute(FILE_OUT);
                    session.setAttribute(SESSION_STATE, SessionState.IDLE);
                    LOGGER.info("Didn't get an IOBuffer, so we are assuming end 
of transmission State: " +
                                 session.getAttribute(SESSION_STATE));
                 }
                 else {
                     loggerHelper(session, method, message);

                LOGGER.error("Didn't receive the actual file or response 
message " + myfile.getName() + " Message: " +
                             message.getClass().toString());
                session.removeAttribute(CURRENT_FILE);
                session.setAttribute(SESSION_STATE, SessionState.IDLE);
                 }
            }
        } else if (session.getAttribute(SESSION_STATE) == 
SessionState.SEND_FILE) {
            if (message instanceof ServerStdResponseMessage) {
                loggerHelper(session, method, message);

                ServerStdResponseMessage myMsg = 
(ServerStdResponseMessage)message;
                session.setAttribute(SESSION_NONCE, myMsg.getNonce());
                if (myMsg.getResult() == 1) {
                    FileInputStream fsend = null;
                    try {
                        fsend = new FileInputStream(new File(testfile2));
                        session.setAttribute(FILE_STREAM, fsend);
                        session.write(fsend);
                    } catch (FileNotFoundException e) {
                        session.removeAttribute(CURRENT_FILE);
                        session.setAttribute(SESSION_STATE, SessionState.IDLE);
                    }
                } else {
                    session.removeAttribute("current_file");
                    session.setAttribute(SESSION_STATE, SessionState.IDLE);
                    LOGGER.error("Received a negative upload response from 
server. Result: " + myMsg.getResult());
                }
            }
            else {
                loggerHelper(session, method, message);
            }
        } else if (session.getAttribute(SESSION_STATE) == SessionState.IDLE) {
            if (message instanceof ServerStdResponseMessage) {
                loggerHelper(session, method, message);

                ServerStdResponseMessage myMsg = 
(ServerStdResponseMessage)message;
                session.setAttribute(SESSION_NONCE, myMsg.getNonce());
                session.setAttribute(SESSION_STATE, SessionState.IDLE);
                LOGGER.error("Received response without a request. Result: " + 
myMsg.getResult());
            }
            else {
                loggerHelper(session, method, message);
            }
        }

        //This should never happen
        else {
            loggerHelper(session, method, message);
        }
    }

    @Override
    public void messageSent(IoSession session, Object message) {

        String method = "messageSent";

        //Streams are not closed by the file filter automatically, so we must 
close them here
        if (session.getAttribute(SESSION_STATE) == SessionState.SEND_FILE && 
session.containsAttribute(FILE_STREAM)) {

            loggerHelper(session, method, message);

            FileInputStream fsend;
            fsend = (FileInputStream)session.getAttribute(FILE_STREAM);
            try {
                fsend.close();
                session.setAttribute(SESSION_STATE, SessionState.IDLE);
            } catch (IOException e) {
                LOGGER.error("Unable to close stream: " + e.getMessage());
                session.setAttribute(SESSION_STATE, SessionState.IDLE);
            }
        } else if (session.getAttribute(SESSION_STATE) == 
SessionState.RECV_FILE_WAIT) {
            loggerHelper(session, method, message);
            session.setAttribute(SESSION_STATE, SessionState.RECV_FILE);
        }
        else {
            loggerHelper(session, method, message);
        }
    }


    @Override
    public void sessionIdle(IoSession session, IdleStatus status) {
        LOGGER.info("Session idle... disconnecting.");
        if (parent.getTrayicon() != null)
            parent.getTrayicon().setToolTip("Status: Disconnected");
        session.close(true);
    }

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) {

        //If this is a NotSerializableException we should continue and hope for 
the best ;-)
        if (cause.getClass() == ProtocolDecoderException.class || 
cause.getClass() == ProtocolEncoderException.class) {
            LOGGER.warn("Exception: " + cause.getClass().toString() + " State: 
" +
                        session.getAttribute(SESSION_STATE));
            LOGGER.error("Throwable: ", cause);
        }
        else {
            //Let's close the connection since we can't recover
            //          LOGGER.error("Unrecoverable Exception on Session: " + 
session.getAttribute(CLIENT_NAME), cause);
            LOGGER.error("Unrecoverable Exception on Session: " + 
session.getAttribute(CLIENT_NAME) + " Exception: " +
                         cause.getClass().toString());
            LOGGER.error("Stack: " + cause.getMessage());
            LOGGER.error("Throwable", cause);
            if (parent.getTrayicon() != null)
                parent.getTrayicon().setToolTip("Status: Disconnected");
            session.close(true);
        }
    }

    private void loggerHelper(IoSession session, String method, Object message) 
{
        session.setAttribute(MSG_COUNTER, 
(Long)session.getAttribute(MSG_COUNTER) + 1);
                LOGGER.info("MSG Counter: " + session.getAttribute(MSG_COUNTER) 
+ " Method: " + method + " State:  " + session.getAttribute(SESSION_STATE) + " 
Message Class: " + message.getClass().toString());
    }
}

--------------------------------------------------------
Server.java

public class Server {

    private final static Logger LOGGER = LoggerFactory.getLogger(Server.class);
    private final static UserStore userstore = new UserStore();

    public Server() {
        super();
    }

    public static void main(String[] args) throws Throwable {

        //Default Port and Address
        int SERVER_PORT = 8877;
        String SERVER_ADDRESS = "127.0.0.1";

        NioSocketAcceptor acceptor = new NioSocketAcceptor();


        //Let's keep events from firing at the wrong time or simultaneously
        acceptor.getFilterChain().addLast("executor", new ExecutorFilter());

        //We want to instert a logging filter
        acceptor.getFilterChain().addLast("logger", new LoggingFilter());

        CodecFactory cf = new CodecFactory();
        cf.setDecoderMaxObjectSize(BUFFER_SIZE);
        acceptor.getFilterChain().addLast("codec", new CodecFilter(cf));

        StreamWriteFilter swf = new StreamWriteFilter();
        swf.setWriteBufferSize(BUFFER_SIZE);
        acceptor.getFilterChain().addLast("file", swf);

        acceptor.setHandler(new ServerIoSessionHandler());

//        acceptor.getSessionConfig().setMaxReadBufferSize(BUFFER_SIZE);
//        acceptor.getSessionConfig().setReadBufferSize(BUFFER_SIZE);

        //Did we get non-default host and port arguments
        if (!(args.length < 2 || args.length > 2)) {

            //Assign the port number
            try {
                SERVER_PORT = Integer.parseInt(args[1]);
            } catch (NumberFormatException e) {
                LOGGER.error(e.getMessage());
            }

            //Only allow ports in the user range
            if (SERVER_PORT < 1024 || SERVER_PORT > 65535) {
                LOGGER.error("Invalid port number: " + SERVER_PORT);
                System.exit(1);
            }

            //TODO: Validate the host address
            SERVER_ADDRESS = args[0];

        }

        //Let's bind to our address and port number. Give it 5 tries...
        for (int i=0; i<5; i++)
        try {
            acceptor.bind(new InetSocketAddress(SERVER_ADDRESS, SERVER_PORT));
            break;
        } catch (IOException ioe) {
            // TODO: Add catch code
            LOGGER.error("Server was unable to bind to port: " + SERVER_PORT + 
" Error: " + ioe.getMessage());
            Thread.sleep(1000);
        }

        if (acceptor.isActive())
        //We are done
            LOGGER.info("Server started on address and port: " + SERVER_ADDRESS 
+ ":" + SERVER_PORT);
        else {
            LOGGER.info("Server could not start");
            System.exit(1);
        }
    }
}
----------------------------------------------------------
ServerIoSessionHandler.java

public class ServerIoSessionHandler extends IoHandlerAdapter {

    private final static Logger LOGGER = 
LoggerFactory.getLogger(ServerIoSessionHandler.class);

    @Override
    public void sessionOpened(IoSession session) {

        String method = "sessionOpened";
        session.setAttribute(MSG_COUNTER, 0L);

        //Set our idle time to 600 seconds
        session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 600);

        //Our local directory tree
        //TODO:  This should be getting loaded from storage and not created on 
connect
        FileDirTree localdt = new FileDirTree();
        session.setAttribute(HOME_DIR, "/users/admin/Downloads");
        try {
            localdt.setRootDir(new 
File(session.getAttribute(HOME_DIR).toString()));
            session.setAttribute(FILEDIR_TREE, localdt);
        } catch (InvalidParameterException e) {
            LOGGER.error("Invalid File() - Closing Session", e);
            session.close(true);
            } catch (InvalidObjectException e) {
                LOGGER.error("Invalid FileDirTree.setRoot() - Closing Session", 
e);
                session.close(true);
            }

        //Let's keep a session nonce
        session.setAttribute(SESSION_NONCE, generateNonce());

        //Initialize our state
        session.setAttribute(SESSION_STATE, SessionState.WAIT_INIT);

        LOGGER.info("MSG Counter: " + session.getAttribute(MSG_COUNTER) + " 
Method: " + method  + " State:  " + session.getAttribute(SESSION_STATE));
    }

    @Override
    public void messageReceived(IoSession session, Object message) {

        String method = "messageReceived";

        //TODO: Add busy state with appropriate client response
        if (session.getAttribute(SESSION_STATE) == SessionState.WAIT_INIT) {
            if (message instanceof ClientAnnounceMessage) {
                loggerHelper(session, method, message);

                ClientAnnounceMessage myClient = (ClientAnnounceMessage) 
message;
                session.setAttribute(CLIENT_CODE, myClient.getClient_codes());
                session.setAttribute(CLIENT_NAME, myClient.getClient_name());
                session.setAttribute(USERNAME, myClient.getUsername());
                if (true) {
                    sendStdResponseMessage(session, true, null);
                    session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
                }
                else {
                    LOGGER.info("Unable to authenticate user: " + 
session.getAttribute(USERNAME));
                    session.close(true);
                }
            }
            else {
                loggerHelper(session, method, message);
                sendStdResponseMessage(session, false, "Expected 
ClientAnnounceMessage, got: " + message.getClass().toString());
            }
        }
        else if (session.getAttribute(SESSION_STATE) == SessionState.WAIT_CMD) {
            if (message instanceof RequestDirectoryTreeMessage) {
                loggerHelper(session, method, message);
                session.write(session.getAttribute(FILEDIR_TREE));
            }
            else if (message instanceof DownloadFileMessage) {
                loggerHelper(session, method, message);

                session.setAttribute(SESSION_STATE, SessionState.SEND_FILE);
                DownloadFileMessage filerequest = (DownloadFileMessage) message;
                FileDirTree localdt = (FileDirTree) 
session.getAttribute(FILEDIR_TREE);
                if (localdt.getLocalFile(filerequest.getRequest()) != null) {
                    FileInputStream fsend = null;
                    try {
                        fsend = new 
FileInputStream(localdt.getLocalFile(filerequest.getRequest()));
                        session.setAttribute(FILE_STREAM, fsend);
//                        WriteFuture future = session.write(fsend);
//                        future.awaitUninterruptibly();
                        session.write(fsend);
                    } catch (FileNotFoundException e) {
                        sendStdResponseMessage(session, false, "Unable to send 
file: " + filerequest.getRequest().getName()
                                               + "\nException: " + 
e.getMessage());
                        session.setAttribute(SESSION_STATE, 
SessionState.WAIT_CMD);
                    }
                }
                else {
                    sendStdResponseMessage(session, false, "File not found in 
local tree");
                    session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
                }
            }
            else if (message instanceof UploadFileMessage) {
                loggerHelper(session, method, message);

                UploadFileMessage uploadfile = (UploadFileMessage) message;
                if (uploadfile.getRequest() != null && uploadfile.getRequest() 
instanceof File) {
                    session.setAttribute(CURRENT_FILE, uploadfile.getRequest());
                    session.setAttribute(SESSION_STATE, SessionState.RECV_FILE);
                    sendStdResponseMessage(session, true, null);
                else {
                    sendStdResponseMessage(session, false, "Invalid file upload 
message: " + uploadfile.toString());
                }
            }
            else if (message instanceof CreateDirectoryMessage) {
                loggerHelper(session, method, message);

                CreateDirectoryMessage createdir = (CreateDirectoryMessage) 
message;
                if (createdir.getRequest() != null && createdir.getRequest() 
instanceof File) {
                    session.setAttribute(CURRENT_FILE, createdir.getRequest());
                    session.setAttribute(SESSION_STATE, SessionState.BUSY);
                    File f = (File) createdir.getRequest();
                    if (f.exists())
                        sendStdResponseMessage(session, false, "File already 
exists: " + createdir.toString());
                    else if (!f.mkdirs()) {
                        sendStdResponseMessage(session, false, "Unable to 
create directory: " + createdir.toString());
                    }
                    else {
                        sendStdResponseMessage(session, true, "Created 
directory: " + createdir.toString());
                    }
                    session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
                }
                else {
                    sendStdResponseMessage(session, false, "Invalid request 
payload: " + createdir.toString());
                    session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
               }
            }
            else if (message instanceof DeleteFileMessage) {
                loggerHelper(session, method, message);

                DeleteFileMessage deletefile = (DeleteFileMessage) message;
                if (deletefile.getRequest() != null && deletefile.getRequest() 
instanceof File) {
                    session.setAttribute(CURRENT_FILE, deletefile.getRequest());
                    session.setAttribute(SESSION_STATE, SessionState.BUSY);
                    File f = (File) deletefile.getRequest();
                    if (!f.exists())
                        sendStdResponseMessage(session, false, "File doesn't 
exist: " + deletefile.toString());
                    else if (f.isDirectory() && f.listFiles() != null) {
                        sendStdResponseMessage(session, false, "Directory is 
not empty: " + deletefile.toString());
                    }
                    else if (f.delete()) {
                        sendStdResponseMessage(session, true, "Deleted: " + 
deletefile.toString());
                    }
                    else
                        sendStdResponseMessage(session, false, "Could not 
delete: " + deletefile.toString());
                    session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
                }
                else {
                    sendStdResponseMessage(session, false, "Invalid request 
payload: " + deletefile.toString());
                    session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
               }
            }
            else {
                loggerHelper(session, method, message);

                sendStdResponseMessage(session, false, "Command not recognized. 
CMD: " + message.getClass().toString());
            }
        } else if (session.getAttribute(SESSION_STATE) == 
SessionState.RECV_FILE) {
            File myfile = (File)session.getAttribute(CURRENT_FILE);
            if (message instanceof IoBuffer) {
                loggerHelper(session, method, message);
                IoBuffer fbuff = (IoBuffer)message;
                InputStream frecv = fbuff.asInputStream();
                FileOutputStream fout;
                try {
                    fout = new FileOutputStream(new 
File("/users/admin/filetest-srv.txt"), false);
                    int i = 0;
                    while ((i = frecv.read()) != -1)
                        fout.write(i);
                    frecv.close();
                    fout.close();
                    session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
                } catch (FileNotFoundException e) {
                    LOGGER.error("Couldn't write to file: " + myfile.getName() 
+ " Exception: " + e.getMessage());
                    session.removeAttribute(CURRENT_FILE);
                    session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);

                } catch (IOException e) {
                    LOGGER.error("Couldn't write to file: " + myfile.getName() 
+ " Exception: " + e.getMessage());
                    session.removeAttribute(CURRENT_FILE);
                    session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);

                }
            } else {
                loggerHelper(session, method, message);

                LOGGER.error("Didn't receive the actual file or response 
message " + myfile.getName() + " Message: " +
                             message.getClass().toString());
                session.removeAttribute(CURRENT_FILE);
                session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
            }
        }
        //This should never happen
        else {
            loggerHelper(session, method, message);

            sendStdResponseMessage(session, false, "Invalid session state: " + 
session.getAttribute(SESSION_STATE).toString());
        }
    }

    @Override
    public void messageSent(IoSession session, Object message) {

        String method = "messageSent";

        //Streams are not closed by the file filter automatically, so we must 
close them here
        if(session.getAttribute(SESSION_STATE) == SessionState.SEND_FILE && 
session.containsAttribute(FILE_STREAM)) {
            loggerHelper(session, method, message);
            FileInputStream fsend;
            fsend = (FileInputStream) session.getAttribute(FILE_STREAM);
            try {
                fsend.close();
                LOGGER.info("Closed Stream");
            } catch (IOException e) {
                LOGGER.error("Unable to close stream: " + e.getMessage());
            }
            session.removeAttribute(FILE_STREAM);
            session.setAttribute(SESSION_STATE, SessionState.WAIT_CMD);
        }
        else {
            loggerHelper(session, method, message);
        }
    }

    //Generate a nonce
    private long generateNonce() {
        SecureRandom rnd = new SecureRandom();
        return rnd.nextLong();
    }

    //Have we been idel beyond our timeout? Then disconnect
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) {
        LOGGER.info("Session idle... disconnecting.");
        session.close(true);
    }

    //Log any unrecoverable exceptions and close the connection
    @Override
    public void exceptionCaught(IoSession session, Throwable cause) {
        //If this is a NotSerializableException we should continue and hope for 
the best ;-)
        if (cause.getClass() == ProtocolDecoderException.class || 
cause.getClass() == ProtocolEncoderException.class)
            LOGGER.warn("Exception: " + cause.getClass().toString() + " State: 
" + session.getAttribute(SESSION_STATE));
        else {
            //Let's close the connection since we can't recover
//          LOGGER.error("Unrecoverable Exception on Session: " + 
session.getAttribute(CLIENT_NAME), cause);
            LOGGER.error("Unrecoverable Exception on Session: " + 
session.getAttribute(CLIENT_NAME) + " Exception: " + 
cause.getClass().toString());
            session.close(true);
        }
    }

    //Helper method to send standard server response messages
    private void sendStdResponseMessage(IoSession session, Boolean flag, String 
msg) {
        if (session != null) {
            ServerStdResponseMessage ssrm;
            ssrm = new ServerStdResponseMessage();
            ssrm.setNonce((Long)session.getAttribute(SESSION_NONCE) + 1);
            if (flag)
                ssrm.setResult((short) 1);
            else
                ssrm.setResult((short) 0);
            session.write(ssrm);
        }
        if (msg != null)
            LOGGER.error(msg);
    }

    private void loggerHelper(IoSession session, String method, Object message) 
{
        session.setAttribute(MSG_COUNTER, 
(Long)session.getAttribute(MSG_COUNTER) + 1);
                LOGGER.info("MSG Counter: " + session.getAttribute(MSG_COUNTER) 
+ " Method: " + method + " State:  " + session.getAttribute(SESSION_STATE) + " 
Message Class: " + message.getClass().toString());
    }
}

--------------------------------------------------
CodecFactory.java
public class CodecFactory extends ObjectSerializationCodecFactory {

    private final CodecEncoder encoder;
    private final CodecDecoder decoder;

    public CodecFactory() {
        this(Thread.currentThread().getContextClassLoader());
    }

    public CodecFactory(ClassLoader classLoader) {
        encoder = new CodecEncoder();
        decoder = new CodecDecoder(classLoader);
    }

    @Override
    public ProtocolEncoder getEncoder(IoSession session) {
        return encoder;
    }

    @Override
    public ProtocolDecoder getDecoder(IoSession session) {
        return decoder;
    }


    @Override
  public int getEncoderMaxObjectSize() {
        return encoder.getMaxObjectSize();
    }


    @Override
    public void setEncoderMaxObjectSize(int maxObjectSize) {
        encoder.setMaxObjectSize(maxObjectSize);
    }

    @Override
    public int getDecoderMaxObjectSize() {
        return decoder.getMaxObjectSize();
    }

    @Override
    public void setDecoderMaxObjectSize(int maxObjectSize) {
        decoder.setMaxObjectSize(maxObjectSize);
    }
}

-----------------------------------
CodecFilter.java
public class CodecFilter extends ProtocolCodecFilter {
    public CodecFilter(Class<? extends ProtocolEncoder> class1, Class<? extends 
ProtocolDecoder> class2) {
        super(class1, class2);
    }

    public CodecFilter(ProtocolEncoder protocolEncoder, ProtocolDecoder 
protocolDecoder) {
        super(protocolEncoder, protocolDecoder);
    }

    public CodecFilter(ProtocolCodecFactory protocolCodecFactory) {
        super(protocolCodecFactory);
    }

    @Override
    public void filterWrite(NextFilter nextFilter, IoSession session, 
WriteRequest writeRequest) throws Exception {
        if (writeRequest.getMessage() instanceof FileInputStream)
//        if (session.getAttribute("state") == SessionState.RECV_FILE || 
session.getAttribute("state") == SessionState.SEND_FILE)
            nextFilter.filterWrite(session, writeRequest);
        else
            super.filterWrite(nextFilter, session, writeRequest);
    }
}

---------------------------------
CodecDecoder.java

public class CodecDecoder extends ObjectSerializationDecoder {
    private final ClassLoader classLoader;

    private int maxObjectSize = 1048576;

    public CodecDecoder() {
        this(Thread.currentThread().getContextClassLoader());
    }

    public CodecDecoder(ClassLoader classLoader) {
        if (classLoader == null) {
            throw new IllegalArgumentException("classloader");
        }
        this.classLoader = classLoader;
    }

    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, 
ProtocolDecoderOutput out) throws Exception {
        if (session.getAttribute("state") == SessionState.RECV_FILE || 
session.getAttribute("state") == SessionState.SEND_FILE)
            return false;
        else {
            return super.doDecode(session, in, out);
        }
    }
}


----------------------------------
CodecEncoder.java

public class CodecEncoder extends ObjectSerializationEncoder {
    private int maxObjectSize = Integer.MAX_VALUE;

    public CodecEncoder() {
        super();
    }

    @Override
    public void encode(IoSession session, Object message, ProtocolEncoderOutput 
out) throws Exception {
       if (session.getAttribute("state") == SessionState.RECV_FILE || 
session.getAttribute("state") == SessionState.SEND_FILE)
            return;
        else
            super.encode(session, message, out);
        }
}









Reply via email to