Hi Clebert,

Sorry for taking so long to follow up with the benchmark. I've been tweaking it and using it to do some memory and CPU profiling.
The good news is I was able to significantly reduce memory utilization by making some simple changes to CollectorImpl. The benchmark triggers over 42 million events. As initially coded, CollectorImpl would create two throwaway objects for each event: the event itself and the linked-list node. This ended up being somewhere north of 84 million throwaway objects over the course of the benchmark. I changed CollectorImpl to use a simple chain instead of java.util's linked list, and also to pool/reuse popped events. The same benchmark now results in only about 250 actual event objects being created in total to handle the same 42 million events.

While this reduces memory pressure a lot, surprisingly enough, the event-related objects were not the biggest source of garbage. At the top is java.nio.HeapByteBuffer (I haven't looked into this, but I suspect we're throwing away some of these that we could be reusing), and the second biggest source of garbage is org.apache.qpid.proton.amqp.UnsignedInteger. The latter I find a bit more concerning, as its use is fairly intertwined with the design of the current codec, so it is likely less straightforward to address.

On the CPU profiling front, I've noticed a couple of interesting things. First off, it does appear that much of the processing time is spent in codec, roughly 63%, but the cost is not symmetrically divided between encode and decode: encoding accounts for about 46% of the total, and decoding about 17%. I think this may be why it's hard to measure the effect of your patch on this benchmark; the decoding cost just isn't all that high compared to encoding. I did the same profiling again with your patch applied, and decoding dropped to about 10% of the total while encoding increased to about 50% of the total.
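For anyone curious what the chain-plus-pooling change amounts to, here is a minimal sketch of the idea. The names (PooledCollector, PooledEvent) are illustrative only, not the actual CollectorImpl code; the point is the intrusive next link (no separate list node) and the free list that recycles popped events:

```java
// Sketch of an event collector that chains events directly and reuses
// popped events via a free list, so a steady-state run allocates almost
// no per-event garbage. Hypothetical names, not the real CollectorImpl API.
public class PooledCollector {

    static class PooledEvent {
        Object type;        // event payload/type; cleared on pop
        PooledEvent next;   // intrusive link: doubles as queue and free-list link
    }

    private PooledEvent head, tail;  // queue of pending events
    private PooledEvent free;        // pool of popped events awaiting reuse

    public void put(Object type) {
        PooledEvent ev;
        if (free != null) {          // reuse a pooled event when available
            ev = free;
            free = free.next;
            ev.next = null;
        } else {
            ev = new PooledEvent();  // allocate only when the pool is empty
        }
        ev.type = type;
        if (tail == null) { head = tail = ev; }
        else { tail.next = ev; tail = ev; }
    }

    public Object peek() {
        return head == null ? null : head.type;
    }

    public void pop() {
        if (head == null) return;
        PooledEvent ev = head;
        head = ev.next;
        if (head == null) tail = null;
        ev.type = null;   // drop the payload so it can be collected
        ev.next = free;   // return the node to the free list
        free = ev;
    }
}
```

With a bounded number of events in flight at any one time, the pool caps total allocations at the high-water mark of the queue, which is consistent with seeing only a few hundred event objects across 42 million events.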
Digging into the encoding side a bit, it appears that a significant amount of time is spent calculating the encoded size of a value prior to writing its encoded representation to the wire. One of the optimizations I've had success with in the past (both on previous Java codecs and in the C codec) is to avoid calculating the size up front and instead simply reserve the necessary space for it and fill it in after the encoded representation has been written. In the past this has close to doubled the performance of encode, since calculating the encoded size is often as expensive as doing the encoding itself. Unfortunately, I'm guessing this kind of thing would probably require a major rework of the codec.

To summarize, I think there are really two design-related issues we will need to address in order to achieve optimum performance. On the memory front, I think the fact that every described type is rendered into a tree of generic objects on both decode and encode is going to be problematic. The strategy you've taken in your patch, special-casing certain frames to eliminate the intermediate list objects, helps with this, but I think we could do a whole lot better if we were to adopt a design that did not require any intermediate objects at all. On the CPU front, I think we'll get the biggest bang for our buck if we look into a design that doesn't require calculating the size up front. I have some ideas in mind for a new design that I hope will address both of these issues; I'm going to write them up in a separate post.

Regarding your patch, I'm happy to apply it, but I suspect that much of the current codec layer would need to be modified and/or replaced to address the above findings. Let me know how you would like to proceed.

--Rafael

On Fri, May 2, 2014 at 10:45 AM, Clebert Suconic <[email protected]> wrote:

> These shuld be all cleared now..
>
> My github branch and PR are up to date now:
>
> https://github.com/apache/qpid-proton/pull/1
>
> ....
> And isn't git is beautiful. It's already rebased with Rafi's last
> commit!
>
>
> On May 1, 2014, at 5:32 PM, Clebert Suconic <[email protected]> wrote:
>
> > I will do some cleanup on this.. I already fixed the headers here on my
> > copy and I will do some cleanup on imports that weren't supposed to be done.
> >
> > On May 1, 2014, at 5:11 PM, Robbie Gemmell <[email protected]> wrote:
> >
> >> As no mail arrived here or qpid-dev, and none seems to have arrived at what
> >> used to be the default location (infra-dev) either, I had a quick look and
> >> it seems like they might have changed the process slightly and we will need
> >> to ask for the mails to be enabled at all:
> >> https://blogs.apache.org/infra/entry/improved_integration_between_apache_and
> >>
> >> I particularly like the mention of the new comment syncing between our
> >> mailing list and the Pull Requests.
> >>
> >> Regarding closing the pull requests, it seems like something along the
> >> lines of "This closes #<request number> at GitHub" added to the end of the
> >> svn commit message should do the trick:
> >> https://help.github.com/articles/closing-issues-via-commit-messages
> >>
> >> I havent had a chance to really look at the actual code change but when I
> >> was quickly scrolling down the PR, in addition to the licence headers on
> >> the new files that Rafi already mentioned (which I spotted due to the
> >> Copyright notices we wouldnt typically have) I noticed Encoder.java having
> >> its existing licence header corrupted a little by some wayward code.
> >>
> >> Robbie
> >>
> >> I just submitted it as a git PR:
> >>
> >> https://github.com/apache/qpid-proton/pull/1
> >>
> >>
> >> On Apr 30, 2014, at 10:47 AM, Robbie Gemmell <[email protected]> wrote:
> >>
> >>> I think anyone can sign up for ReviewBoard themselves. It certainly didn't
> >>> used to be linked to the ASF LDAP in the past, presumably for that reason.
> >>>
> >>> Its probably also worth noting you can initiate pull requests against the
> >>> github mirrors. If it hasn't already been done for the proton mirror, we
> >>> can have the emails that would generate be directed to this list (e.g.
> >>> http://mail-archives.apache.org/mod_mbox/qpid-dev/201401.mbox/%[email protected]%3E ).
> >>> We obviously can't merge the pull request via github, but you can use
> >>> the reviewing tools etc and the resultant patch can be downloaded or
> >>> attached to a JIRA and then applied in the usual fashion (I believe there
> >>> is a commit message syntax that can be used to trigger closing the pull
> >>> request).
> >>>
> >>> Robbie
> >>>
> >>> On 30 April 2014 15:22, Rafael Schloming <[email protected]> wrote:
> >>>
> >>>> On Wed, Apr 30, 2014 at 8:35 AM, Clebert Suconic <[email protected]> wrote:
> >>>>
> >>>>> @Rafi: I see there is a patch review process within Apache (based on your
> >>>>> other thread on Java8)
> >>>>>
> >>>>> Should we make this through the patch process at some point?
> >>>>>
> >>>>
> >>>> I'm fine looking at it on your git branch, but if you'd like to play with
> >>>> the review tool then feel free. Just let me know if you need an account
> >>>> and I will try to remember how to set one up (or who to bug to get you
> >>>> one). ;-)
> >>>>
> >>>> --Rafael
> >>>>
>
>
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org;

import org.apache.qpid.proton.engine.*;

import static java.lang.Math.*;

import java.nio.*;
import java.util.*;

/**
 * Main
 */
public class Main
{
    public static final void main(String[] args) {
        Collector c = Collector.Factory.create();

        Connection c1 = Connection.Factory.create();
        Connection c2 = Connection.Factory.create();
        Transport t1 = Transport.Factory.create();
        Transport t2 = Transport.Factory.create();
        t1.bind(c1);
        t2.bind(c2);
        c1.collect(c);
        c2.collect(c);

        c1.open();
        c2.open();
        Session ssn1 = c1.session();
        ssn1.open();
        Sender snd = ssn1.sender("sender");
        snd.open();

        t1.setContext(t2);
        t2.setContext(t1);

        int total = 10*1024*1024;
        PingPong pp = new PingPong(total);
        pp.dispatchAll(c);

        if (pp.rcvd != total || pp.sent != total) {
            throw new RuntimeException("bad count");
        }
    }

    private static final boolean unidirPump(Transport from, Transport to) {
        int capacity = to.capacity();
        int pending = from.pending();
        if (capacity > 0 && pending > 0) {
            ByteBuffer tail = to.tail();
            ByteBuffer head = from.head();
            tail.put(head);
            try {
                to.process();
            } finally {
                int xfr = min(pending, capacity);
                from.pop(xfr);
            }
            return true;
        } else {
            return false;
        }
    }

    private static final void pump(Transport t1, Transport t2) {
        while (unidirPump(t1, t2) || unidirPump(t2, t1));
    }

    static class PingPong extends EventDispatcher {

        private static final int SIZE = 0;
        private static final byte[] msg = new byte[SIZE];

        private int total;
        private int batch = 64;
        private byte[] buf = new byte[SIZE];

        int sent = 0;
        int rcvd = 0;

        PingPong(int total) {
            this.total = total;
        }

        void onLocalState(Connection connection) {}
        void onLocalState(Session session) {}
        void onLocalState(Link link) {}

        void onRemoteState(Connection conn) {}

        void onRemoteState(Session ssn) {
            if (ssn.getRemoteState() == EndpointState.ACTIVE) {
                ssn.open();
            }
        }

        void onRemoteState(Link link) {
            if (link.getRemoteState() == EndpointState.ACTIVE) {
                link.open();
                if (link instanceof Receiver) {
                    Receiver rcv = (Receiver) link;
                    rcv.flow(batch);
                }
            }
        }

        void onFlow(Link link) {
            if (link instanceof Sender) {
                Sender snd = (Sender) link;
                while (snd.getCredit() > 0) {
                    Delivery dlv = snd.delivery(new byte[0]);
                    snd.send(msg, 0, msg.length);
                    dlv.settle();
                    sent++;
                }
            }
        }

        void onDelivery(Delivery dlv) {
            Link link = dlv.getLink();
            if (link instanceof Receiver) {
                Receiver rcv = (Receiver) link;
                rcv.recv(buf, 0, buf.length);
                rcv.current().settle();
                if (!Arrays.equals(msg, buf)) {
                    throw new RuntimeException("garbled message");
                }
                rcvd++;
                if ((rcvd % (100*1024)) == 0) {
                    System.out.println("rcvd=" + rcvd);
                }
                if (rcvd < total && rcv.getCredit() == 0) {
                    rcv.flow(batch);
                }
            }
        }

        void onTransport(Transport t1) {
            Transport t2 = (Transport) t1.getContext();
            pump(t1, t2);
        }
    }

    // XXX: something like this should be added to the engine API
    static abstract class EventDispatcher {

        abstract void onRemoteState(Connection connection);
        abstract void onRemoteState(Session session);
        abstract void onRemoteState(Link link);

        abstract void onLocalState(Connection connection);
        abstract void onLocalState(Session session);
        abstract void onLocalState(Link link);

        abstract void onFlow(Link link);
        abstract void onDelivery(Delivery delivery);
        abstract void onTransport(Transport transport);

        void dispatch(Event event) {
            switch (event.getType()) {
            case CONNECTION_REMOTE_STATE:
                onRemoteState(event.getConnection());
                break;
            case CONNECTION_LOCAL_STATE:
                onLocalState(event.getConnection());
                break;
            case SESSION_REMOTE_STATE:
                onRemoteState(event.getSession());
                break;
            case SESSION_LOCAL_STATE:
                onLocalState(event.getSession());
                break;
            case LINK_REMOTE_STATE:
                onRemoteState(event.getLink());
                break;
            case LINK_LOCAL_STATE:
                onLocalState(event.getLink());
                break;
            case LINK_FLOW:
                onFlow(event.getLink());
                break;
            case TRANSPORT:
                onTransport(event.getTransport());
                break;
            case DELIVERY:
                onDelivery(event.getDelivery());
                break;
            }
        }

        void dispatchAll(Collector collector) {
            Event ev;
            while ((ev = collector.peek()) != null) {
                dispatch(ev);
                collector.pop();
            }
        }
    }
}
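As an aside, the "reserve the space and fill it in afterwards" encoding strategy discussed earlier in the thread can be sketched very simply. This is an illustrative example using an assumed size-prefixed list layout, not the actual proton codec API: the encoder remembers where the size field goes, writes the body, and then backfills the size with an absolute put, so the size never has to be computed in a separate pass:

```java
import java.nio.ByteBuffer;

// Sketch of reserve-then-backfill encoding: instead of computing the
// encoded size of a value before writing it, reserve the size field's
// slot, encode the body, then patch the size in afterwards.
// The wire layout here ([int32 size][int32 count][len+bytes...]) is a
// made-up example, not an AMQP encoding.
public class BackfillEncoder {

    public static void encodeList(ByteBuffer buf, byte[][] elements) {
        int sizePos = buf.position(); // remember where the size field goes
        buf.putInt(0);                // reserve 4 bytes for the size
        buf.putInt(elements.length);  // element count
        for (byte[] e : elements) {
            buf.putInt(e.length);     // per-element length prefix
            buf.put(e);
        }
        int end = buf.position();
        // backfill: size = number of bytes following the size field
        buf.putInt(sizePos, end - sizePos - 4);
    }
}
```

The single absolute `putInt(index, value)` at the end replaces an entire size-calculation traversal, which is why this trick can come close to halving the cost of encode when sizing is as expensive as writing.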
