I've put together some mock ups of a few different codec strategies both to
compare from an API/usability perspective and to get a rough idea of some
of the performance implications of the different choices. Please see the
attached code for the full details. I'll summarize the different strategies
below.

The SimpleEncoder is pretty straightforward. The only real point here is to
use basic types to represent values and thereby minimize the amount of
intermediate memory and CPU required in order to use the codec.
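
To make the "basic types" point concrete, here is a minimal standalone sketch
(not the attached code itself). The class and method names are made up for
illustration; the type codes 0x71 and 0x98 are the ones used in the mock-up.
Values go in as int and long arguments, so the caller never has to build an
intermediate object.

```java
import java.util.UUID;

final class PrimitiveEncodeSketch {
    static final int INT = 0x71;        // type codes as used in the mock-up
    static final int UUID_CODE = 0x98;

    // Writes a type code plus a 4-byte big-endian int; returns the new offset.
    static int writeInt(byte[] out, int offset, int value) {
        out[offset++] = (byte) INT;
        out[offset++] = (byte) (value >>> 24);
        out[offset++] = (byte) (value >>> 16);
        out[offset++] = (byte) (value >>> 8);
        out[offset++] = (byte) value;
        return offset;
    }

    // Writes a type code plus two 8-byte big-endian longs; returns the new offset.
    static int writeUUID(byte[] out, int offset, long hi, long lo) {
        out[offset++] = (byte) UUID_CODE;
        for (int shift = 56; shift >= 0; shift -= 8) {
            out[offset++] = (byte) (hi >>> shift);
        }
        for (int shift = 56; shift >= 0; shift -= 8) {
            out[offset++] = (byte) (lo >>> shift);
        }
        return offset;
    }

    public static void main(String[] args) {
        byte[] out = new byte[64];
        UUID id = UUID.randomUUID();
        int end = writeInt(out, 0, 42);
        end = writeUUID(out, end, id.getMostSignificantBits(),
                        id.getLeastSignificantBits());
        System.out.println("encoded " + end + " bytes"); // 5 + 17 = 22
    }
}
```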

The DispatchingDecoder works similarly to a SAX-style parser. It basically
iterates over the encoded content and dispatches values to a handler.
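
As a rough illustration of the push style, here is a self-contained sketch of
the same idea. It is a simplified variant, not the attached class: the
SummingHandler and the helper names are hypothetical, and unlike the mock-up
it throws on an unknown type code.

```java
final class DispatchSketch {
    static final int INT = 0x71;
    static final int UUID_CODE = 0x98;

    interface Handler {
        void onInt(int value);
        void onUUID(long hi, long lo);
    }

    static int readInt(byte[] b, int o) {
        return (b[o] & 0xFF) << 24 | (b[o + 1] & 0xFF) << 16
             | (b[o + 2] & 0xFF) << 8 | (b[o + 3] & 0xFF);
    }

    static long readLong(byte[] b, int o) {
        return ((long) readInt(b, o) << 32) | (readInt(b, o + 4) & 0xFFFFFFFFL);
    }

    // The decoder owns the loop and pushes each decoded value to the handler.
    static void decode(byte[] b, int offset, int size, Handler h) {
        int limit = offset + size;
        while (offset < limit) {
            int code = b[offset++] & 0xFF;
            switch (code) {
            case INT:
                h.onInt(readInt(b, offset));
                offset += 4;
                break;
            case UUID_CODE:
                h.onUUID(readLong(b, offset), readLong(b, offset + 8));
                offset += 16;
                break;
            default:
                throw new IllegalArgumentException("unknown type code: " + code);
            }
        }
    }

    // Hypothetical handler: sums the ints and counts the UUIDs it is given.
    static final class SummingHandler implements Handler {
        long sum;
        int uuids;
        public void onInt(int value) { sum += value; }
        public void onUUID(long hi, long lo) { uuids++; }
    }

    public static void main(String[] args) {
        byte[] data = { 0x71, 0, 0, 0, 5, 0x71, 0, 0, 0, 7 };
        SummingHandler h = new SummingHandler();
        decode(data, 0, data.length, h);
        System.out.println(h.sum); // prints 12
    }
}
```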

The StreamingDecoder is similar to the DispatchingDecoder, except that
instead of an internal "bytecode" loop calling out to a handler, it is
driven externally by calling into the decoder. This appears to be marginally
slower than the DispatchingDecoder in the particular scenario in the mock-up;
however, it may have some API benefits, e.g. conversions can be done on
demand and it is possible to skip over uninteresting data rather than
parsing it.
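
To show what skipping could look like, here is a small pull-style sketch. It
is a variation on the attached StreamingDecoder, not a copy: in this version
next() records the payload width and steps over anything the caller did not
read, whereas in the mock-up the getters advance the offset. The class name
and the width field are assumptions made for the example.

```java
final class PullSketch {
    static final int NONE = -1;
    static final int INT = 0x71;
    static final int UUID_CODE = 0x98;

    private final byte[] bytes;
    private final int limit;
    private int offset;       // start of the current value's payload
    private int width;        // payload width of the current value
    private int code = NONE;

    PullSketch(byte[] bytes, int offset, int size) {
        this.bytes = bytes;
        this.offset = offset;
        this.limit = offset + size;
    }

    // Moves to the next value; the previous payload is skipped unparsed.
    int next() {
        offset += width;
        width = 0;
        if (offset >= limit) {
            code = NONE;
            return code;
        }
        code = bytes[offset++] & 0xFF;
        switch (code) {
        case INT:       width = 4;  break;
        case UUID_CODE: width = 16; break;
        default:
            throw new IllegalStateException("unknown type code: " + code);
        }
        return code;
    }

    // Converts on demand; nothing is parsed unless the caller asks for it.
    int getInt() {
        if (code != INT) {
            throw new IllegalStateException("current value is not an int");
        }
        return (bytes[offset] & 0xFF) << 24 | (bytes[offset + 1] & 0xFF) << 16
             | (bytes[offset + 2] & 0xFF) << 8 | (bytes[offset + 3] & 0xFF);
    }

    public static void main(String[] args) {
        byte[] data = new byte[22];
        data[0] = (byte) UUID_CODE;   // 16-byte payload left as zeros
        data[17] = (byte) INT;
        data[21] = 42;

        PullSketch in = new PullSketch(data, 0, data.length);
        int code;
        while ((code = in.next()) != NONE) {
            if (code == INT) {
                System.out.println(in.getInt()); // prints 42
            }
            // The UUID is never read, so its bytes are simply stepped over.
        }
    }
}
```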

The mock-up also includes the same data being encoded/decoded using the
existing codec (with Clebert's patch).

Fair warning, the data I chose to encode/decode is completely arbitrary and
not intended to be representative at all. That said, the numbers I'm
getting suggest to me that we can do a whole lot better than the current
codec if we start with something simple and keep it that way. Here is the
output I'm getting for a run with a hundred million iterations:

  simple encode: 4416 millis
  dispatching decode: 3049 millis
  streaming decode: 3243 millis
  existing encode: 9515 millis
  existing decode: 13931 millis

Another factor to consider is the difficulty in quantifying the impact of
generating lots of garbage. In a small benchmark like this there isn't a
lot of memory pressure, so extra garbage doesn't have a lot of impact,
however in a real application that would translate into increased GC cycles
and so might be more of a factor. What I can say from watching memory usage
under the profiler is that at any given point there are typically hundreds
of megs worth of garbage Integer and UUID instances lying around when the
existing codec is running. None of the alternative strategies I've included
generate any garbage.
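
For anyone wondering where the garbage comes from, the sketch below contrasts
an object-based signature with a primitive-based one. The checksum methods are
hypothetical and exist only for the comparison. Note that a JIT may be able to
eliminate some short-lived allocations, so this only shows the source-level
difference, not what the profiler will report.

```java
import java.util.UUID;

final class GarbageSketch {
    // Object-based signature: the caller has to have a UUID instance in hand.
    static long checksum(UUID id) {
        return id.getMostSignificantBits() ^ id.getLeastSignificantBits();
    }

    // Primitive-based signature: the same information, no object required.
    static long checksum(long hi, long lo) {
        return hi ^ lo;
    }

    public static void main(String[] args) {
        UUID a = new UUID(1L, 2L);
        UUID b = new UUID(1L, 2L);
        System.out.println(a.equals(b)); // true: same value
        System.out.println(a == b);      // false: two separate heap objects
        System.out.println(checksum(a) == checksum(1L, 2L)); // true
    }
}
```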

--Rafael
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org;

import java.util.UUID;
import org.apache.qpid.proton.codec.*;
import java.nio.*;

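/**
 * Codec: micro-benchmark comparing the mock-up codec strategies
 * (SimpleEncoder, DispatchingDecoder, StreamingDecoder) against the
 * existing proton codec.
 *
 * Usage: Codec [iterations] [all|dispatching|streaming|existing]
 */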

public class Codec
{

    public static final void main(String[] args) {
        int loop = 10*1024*1024;
        if (args.length > 0) {
            loop = Integer.parseInt(args[0]);
        }

        String test = "all";
        if (args.length > 1) {
            test = args[1];
        }

        boolean runDispatching =
            test.equals("all") || test.equals("dispatching");
        boolean runStreaming = test.equals("all") || test.equals("streaming");
        boolean runExisting =
            test.equals("all") || test.equals("existing");

        byte[] bytes = new byte[1024];
        ByteBuffer buf = ByteBuffer.wrap(bytes);

        long start, end;

        if (runDispatching || runStreaming) {
            start = System.currentTimeMillis();
            int size = simpleEncode(bytes, loop);
            end = System.currentTimeMillis();
            time("simple encode", start, end);

            if (runDispatching) {
                start = System.currentTimeMillis();
                dispatchingDecode(bytes, size, loop);
                end = System.currentTimeMillis();
                time("dispatching decode", start, end);
            }

            if (runStreaming) {
                start = System.currentTimeMillis();
                streamingDecode(bytes, size, loop);
                end = System.currentTimeMillis();
                time("streaming decode", start, end);
            }
        }

        if (runExisting) {
            start = System.currentTimeMillis();
            DecoderImpl dec = existingEncode(buf, loop);
            end = System.currentTimeMillis();
            time("existing encode", start, end);

            buf.flip();

            start = System.currentTimeMillis();
            existingDecode(dec, buf, loop);
            end = System.currentTimeMillis();
            time("existing decode", start, end);
        }
    }

    private static final void time(String message, long start, long end)
    {
        System.out.println(message + ": " + (end - start) + " millis");
    }

    private static final int simpleEncode(byte[] bytes, int loop)
    {
        SimpleEncoder enc = new SimpleEncoder(bytes, 0);

        UUID uuid = UUID.randomUUID();
        long hi = uuid.getMostSignificantBits();
        long lo = uuid.getLeastSignificantBits();

        for (int i = 0; i < loop; i++) {
            enc.rewind();
            for (int j = 0; j < 10; j++) {
                enc.writeInt(i + j);
            }
            enc.writeUUID(hi, lo);
        }

        return enc.size();
    }

    private static final void dispatchingDecode(byte[] bytes, int size, int loop)
    {
        Handler h = new Handler() {
            public void onInt(int value) {}
            public void onUUID(long hi, long lo) {}
            public void onString(byte[] utf8, int offset, int size) {}
            public void onBinary(byte[] bytes, int offset, int size) {}
        };
        DispatchingDecoder disp = new DispatchingDecoder(h);

        for (int i = 0; i < loop; i++) {
            disp.decode(bytes, 0, size);
        }
    }

    private static final void streamingDecode(byte[] bytes, int size, int loop)
    {
        StreamingDecoder stream = new StreamingDecoder();

        for (int i = 0; i < loop; i++) {
            stream.init(bytes, 0, size);
            int code;
            while ((code = stream.next()) != NONE) {
                switch (code) {
                case INT:
                    stream.getInt();
                    break;
                case _UUID:
                    stream.getHi();
                    stream.getLo();
                    break;
                }
            }
        }
    }

    private static final DecoderImpl existingEncode(ByteBuffer buf, int loop)
    {
        WritableBuffer wbuf = new WritableBuffer.ByteBufferWrapper(buf);
        // XXX: why do I have to create a decoder in order to encode?
        DecoderImpl dec = new DecoderImpl();
        EncoderImpl enc = new EncoderImpl(dec);

        UUID uuid = UUID.randomUUID();
        long hi = uuid.getMostSignificantBits();
        long lo = uuid.getLeastSignificantBits();

        for (int i = 0; i < loop; i++) {
            buf.position(0);
            buf.limit(1024);
            for (int j = 0; j < 10; j++) {
                enc.writeInteger(wbuf, i + j);
            }
            enc.writeUUID(wbuf, new UUID(hi, lo));
        }

        return dec;
    }

    private static final void existingDecode(DecoderImpl dec, ByteBuffer buf, int loop)
    {
        // XXX: for some reason if I create a new decoder it NPEs,
        // apparently I have to use the one that was passed to the
        // encoder?
        ReadableBuffer rbuf = new ReadableBuffer.ByteBufferReader(buf);

        int pos = buf.position();

        for (int i = 0; i < loop; i++) {
            buf.position(pos);
            for (int j = 0; j < 10; j++) {
                dec.readInteger(rbuf);
            }
            dec.readUUID(rbuf);
        }
    }

    static interface Handler {

        void onInt(int value);

        void onUUID(long hi, long lo);

        void onString(byte[] utf8, int offset, int size);

        void onBinary(byte[] bytes, int offset, int size);

    }

    public static final int NONE = -1;
    public static final int INT = 0x71;
    public static final int _UUID = 0x98;
    public static final int STRING = 0xb1;
    public static final int BINARY = 0xb0;

    private static final int readCode(byte[] bytes, int offset)
    {
        return 0xFF & bytes[offset];
    }

    private static final int readF32(byte[] bytes, int offset)
    {
        int a = 0xFF & bytes[offset + 0];
        int b = 0xFF & bytes[offset + 1];
        int c = 0xFF & bytes[offset + 2];
        int d = 0xFF & bytes[offset + 3];
        int value = a << 24 | b << 16 | c << 8 | d;
        return value;
    }

    private static final long readF64(byte[] bytes, int offset)
    {
        long a = 0xFFFFFFFFL & readF32(bytes, offset);
        long b = 0xFFFFFFFFL & readF32(bytes, offset + 4);
        long value = a << 32 | b;
        return value;
    }

    private static final int writeF32(byte[] bytes, int offset, int value)
    {
        bytes[offset + 0] = (byte) (0xFF & (value >> 24));
        bytes[offset + 1] = (byte) (0xFF & (value >> 16));
        bytes[offset + 2] = (byte) (0xFF & (value >> 8));
        bytes[offset + 3] = (byte) (0xFF & (value));
        return 4;
    }

    private static final int writeF64(byte[] bytes, int offset, long value)
    {
        bytes[offset + 0] = (byte) (0xFF & (value >> 56));
        bytes[offset + 1] = (byte) (0xFF & (value >> 48));
        bytes[offset + 2] = (byte) (0xFF & (value >> 40));
        bytes[offset + 3] = (byte) (0xFF & (value >> 32));
        bytes[offset + 4] = (byte) (0xFF & (value >> 24));
        bytes[offset + 5] = (byte) (0xFF & (value >> 16));
        bytes[offset + 6] = (byte) (0xFF & (value >>  8));
        bytes[offset + 7] = (byte) (0xFF & (value      ));
        return 8;
    }

    static class DispatchingDecoder {

        private Handler handler;

        DispatchingDecoder(Handler handler) {
            this.handler = handler;
        }

        void decode(byte[] bytes, int offset, int size) {
            int limit = offset + size;
            while (offset < limit) {
                int code = readCode(bytes, offset++);
                switch (code) {
                case INT:
                    handler.onInt(readF32(bytes, offset));
                    offset += 4;
                    break;
                case _UUID:
                    long hi = readF64(bytes, offset);
                    long lo = readF64(bytes, offset + 8);
                    handler.onUUID(hi, lo);
                    offset += 16;
                    break;
                }
            }
        }
    }

    static class SimpleEncoder {

        private byte[] bytes;
        private int start;
        private int offset;

        SimpleEncoder(byte[] bytes, int start) {
            this.bytes = bytes;
            this.start = start;
            this.offset = start;
        }

        void rewind() {
            offset = start;
        }

        void writeInt(int value) {
            bytes[offset++] = (byte) INT;
            offset += writeF32(bytes, offset, value);
        }

        void writeUUID(long hi, long lo) {
            bytes[offset++] = (byte) _UUID;
            offset += writeF64(bytes, offset, hi);
            offset += writeF64(bytes, offset, lo);
        }

        int size() {
            return offset - start;
        }

    }

    static class StreamingDecoder {

        private byte[] bytes;
        private int start;
        private int offset;
        private int limit;

        private int code;
        private long hi;
        private long lo;

        void init(byte[] bytes, int offset, int size) {
            this.bytes = bytes;
            this.start = offset;
            this.offset = offset;
            this.limit = offset + size;
        }

        int offset() {
            return offset;
        }

        int next() {
            if (offset >= limit) {
                code = NONE;
            } else {
                code = readCode(bytes, offset++);
            }

            return code;
        }

        int getInt() {
            int result;
            switch (code) {
            case INT:
                result = readF32(bytes, offset);
                offset += 4;
                break;
            default:
                throw new RuntimeException("unconvertible");
            }
            return result;
        }

        // Decodes both halves of the UUID and advances past it;
        // call this before getLo().
        long getHi() {
            switch (code) {
            case _UUID:
                hi = readF64(bytes, offset);
                lo = readF64(bytes, offset + 8);
                offset += 16;
                break;
            default:
                throw new RuntimeException("unconvertible");
            }

            return hi;
        }

        long getLo() {
            return lo;
        }
    }

}
