On 06/21/2011 02:05 PM, Phil Brown wrote:
> I am looking at creating a route between either an exchange (or a queue) on
> one C++ broker to an exchange on another C++ broker.  What is important for me
> is that this route survives either or both brokers being restarted.  Is
> this possible?

Yes. Provided you have a store plugin installed, you need only specify that the route be durable.

One further consideration is whether you can tolerate message loss if either broker restarts. If not, the messages need to be marked durable and the route needs a durable queue as its source (rather than an exchange). You would also want the queues handling these messages downstream to be durable.
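
As a rough illustration of the durable-queue source (a sketch only, not part of the attached example): with the messaging API, a queue can be declared durable through the node options of its address. The helper name durableQueueAddress and any queue name passed to it are hypothetical. Producers would also call setDurable(true) on each Message, and the bridge would be created with srcIsQueue set to true.

```cpp
#include <string>

// Hypothetical helper: builds an address string that creates the named
// queue if it does not exist yet and declares it durable, so that the
// queue definition survives a broker restart (a store plugin is required).
std::string durableQueueAddress(const std::string& queueName)
{
    return queueName + "; {create: always, node: {type: queue, durable: True}}";
}
```

The resulting string can be passed to session.createSender() or session.createReceiver() on the source broker before the route is set up.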

> If so, can the route be created via the messaging API?

Yes, though it is a little involved[1]. The routes are created by sending map-messages to a special address (this is how qpid-route works also, though it uses an older format for the messages).

You first need to create a connection (or 'link') between the two brokers using the 'connect' method on the broker. You can then create a route (or 'bridge') over that link from a given queue or exchange on the source broker to an exchange on the destination broker. You do this using the 'bridge' method on the link object you just created, whose name is '<host>,<port>'. These details are defined by the management schema available in svn and also in source distributions[2].
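
To make the naming concrete, here is a small sketch of how the link name and the object name that a method request is addressed to fit together. It mirrors the string building done in methodRequest in the attached example; the helper names linkName and qmfObjectName are made up for illustration.

```cpp
#include <sstream>
#include <string>
#include <stdint.h>

// Name under which the link object is addressed: "<host>,<port>".
std::string linkName(const std::string& host, uint32_t port)
{
    std::ostringstream name;
    name << host << "," << port;
    return name.str();
}

// Object name placed in the "_object_name" field of a method request,
// following the "org.apache.qpid.broker:<type>:<name>" pattern used by
// the attached example.
std::string qmfObjectName(const std::string& objectType, const std::string& objectName)
{
    return "org.apache.qpid.broker:" + objectType + ":" + objectName;
}
```

With these, the 'connect' call targets qmfObjectName("broker", "amqp-broker"), and the subsequent 'bridge' call targets qmfObjectName("link", linkName("localhost", 5673)).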

I've attached a simple example that creates a durable route between the amq.direct instances on two brokers for the key my-key (i.e. all messages sent to the amq.direct exchange on broker localhost:5673 with routing-key/subject my-key will be received by a subscriber to the equivalent exchange on localhost:5672).

--Gordon

[1] I hope that this aspect of the schema can in future be simplified quite a bit.

[2] https://svn.apache.org/repos/asf/qpid/trunk/qpid/specs/management-schema.xml
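
// Attached example: creates a durable route between the amq.direct
// exchanges of two brokers by sending QMF method requests.
#include <iostream>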
#include <string>
#include <sstream>
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/exceptions.h>
#include <qpid/types/Variant.h>
#include <qpid/Msg.h>

using namespace qpid::messaging;
using namespace qpid::types;

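// Sends QMF2 method requests to the broker's management agent and waits
// for the response on a temporary, auto-deleted reply queue.
class MethodInvoker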
{
  public:
    MethodInvoker(Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
                                      sender(session.createSender("qmf.default.direct/broker")),
                                      receiver(session.createReceiver(replyTo)) {}
    void createLink(const std::string& host, uint32_t port=5672, bool durable=false,
                    const std::string& mechanism=std::string(),
                    const std::string& username=std::string(),
                    const std::string& password=std::string(),
                    const std::string& transport=std::string())
    {
        Variant::Map params;
        params["host"]=host;
        params["port"]=port;
        params["durable"]=durable;
        if (mechanism.size()) params["authMechanism"]=mechanism;
        if (username.size()) params["username"]=username;
        if (password.size()) params["password"]=password;
        if (transport.size()) params["transport"]=transport;
        methodRequest("connect", params);
    }

    void createBridge(const std::string& linkName,
                      const std::string& source, const std::string& destination,
                      const std::string& key=std::string(), bool durable=false)
    {
        Variant::Map params;
        params["durable"]=durable;
        params["src"]=source;
        params["dest"]=destination;
        if (key.size()) params["key"]=key;
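        // Static (not dynamic) route; the source is an exchange on the
        // remote end of the link, i.e. messages are pulled from it.
        params["dynamic"]=false;
        params["srcIsLocal"]=false;
        params["srcIsQueue"]=false;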
        methodRequest("bridge", params, linkName, "link");
    }

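    // Invokes 'method' on the named management object; throws if the broker
    // replies with an exception or does not reply within five seconds.
    void methodRequest(const std::string& method, const Variant::Map& inParams,
                       const std::string& objectName="amqp-broker",
                       const std::string& objectType="broker",
                       Variant::Map* outParams = 0)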
    {
        Variant::Map content;
        Variant::Map objectId;
        std::stringstream name;
        name << "org.apache.qpid.broker:" << objectType << ":" << objectName;
        objectId["_object_name"] = name.str();
        content["_object_id"] = objectId;
        content["_method_name"] = method;
        content["_arguments"] = inParams;

        Message request;
        request.setReplyTo(replyTo);
        request.getProperties()["x-amqp-0-10.app-id"] = "qmf2";
        request.getProperties()["qmf.opcode"] = "_method_request";
        encode(content, request);

        sender.send(request);

        Message response;
        if (receiver.fetch(response, Duration::SECOND*5)) {
            if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") {
                std::string opcode = response.getProperties()["qmf.opcode"];
                if (opcode == "_method_response") {
                    if (outParams) {
                        Variant::Map m;
                        decode(response, m);
                        *outParams = m["_arguments"].asMap();
                    }
                } else if (opcode == "_exception") {
                    Variant::Map m;
                    decode(response, m);
                    throw Exception(QPID_MSG("Error: " << m["_values"]));
                } else {
                    throw Exception(QPID_MSG("Invalid response received, unexpected opcode: " << opcode));
                }
            } else {
                throw Exception(QPID_MSG("Invalid response received, not a qmfv2 message: app-id="
                                         << response.getProperties()["x-amqp-0-10.app-id"]));
            }
        } else {
            throw Exception(QPID_MSG("No response received"));
        }
    }
  private:
    Address replyTo;
    Sender sender;
    Receiver receiver;
};

int main()
{

    try
    {
        Connection conn("localhost:5672");
        conn.open();
        Session session = conn.createSession();

        MethodInvoker control(session);
        control.createLink("localhost", 5673, true);
        control.createBridge("localhost,5673", "amq.direct", "amq.direct", "my-key", true);
        conn.close();
    }
    catch(std::exception& e)
    {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]
