On 02/26/2010 10:13 AM, David Stewart wrote:
The session was synchronous. Using the AsyncSession brought the 75 seconds down 
to 5-10 seconds, which is a fantastic improvement.
The bottleneck still appears to be the SubscriptionManager though.

SubscriptionManager::subscribe() is synchronous, which will limit the rate. One tip is to turn off message flow when you subscribe (use SubscriptionSettings(FlowControl::messageWindow(0))). Once everything is set up, add credit to resize the window on each subscription (Subscription::grantMessageCredit() or Session::messageFlow()) so that messages start to flow; this step can be asynchronous. I get through 20000 in ~25 secs on my laptop with that (see attached).

However, can I ask why you need a separate queue per exchange? Could you instead have one queue bound into each of the exchanges? That would be far more efficient (both for setup and at runtime) and would still give you all the messages sent to any of those exchanges.


I should mention that we're running a vc90 C++ client against a vc90 C++ 
broker. Could the broker be the problem?
Should I see better performance from a linux broker?

-----Original Message-----
From: Gordon Sim [mailto:[email protected]]
Sent: 25 February 2010 17:39
To: [email protected]
Cc: David Stewart; [email protected]
Subject: Re: SubscriptionManager performance problem.

On 02/25/2010 04:51 PM, David Stewart wrote:
Hi all,
we are running a bridge between our old middleware and qpid system which at 
startup queries the existing middleware for the number of broadcast groups it 
knows about. It is a pricing system so there are ~20000.

The bridge creates a fanout exchange for each broadcast group, creates a queue 
and binds it to the exchange. All this takes ~75 seconds for 20000. Not a 
problem.

When we add a SubscriptionManager.subscribe() to the loop we get through only 
1000 requests in ~3 minutes.
I have created an example below which exhibits the problem. The question is: 
are we using the SubscriptionManager incorrectly?

Is oSession an AsyncSession or a SyncSession (a plain Session is a 
SyncSession)? Each call to SubscriptionManager::subscribe() will involve a 
sync() which will certainly reduce the rate you can get through them, but 1000 
in ~3 minutes seems slow even then.

Is there a better way for us to achieve the result we require?

for (int i = 0; i < 20000; ++i) {
    std::stringstream ss;
    ss << "listener" << i;

    // Try and declare the exchange. Will succeed even if it already exists.
    oSession.exchangeDeclare(qpid::client::arg::exchange=ss.str(),
                             qpid::client::arg::type="fanout",
                             qpid::client::arg::alternateExchange=std::string(),
                             qpid::client::arg::passive=false,
                             qpid::client::arg::durable=true);

    oSession.queueDeclare(qpid::client::arg::queue=ss.str(),
                          qpid::client::arg::exclusive=true,
                          qpid::client::arg::autoDelete=false);

    oSession.exchangeBind(qpid::client::arg::exchange=ss.str(),
                          qpid::client::arg::queue=ss.str(),
                          qpid::client::arg::bindingKey=ss.str());

    oSubscriptionManager.subscribe(*this, ss.str());
}

Regards,
Dave

The information contained in this email is strictly confidential and for the 
use of the addressee only, unless otherwise indicated. If you are not the 
intended recipient, please do not read, copy, use or disclose to others this 
message or any attachment. Please also notify the sender by replying to this 
email or by telephone (+44 (0)20 7896 0011) and then delete the email and any 
copies of it. Opinions, conclusions (etc.) that do not relate to the official 
business of this company shall be understood as neither given nor endorsed by 
it. IG Index Ltd is a company registered in England and Wales under number 
01190902. VAT registration number 761 2978 07. Registered Office: Friars House, 
157-168 Blackfriars Road, London SE1 8EZ. Authorised and regulated by the 
Financial Services Authority. FSA Register number 114059.

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]




/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */


#include <qpid/client/Connection.h>
#include <qpid/client/AsyncSession.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/SubscriptionManager.h>
#include <qpid/framing/enum.h>

#include <cstdlib>
#include <iostream>
#include <sstream>

using namespace qpid::client;
using namespace qpid::sys;

struct MyListener : MessageListener
{
    void received(Message&) {}
};

int main(int argc, char** argv) {
    ConnectionSettings settings;
    if (argc>1) settings.host = argv[1];
    if (argc>2) settings.port = atoi(argv[2]);
    Connection connection;
    try {
        connection.open(settings);
        AsyncSession session =  connection.newSession();

        SubscriptionManager subscriptions(session);
        MyListener listener;
        // Subscribe with a zero-size message window so that no messages flow yet.
        SubscriptionSettings subscriptionSettings(FlowControl::messageWindow(0));
        AbsTime start = now();
        const unsigned int count = 20000;
        for (unsigned int i = 0; i < count; i++) {
            std::stringstream exchange;
            exchange << "e" << i;
            std::stringstream queue;
            queue << "q" << i;
            session.exchangeDeclare(arg::exchange=exchange.str(), arg::type="fanout");
            session.queueDeclare(arg::queue=queue.str());
            session.exchangeBind(arg::exchange=exchange.str(), arg::queue=queue.str());
            subscriptions.subscribe(listener, queue.str(), subscriptionSettings);
            if ((i+1) % 1000 == 0) std::cout << "completed " << (i+1) << " of " << count << std::endl;
        }
        // Setup is done: grant credit on each subscription so messages can flow.
        for (unsigned int i = 0; i < count; i++) {
            std::stringstream queue;
            queue << "q" << i;
            session.messageFlow(queue.str(), qpid::framing::message::CREDIT_UNIT_MESSAGE, 1000);
        }
        AbsTime end = now();
        Duration d(start, end);
        double time = double(d) / TIME_SEC;  // convert first to avoid integer division
        std::cout << std::endl << "Completed " << count << " subscriptions in " << time << " seconds" << std::endl;
        subscriptions.run();
        connection.close();
        return 0;
    } catch(const std::exception& error) {
        std::cout << error.what() << std::endl;
    }
    return 1;
}


