It seems my attached test case is not displayed, here is it...
public class TestCompressor {
private static final InetSocketAddress LOCAL_ADDRESS = new
InetSocketAddress(5000);
private IoAcceptor acceptor;
private IoConnector connector;
private List<String> receivedMessages;
@Before
public void setUp() throws IOException {
connector = new NioSocketConnector();
connector.getFilterChain().addLast("compression", new CompressionFilter());
connector.getFilterChain().addLast("text", new ProtocolCodecFilter(new
TextLineCodecFactory()));
connector.setHandler(new IoHandlerAdapter());
acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast("compression", new CompressionFilter());
acceptor.getFilterChain().addLast("text", new ProtocolCodecFilter(new
TextLineCodecFactory()));
acceptor.setHandler(new IoHandlerAdapter() {
@Override
public void messageReceived(IoSession session, Object message) throws
Exception {
System.out.println(message);
receivedMessages.add((String) message);
}
});
acceptor.bind(LOCAL_ADDRESS);
receivedMessages= new ArrayList<String>();
}
@Test
public void test() throws Throwable {
List<String> sendMessages = new ArrayList<String>();
for (int i = 0; i < 3000; i++)
sendMessages.add("Message " + i);
ConnectFuture connectFuture = connector.connect(LOCAL_ADDRESS);
connectFuture.await();
final IoSession session = connectFuture.getSession();
for (String message : sendMessages)
session.write(message).await();
for (int i = 0; i < sendMessages.size(); i++)
assertEquals(sendMessages.get(i), receivedMessages.get(i));
session.close(false).await();
}
}
On Thu, Jul 21, 2011 at 8:40 AM, Chris S <[email protected]> wrote:
> Hi all,
>
> i am trying to write a IoFilter for Googles Snappy compressor /
> decompressor. I am using the Mina-LZip -Compressor Filter as a base an
> rewirited it for Snappy. The usage of Snappy is quite simpel
> (Snappy.compress(byte[]):byte[] / Snappy.uncompress(byte[]):byte[]) but
> Mina give me a headache ! I dont know what i am doing wrong, but i am
> loosing messages, even if i disable the compression (see the attached
> TestCompressor-class)
>
> This is what i do :
> Compression
>
> 1. compress the data
> 2. write the length of the compressed data as an int-value(4 byte)
> 3. write the data
>
> Decompression
>
> 1. check if i there is enough data for the length field (4byte)
> 2. read the length of the appended data
> 3. check if there is enough data to read the compressed data
> 4. read and decompress the compressed data
>
>
> Here is my Code
>
> public class CompressionFilter extends WriteRequestFilter {
> @Override
> public void messageReceived(NextFilter nextFilter, IoSession session,
> Object message) throws Exception {
> if (!(message instanceof IoBuffer)) {
> nextFilter.messageReceived(session, message);
> return;
> }
> IoBuffer inBuffer = (IoBuffer) message;
> boolean hasMoreData;
> do {
> hasMoreData = decode(inBuffer, nextFilter, session);
> }
> while (hasMoreData);
> }
> private boolean decode(IoBuffer inBuffer, NextFilter nextFilter, IoSession
> session) {
> int position = inBuffer.position();
> final int remaining = inBuffer.remaining();
> if (remaining < 4) {
> inBuffer.position(position);
> return false;
> }
> final int size = inBuffer.getInt();
> if (size > remaining - 4) {
> inBuffer.position(position);
> return false;
> }
> byte[] inBytes = new byte[size];
> inBuffer.get(inBytes);
> final byte[] uncompress = uncompress(inBytes);
> IoBuffer outBuffer = IoBuffer.wrap(uncompress);
> nextFilter.messageReceived(session, outBuffer);
> return true;
> }
> private byte[] uncompress(byte[] compressed) {
> // return Snappy.uncompress(compressed);
> return compressed;
> }
> @Override
> protected Object doFilterWrite(NextFilter nextFilter, IoSession session,
> WriteRequest writeRequest) throws IOException {
> IoBuffer inBuffer = (IoBuffer) writeRequest.getMessage();
> if (!inBuffer.hasRemaining()) {
> // Ignore empty buffers
> return null;
> } else {
> byte[] inBytes = new byte[inBuffer.remaining()];
> inBuffer.get(inBytes).flip();
> byte[] outBytes = compress(inBytes);
> IoBuffer outBuf = IoBuffer.allocate(outBytes.length + 4);
> outBuf.putInt(outBytes.length);
> outBuf.put(outBytes);
> outBuf.flip();
> return outBuf;
> }
> }
> private byte[] compress(byte[] uncompressed) {
> // return Snappy.compress(inBytes);
> return uncompressed;
> }
> }
>
>