On 09/30/2011 01:59 PM, Gang, Litao wrote:
I am doing the asynchronous mechanism, i.e. when a message arrives,
a callback automatically gets called. How can you do that with the messaging
API?
I don't want to initiate a call to fetch a messages. This I don't know exactly
how to do.
At this point there is nothing built-in to do that. However it is pretty
easy to do so yourself, though again at present you need a thread per
session (as with the old qpid::client API). Just as an example of this
I've attached some simple sample code of what you could do.
I anticipate adding more in this area as time allows. There is a JIRA
open, with a very early and raw patch for review, that addresses the
thread-per-session requirement:
https://issues.apache.org/jira/browse/QPID-3460. Once that work
completes I imagine a standard, built in framework for dispatching will
be next.
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Session.h>
#include <boost/function.hpp>
using namespace qpid::messaging;
class Dispatcher
{
public:
typedef boost::function1<void, Message&> Callback;
Dispatcher(Session&);
void subscribe(const std::string& address, Callback);
void dispatch();
void stop();
private:
Session session;
std::map<std::string, Callback> callbacks;
volatile bool stopped;
int prefetch;
size_t count;
long acked;
long maxAckDelay;
size_t ackFrequency;
};
#include "Dispatcher.h"
#include <sys/time.h>
long now()
{
timeval t;
gettimeofday(&t, 0);
return t.tv_sec;
}
Dispatcher::Dispatcher(Session& s) :
session(s), stopped(true),
prefetch(50), count(0), acked(0), maxAckDelay(30), ackFrequency(prefetch/2) {}
void Dispatcher::subscribe(const std::string& address, Callback callback)
{
Receiver receiver = session.createReceiver(address);
receiver.setCapacity(prefetch);
callbacks[receiver.getName()] = callback;
}
void Dispatcher::dispatch()
{
stopped = false;
Receiver receiver;
Message message;
while (!stopped) {
if (session.nextReceiver(receiver, Duration::SECOND)
&& receiver.fetch(message, Duration::IMMEDIATE)) {
callbacks[receiver.getName()](message);
++count;
}
if (count >= ackFrequency || (count && (now() - acked) >= maxAckDelay)) {
session.acknowledge();
acked = now();
count = 0;
}
}
session.acknowledge();
}
void Dispatcher::stop()
{
stopped = true;
}
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Session.h>
#include <iostream>
#include <boost/bind.hpp>
#include "Dispatcher.h"
using namespace qpid::messaging;
class Handler
{
public:
Handler(Dispatcher&);
void f1(Message&);
void f2(Message&);
void control(Message&);
private:
Dispatcher& dispatcher;
};
int main(int argc, char** argv)
{
Connection connection("localhost:5672");
try {
connection.open();
Session session = connection.createSession();
Dispatcher dispatcher(session);
Handler handler(dispatcher);
dispatcher.subscribe("queue-1", boost::bind(&Handler::f1, &handler, _1));
dispatcher.subscribe("queue-2", boost::bind(&Handler::f2, &handler, _1));
dispatcher.subscribe("amq.topic/shutdown", boost::bind(&Handler::control, &handler, _1));
dispatcher.dispatch();
session.close();
connection.close();
return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
connection.close();
}
return 1;
}
Handler::Handler(Dispatcher& d) : dispatcher(d) {}
void Handler::f1(Message& message)
{
std::cout << "f1: " << message.getContent() << std::endl;
}
void Handler::f2(Message& message)
{
std::cout << "f2: " << message.getContent() << std::endl;
}
void Handler::control(Message& message)
{
dispatcher.stop();
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]