On 09/30/2011 01:59 PM, Gang, Litao wrote:
I am implementing an asynchronous mechanism, i.e. when a message arrives,
a callback automatically gets called. How can you do that with the messaging 
API?
I don't want to initiate a call to fetch a message myself. I don't know exactly 
how to do this.

At this point there is nothing built in to do that. However, it is pretty easy to do so yourself, though at present you still need a thread per session (as with the old qpid::client API). As an example, I've attached some simple sample code showing what you could do.

I anticipate adding more in this area as time allows. There is a JIRA open, with a very early and raw patch for review, that addresses the thread-per-session requirement: https://issues.apache.org/jira/browse/QPID-3460. Once that work completes, I imagine a standard, built-in framework for dispatching will be next.

#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Session.h>

#include <boost/function.hpp>

using namespace qpid::messaging;

/**
 * Routes messages arriving on a session's receivers to per-subscription
 * callbacks.
 *
 * subscribe() registers a callback for an address, dispatch() runs the
 * delivery loop on the calling thread, and stop() asks that loop to exit.
 */
class Dispatcher
{
public:
    /** Handler type: called with each message fetched for a subscription. */
    typedef boost::function1<void, Message&> Callback;

    /** Create a dispatcher working on the given session. */
    Dispatcher(Session& s);

    /** Create a receiver for @p address and register @p callback for it. */
    void subscribe(const std::string& address, Callback callback);

    /** Fetch and dispatch messages until stop() is called. */
    void dispatch();

    /** Request that a running dispatch() loop exits. */
    void stop();

private:
    // NOTE: the declaration order below is also the initialisation order;
    // the constructor derives ackFrequency from prefetch, so prefetch must
    // stay declared before ackFrequency.

    Session session;                            // session whose receivers are polled
    std::map<std::string, Callback> callbacks;  // receiver name -> registered callback

    // Loop-exit flag, set by stop() and cleared when dispatch() starts.
    // NOTE(review): 'volatile' gives no inter-thread synchronisation
    // guarantee — confirm how stop() is expected to be called from
    // another thread.
    volatile bool stopped;

    int prefetch;         // capacity given to every receiver created by subscribe()
    size_t count;         // messages dispatched since the last periodic acknowledge
    long acked;           // time of the last periodic acknowledge, in seconds (see now())
    long maxAckDelay;     // max seconds dispatched messages may wait for an acknowledge
    size_t ackFrequency;  // acknowledge once this many messages have been dispatched
};
#include "Dispatcher.h"
#include <sys/time.h>

/**
 * Current wall-clock time in whole seconds since the Unix epoch.
 *
 * Only the seconds field reported by gettimeofday() is used; the
 * microsecond part is discarded.
 */
long now()
{
    struct timeval current;
    ::gettimeofday(&current, 0);
    const long seconds = current.tv_sec;
    return seconds;
}

/**
 * Build a dispatcher on session @p s.
 *
 * The dispatcher starts in the stopped state with no subscriptions.
 * Defaults: a prefetch of 50 messages per receiver, and an acknowledge
 * after every prefetch/2 dispatched messages or once 30 seconds have
 * passed with messages pending.
 */
Dispatcher::Dispatcher(Session& s)
    : session(s),
      stopped(true),
      prefetch(50),
      count(0),
      acked(0),
      maxAckDelay(30),
      // Relies on prefetch being declared, and therefore initialised,
      // before ackFrequency in the class definition.
      ackFrequency(prefetch/2)
{
}

void Dispatcher::subscribe(const std::string& address, Callback callback)
{
    Receiver receiver = session.createReceiver(address);
    receiver.setCapacity(prefetch);
    callbacks[receiver.getName()] = callback;
}

void Dispatcher::dispatch()
{
    stopped = false;
    Receiver receiver;
    Message message;
    while (!stopped) {                
        if (session.nextReceiver(receiver, Duration::SECOND)
            && receiver.fetch(message, Duration::IMMEDIATE)) {
            callbacks[receiver.getName()](message);
            ++count;
        }
        if (count >= ackFrequency || (count && (now() - acked) >= maxAckDelay)) {
            session.acknowledge();
            acked = now();
            count = 0;
        }
    }
    session.acknowledge();
}

void Dispatcher::stop()
{
    stopped = true;
}
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Session.h>

#include <iostream>
#include <boost/bind.hpp>
#include "Dispatcher.h"

using namespace qpid::messaging;

/**
 * Example set of message handlers used with a Dispatcher.
 *
 * f1() and f2() print the content of the messages they receive;
 * control() stops the dispatcher it was constructed with.
 */
class Handler
{
public:
    /** Remember the dispatcher that control() will stop. */
    Handler(Dispatcher& d);

    /** Print the message content prefixed with "f1: ". */
    void f1(Message& message);

    /** Print the message content prefixed with "f2: ". */
    void f2(Message& message);

    /** Stop the dispatcher; the message itself is not inspected. */
    void control(Message& message);

private:
    Dispatcher& dispatcher;  // not owned; must outlive this handler
};

int main(int argc, char** argv)
{
    Connection connection("localhost:5672");
    try {
        connection.open();
        Session session = connection.createSession();
        Dispatcher dispatcher(session);
        Handler handler(dispatcher); 
        dispatcher.subscribe("queue-1", boost::bind(&Handler::f1, &handler, _1));
        dispatcher.subscribe("queue-2", boost::bind(&Handler::f2, &handler, _1));
        dispatcher.subscribe("amq.topic/shutdown", boost::bind(&Handler::control, &handler, _1));
        dispatcher.dispatch();
        session.close();            
        connection.close();
        return 0;
    } catch(const std::exception& error) {
        std::cout << error.what() << std::endl;
        connection.close();
    }
    return 1;   
}

/** Keep a reference to the dispatcher so that control() can stop it. */
Handler::Handler(Dispatcher& d)
    : dispatcher(d)
{
}

/** Write the message content to standard output, tagged "f1: ". */
void Handler::f1(Message& message)
{
    const std::string body = message.getContent();
    std::cout << "f1: " << body << std::endl;
}

/** Write the message content to standard output, tagged "f2: ". */
void Handler::f2(Message& message)
{
    const std::string body = message.getContent();
    std::cout << "f2: " << body << std::endl;
}

void Handler::control(Message& message)
{
    dispatcher.stop();
}

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to