Hi Clebert,

Sorry for taking so long to follow up with the benchmark. I've been
tweaking it and using it to do some memory and CPU profiling.

The good news is I was able to significantly reduce memory utilization by
making some simple changes to CollectorImpl. The benchmark triggers over 42
million events. As initially coded, CollectorImpl created two throwaway
objects for each event: the event itself and the linked list node. That
added up to somewhere north of 84 million throwaway objects over the course
of the benchmark. I changed CollectorImpl to use a simple chain instead of
java.util's linked list and also to pool/reuse popped events. The same
benchmark now creates only about 250 actual event objects in total to
handle the same 42 million events.
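
As a rough illustration of the chain-plus-pool idea described above, here is
a minimal sketch. It is not the actual CollectorImpl change: the class names
PooledEvent and PooledCollector, their fields, and the allocated counter are
all hypothetical and exist only to show the technique. Each event carries its
own next pointer, so no separate list node is needed, and popped events go
onto a free list for reuse.

```java
// Hypothetical stand-in for an engine event; not Proton's actual Event class.
final class PooledEvent {
    String type;        // illustrative payload
    Object context;     // illustrative payload
    PooledEvent next;   // intrusive link: the event is its own list node
}

// Hypothetical collector: a singly linked chain of pending events plus a
// free list of popped events that are reused instead of reallocated.
final class PooledCollector {
    private PooledEvent head;  // oldest pending event
    private PooledEvent tail;  // newest pending event
    private PooledEvent free;  // popped events awaiting reuse
    int allocated;             // number of event objects ever created

    PooledEvent put(String type, Object context) {
        PooledEvent ev;
        if (free != null) {
            // Reuse a previously popped event.
            ev = free;
            free = ev.next;
        } else {
            ev = new PooledEvent();
            allocated++;
        }
        ev.type = type;
        ev.context = context;
        ev.next = null;
        if (tail == null) {
            head = ev;
        } else {
            tail.next = ev;
        }
        tail = ev;
        return ev;
    }

    PooledEvent peek() {
        return head;
    }

    void pop() {
        PooledEvent ev = head;
        if (ev == null) {
            return;
        }
        head = ev.next;
        if (head == null) {
            tail = null;
        }
        // Clear references so pooled events do not pin other objects.
        ev.type = null;
        ev.context = null;
        // Push the popped event onto the free list.
        ev.next = free;
        free = ev;
    }
}
```

In this sketch the number of event objects created is bounded by the maximum
number of events pending at any one time, not by the total number of events
processed. The trade-off is that a caller must not hold on to an event after
pop(), since the same object will be handed out again by a later put().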

While this reduces memory pressure a lot, surprisingly enough, the
event-related objects were not the biggest source of garbage. At the top is
java.nio.HeapByteBuffer (I haven't looked into this, but I suspect we're
throwing away some of these that we could be reusing), and the second
biggest source of garbage is org.apache.qpid.proton.amqp.UnsignedInteger.
I find the latter a bit more concerning, as its use is fairly intertwined
with the design of the current codec and is therefore likely less
straightforward to address.

On the CPU profiling front I've noticed a couple of interesting things.
First off, it does appear that much of the processing time is spent in the
codec, roughly 63%, but the cost is not symmetrically divided between
encode and decode. Encoding accounts for about 46% of the total, and
decoding about 17%. I think this may be why it's hard to measure the effect
of your patch on this benchmark. The decoding cost just isn't all that high
compared to encoding. I did the same profiling again with your patch
applied and decoding dropped to about 10% of the total while encoding
increased to about 50% of the total.

Digging into the encoding side a bit, it appears that a significant amount
of time is being spent calculating the encoded size of a value prior to
writing its encoded representation to the wire. One of the optimizations
I've had success with in the past (both on previous Java codecs and in the
C codec) is to avoid calculating the size up front and instead simply
reserve the necessary space for it and fill it in after the encoded
representation has been written. In the past this has close to doubled the
performance of encode since calculating the encoded size is often as
expensive as simply doing the encoding. Unfortunately, I'm guessing this
kind of thing would probably require a major rework of the codec.
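
To make the reserve-and-backfill idea concrete, here is a minimal sketch
using java.nio.ByteBuffer. It does not use the Proton codec or the real AMQP
encodings: the BackpatchSketch class, the encodeList method, and the two tag
bytes are hypothetical, and the wire format is a toy one chosen so that the
encoded size depends on the values. The point is only that the size field is
written as a placeholder, the body is encoded once, and the placeholder is
then overwritten with an absolute put.

```java
import java.nio.ByteBuffer;

// Hypothetical toy encoder; the tags and layout are not AMQP format codes.
final class BackpatchSketch {
    static final byte TAG_SMALL = 0x01;  // value follows in 1 byte
    static final byte TAG_WIDE = 0x02;   // value follows in 4 bytes

    // Writes a 4-byte size prefix followed by the encoded values, without
    // walking the values first to compute the size.
    static void encodeList(ByteBuffer buf, int[] values) {
        int sizePos = buf.position();
        buf.putInt(0);                       // reserve space for the size
        int bodyStart = buf.position();
        for (int v : values) {
            if (v >= 0 && v <= 0xFF) {
                buf.put(TAG_SMALL).put((byte) v);
            } else {
                buf.put(TAG_WIDE).putInt(v);
            }
        }
        // Absolute put: fills in the size without moving the position.
        buf.putInt(sizePos, buf.position() - bodyStart);
    }
}
```

For example, encoding the values {1, 300, 7} into an empty buffer writes a
body of 2 + 5 + 2 = 9 bytes, so the size prefix ends up holding 9 and the
buffer position ends at 13. A real codec would also have to cope with size
fields whose own width depends on the size, which this sketch sidesteps by
always reserving 4 bytes.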

To summarize, I think there are really two design-related issues we will
need to address in order to achieve optimum performance. On the memory
front, I think the fact that every described type is rendered into a tree
of generic objects on both decode and encode is going to be problematic. The
strategy you've taken in your patch to special case certain frames and
eliminate the intermediate list objects helps with this, but I think we
could do a whole lot better if we were to adopt a design that did not
require any intermediate objects at all. On the CPU front, I think we'll
get the biggest bang for our buck if we look into a design that doesn't
require calculating the size up front.

I have some ideas in mind for a new design that I hope will address both of
these issues. I'm going to write them up in a separate post.

Regarding your patch, I'm happy to apply it, but I suspect that much of the
current codec layer would need to be modified and/or replaced to address
the above findings. Let me know how you would like to proceed.

--Rafael

On Fri, May 2, 2014 at 10:45 AM, Clebert Suconic <csuco...@redhat.com> wrote:

> These should all be cleared now.
>
>
> My github branch and PR are up to date now:
>
> https://github.com/apache/qpid-proton/pull/1
>
>
> .... And isn't git beautiful. It's already rebased with Rafi's last
> commit!
>
>
>
> On May 1, 2014, at 5:32 PM, Clebert Suconic <csuco...@redhat.com> wrote:
>
> > I will do some cleanup on this .. I already fixed the headers here on my
> > copy and I will do some cleanup on imports that weren't supposed to be done.
> > On May 1, 2014, at 5:11 PM, Robbie Gemmell <robbie.gemm...@gmail.com>
> > wrote:
> >
> >> As no mail arrived here or qpid-dev, and none seems to have arrived at
> >> what used to be the default location (infra-dev) either, I had a quick
> >> look and it seems like they might have changed the process slightly and
> >> we will need to ask for the mails to be enabled at all:
> >>
> >> https://blogs.apache.org/infra/entry/improved_integration_between_apache_and
> >>
> >> I particularly like the mention of the new comment syncing between our
> >> mailing list and the Pull Requests.
> >>
> >> Regarding closing the pull requests, it seems like something along the
> >> lines of "This closes #<request number> at GitHub" added to the end of
> >> the svn commit message should do the trick:
> >> https://help.github.com/articles/closing-issues-via-commit-messages
> >>
> >> I haven't had a chance to really look at the actual code change but when
> >> I was quickly scrolling down the PR, in addition to the licence headers
> >> on the new files that Rafi already mentioned (which I spotted due to the
> >> Copyright notices we wouldn't typically have) I noticed Encoder.java
> >> having its existing licence header corrupted a little by some wayward
> >> code.
> >>
> >> Robbie
> >> I just submitted it as a git PR:
> >>
> >> https://github.com/apache/qpid-proton/pull/1
> >>
> >>
> >>
> >> On Apr 30, 2014, at 10:47 AM, Robbie Gemmell <robbie.gemm...@gmail.com>
> >> wrote:
> >>
> >>> I think anyone can sign up for ReviewBoard themselves. It certainly
> >>> didn't used to be linked to the ASF LDAP in the past, presumably for
> >>> that reason.
> >>>
> >>> It's probably also worth noting you can initiate pull requests against
> >>> the github mirrors. If it hasn't already been done for the proton
> >>> mirror, we can have the emails that would generate be directed to this
> >>> list (e.g.
> >>> http://mail-archives.apache.org/mod_mbox/qpid-dev/201401.mbox/%3c20140130180355.3cf9e916...@tyr.zones.apache.org%3E
> >>> ).
> >>> We obviously can't merge the pull request via github, but you can use
> >>> the reviewing tools etc and the resultant patch can be downloaded or
> >>> attached to a JIRA and then applied in the usual fashion (I believe
> >>> there is a commit message syntax that can be used to trigger closing
> >>> the pull request).
> >>>
> >>> Robbie
> >>>
> >>> On 30 April 2014 15:22, Rafael Schloming <r...@alum.mit.edu> wrote:
> >>>
> >>>> On Wed, Apr 30, 2014 at 8:35 AM, Clebert Suconic <csuco...@redhat.com>
> >>>> wrote:
> >>>>
> >>>>> @Rafi: I see there is a patch review process within Apache (based on
> >>>>> your other thread on Java8)
> >>>>>
> >>>>> Should we make this through the patch process at some point?
> >>>>>
> >>>>
> >>>> I'm fine looking at it on your git branch, but if you'd like to play
> >>>> with the review tool then feel free.  Just let me know if you need an
> >>>> account and I will try to remember how to set one up (or who to bug to
> >>>> get you one). ;-)
> >>>>
> >>>> --Rafael
> >>>>
> >
>
>
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

package org;

import org.apache.qpid.proton.engine.*;
import static java.lang.Math.*;
import java.nio.*;
import java.util.*;


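/**
 * Main
 *
 * Benchmark that connects two in-memory transports to each other and
 * sends settled, empty messages from a sender to a receiver, driving
 * everything from events popped off a shared Collector.
 */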

public class Main
{

    public static final void main(String[] args)
    {
        Collector c = Collector.Factory.create();

        Connection c1 = Connection.Factory.create();
        Connection c2 = Connection.Factory.create();
        Transport t1 = Transport.Factory.create();
        Transport t2 = Transport.Factory.create();
        t1.bind(c1);
        t2.bind(c2);

        c1.collect(c);
        c2.collect(c);
        c1.open();
        c2.open();

        Session ssn1 = c1.session();
        ssn1.open();
        Sender snd = ssn1.sender("sender");
        snd.open();


        t1.setContext(t2);
        t2.setContext(t1);

        int total = 10*1024*1024;
        PingPong pp = new PingPong(total);
        pp.dispatchAll(c);

        if (pp.rcvd != total || pp.sent != total) {
            throw new RuntimeException("bad count");
        }
    }

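    // Moves pending output bytes from one transport into the other's input.
    // Returns true if any bytes were transferred.
    private static final boolean unidirPump(Transport from, Transport to)
    {
        int capacity = to.capacity();
        int pending = from.pending();

        if (capacity > 0 && pending > 0) {
            ByteBuffer tail = to.tail();
            ByteBuffer head = from.head();
            // put() copies everything remaining in head, so this relies on
            // the pending bytes fitting within the destination's capacity.
            tail.put(head);
            try {
                to.process();
            } finally {
                // Release the transferred bytes from the source transport.
                int xfr = min(pending, capacity);
                from.pop(xfr);
            }

            return true;
        } else {
            return false;
        }
    }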

    private static final void pump(Transport t1, Transport t2)
    {
        while (unidirPump(t1, t2) || unidirPump(t2, t1));
    }

    static class PingPong extends EventDispatcher {

        private static final int SIZE = 0;
        private static final byte[] msg = new byte[SIZE];

        private int total;
        private int batch = 64;
        private byte[] buf = new byte[SIZE];
        int sent = 0;
        int rcvd = 0;

        PingPong(int total) {
            this.total = total;
        }

        void onLocalState(Connection connection) {}
        void onLocalState(Session session) {}
        void onLocalState(Link link) {}

        void onRemoteState(Connection conn) {}

        void onRemoteState(Session ssn) {
            if (ssn.getRemoteState() == EndpointState.ACTIVE) {
                ssn.open();
            }
        }

        void onRemoteState(Link link) {
            if (link.getRemoteState() == EndpointState.ACTIVE) {
                link.open();

                if (link instanceof Receiver) {
                    Receiver rcv = (Receiver) link;
                    rcv.flow(batch);
                }
            }
        }

        void onFlow(Link link) {
            if (link instanceof Sender) {
                Sender snd = (Sender) link;
                while (snd.getCredit() > 0) {
                    Delivery dlv = snd.delivery(new byte[0]);
                    snd.send(msg, 0, msg.length);
                    dlv.settle();
                    sent++;
                }
            }
        }

        void onDelivery(Delivery dlv) {
            Link link = dlv.getLink();
            if (link instanceof Receiver) {
                Receiver rcv = (Receiver) link;
                rcv.recv(buf, 0, buf.length);
                rcv.current().settle();
                if (!Arrays.equals(msg, buf)) {
                    throw new RuntimeException("garbled message");
                }
                rcvd++;
                if ((rcvd % (100*1024)) == 0) {
                    System.out.println("rcvd=" + rcvd);
                }
                if (rcvd < total && rcv.getCredit() == 0) {
                    rcv.flow(batch);
                }
            }
        }

        void onTransport(Transport t1) {
            Transport t2 = (Transport) t1.getContext();
            pump(t1, t2);
        }

    }

    // XXX: something like this should be added to the engine API
    static abstract class EventDispatcher {

        abstract void onRemoteState(Connection connection);
        abstract void onRemoteState(Session session);
        abstract void onRemoteState(Link link);

        abstract void onLocalState(Connection connection);
        abstract void onLocalState(Session session);
        abstract void onLocalState(Link link);

        abstract void onFlow(Link link);
        abstract void onDelivery(Delivery delivery);
        abstract void onTransport(Transport transport);

        void dispatch(Event event) {
            switch (event.getType()) {
            case CONNECTION_REMOTE_STATE:
                onRemoteState(event.getConnection());
                break;
            case CONNECTION_LOCAL_STATE:
                onLocalState(event.getConnection());
                break;
            case SESSION_REMOTE_STATE:
                onRemoteState(event.getSession());
                break;
            case SESSION_LOCAL_STATE:
                onLocalState(event.getSession());
                break;
            case LINK_REMOTE_STATE:
                onRemoteState(event.getLink());
                break;
            case LINK_LOCAL_STATE:
                onLocalState(event.getLink());
                break;
            case LINK_FLOW:
                onFlow(event.getLink());
                break;
            case TRANSPORT:
                onTransport(event.getTransport());
                break;
            case DELIVERY:
                onDelivery(event.getDelivery());
                break;
            }
        }

        void dispatchAll(Collector collector) {
            Event ev;
            while ((ev = collector.peek()) != null) {
                dispatch(ev);
                collector.pop();
            }
        }

    }

}
