On 06/21/2011 02:05 PM, Phil Brown wrote:
I am looking at creating a route between either an exchange (or a queue) on
one C++ Broker to an exchange on another C++ Broker. What is import for me
is that this route survives either or both Broker's being restarted. Is
this possible?
Yes, provided you have a store plugin installed you need only specify
that the route be durable.
One further consideration is whether you can tolerate message loss if
either broker restarts. If not then the messages would need to be marked
durable and you would need the route to have a durable queue as its
source (rather than an exchange). You would obviously also want queues
handling these messages downstream to be durable.
If so can the route be created via the messaging API?
Yes, though it is a little involved[1]. The routes are created by
sending map-messages to a special address (this is how qpid-route works
also, though it uses an older format for the messages).
You need to first create a connection (or 'link') between the two
brokers using the 'connect' method on the broker. You can then create a
route (or 'bridge') over that link from a given queue or exchange on the
source broker to an exchange on the destination broker. You do this
using the 'bridge' method on the link object you just created, whose
name is '<host>,<port>'. These details are defined by the management
schema available in svn and also in source distributions[2]
I've attached a simple example that creates a durable route between
amq.direct instances on two brokers for the key my-key (i.e. all
messages sent to the amq.direct exchange on broker localhost:5673 with
routing-key/subject my-key will be received my a subscriber to the
equivalent exchange on localhost:5672).
--Gordon
[1] I hope that this aspect of the schema can in future be simplified
quite a bit.
[2]
https://svn.apache.org/repos/asf/qpid/trunk/qpid/specs/management-schema.xml
#include <iostream>
#include <string>
#include <sstream>
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/exceptions.h>
#include <qpid/types/Variant.h>
#include <qpid/Msg.h>
using namespace qpid::messaging;
using namespace qpid::types;
class MethodInvoker
{
public:
MethodInvoker(Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
sender(session.createSender("qmf.default.direct/broker")),
receiver(session.createReceiver(replyTo)) {}
void createLink(const std::string& host, uint32_t port=5672, bool durable=false,
const std::string& mechanism=std::string(),
const std::string& username=std::string(),
const std::string& password=std::string(),
const std::string& transport=std::string())
{
Variant::Map params;
params["host"]=host;
params["port"]=port;
params["durable"]=durable;
if (mechanism.size()) params["authMechanism"]=mechanism;
if (username.size()) params["username"]=username;
if (password.size()) params["password"]=password;
if (transport.size()) params["transport"]=transport;
methodRequest("connect", params);
}
void createBridge(const std::string& linkName,
const std::string& source, const std::string& destination,
const std::string& key=std::string(), bool durable=false)
{
Variant::Map params;
params["durable"]=durable;
params["src"]=source;
params["dest"]=destination;
if (key.size()) params["key"]=key;
params["dynamic"]=false;
params["srcIsLocal"]=false;
params["srcIsQueue"]=false;
methodRequest("bridge", params, linkName, "link");
}
void methodRequest(const std::string& method, const Variant::Map& inParams, const std::string& objectName="amqp-broker", const std::string& objectType="broker", Variant::Map* outParams = 0)
{
Variant::Map content;
Variant::Map objectId;
std::stringstream name;
name << "org.apache.qpid.broker:" << objectType << ":" << objectName;
objectId["_object_name"] = name.str();
content["_object_id"] = objectId;
content["_method_name"] = method;
content["_arguments"] = inParams;
Message request;
request.setReplyTo(replyTo);
request.getProperties()["x-amqp-0-10.app-id"] = "qmf2";
request.getProperties()["qmf.opcode"] = "_method_request";
encode(content, request);
sender.send(request);
Message response;
if (receiver.fetch(response, Duration::SECOND*5)) {
if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") {
std::string opcode = response.getProperties()["qmf.opcode"];
if (opcode == "_method_response") {
if (outParams) {
Variant::Map m;
decode(response, m);
*outParams = m["_arguments"].asMap();
}
} else if (opcode == "_exception") {
Variant::Map m;
decode(response, m);
throw Exception(QPID_MSG("Error: " << m["_values"]));
} else {
throw Exception(QPID_MSG("Invalid response received, unexpected opcode: " << opcode));
}
} else {
throw Exception(QPID_MSG("Invalid response received, not a qmfv2 message: app-id="
<< response.getProperties()["x-amqp-0-10.app-id"]));
}
} else {
throw Exception(QPID_MSG("No response received"));
}
}
private:
Address replyTo;
Sender sender;
Receiver receiver;
};
int main()
{
try
{
Connection conn("localhost:5672");
conn.open();
Session session = conn.createSession();
MethodInvoker control(session);
control.createLink("localhost", 5673, true);
control.createBridge("localhost,5673", "amq.direct", "amq.direct", "my-key", true);
conn.close();
}
catch(std::exception& e)
{
std::cerr << e.what() << std::endl;
}
return 0;
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]