On 05/17/2013 08:37 AM, Christian Fromme wrote:
in Qpid, broker federation is usually done via the `qpid-route`
command line utility. I wonder if there is a way to add and control
routes via the C++ API? If possible, an example would help!

All the management is done by sending specially formatted messages (the QMF protocol). This can be done with any client library (though in raw form it is a little verbose).

The recommended approach for federation is to use the 'create' method[1] to create objects of type 'link' (i.e. connections between two brokers) and associated 'bridges' (i.e. subscriptions over those connections). These can be removed using the 'delete' method.

Attached is a simple example showing what this would look like using the C++ qpid::messaging API. It creates a link between two brokers and then establishes a flow of messages between the amq.fanout exchanges.

Feel free to ask any questions as regrettably this is not terribly well documented.

--Gordon.

[1] The qpid-route utility predates the availability of the generic create method and uses Broker::connect() to create a Link object then Link::bridge() to create the bridge. I've attached an old example that does that as well, but I would recommend the create/delete approach as it is simpler.
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */


#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>
#include <iostream>
#include <sstream>

using namespace qpid::messaging;
using namespace qpid::types;

using std::string;

/**
 * Small management client that asks a broker to create federation objects
 * (links and bridges) by sending QMF2 method requests over a session.
 */
class Controller
{
  public:
    /**
     * Creates the request sender and the reply receiver on the given session.
     * Declared explicit so that a Session is never implicitly converted
     * into a Controller.
     */
    explicit Controller(Session&);

    /**
     * Requests creation of an object of type "link" with the given name.
     *
     * @param name      name of the link object to create
     * @param host      value sent as the "host" property
     * @param port      value sent as the "port" property
     * @param transport optional "transport" property; omitted when empty
     * @param mechanism optional "authMechanism" property; omitted when empty
     * @param username  optional "username" property; omitted when empty
     * @param password  optional "password" property; omitted when empty
     */
    void link(const std::string& name, const std::string& host, uint16_t port,
              const std::string& transport=std::string(), const std::string& mechanism=std::string(),
              const std::string& username=std::string(), const std::string& password=std::string());

    /**
     * Requests creation of an object of type "bridge" with the given name.
     *
     * @param name name of the bridge object to create
     * @param link value sent as the "link" property
     * @param src  value sent as the "src" property
     * @param dest value sent as the "dest" property
     * @param tag  optional "tag" property; omitted when empty
     */
    void bridge(const std::string& name, const std::string& link,
                const std::string& src, const std::string& dest, const std::string& tag=std::string());
  private:
    // Declaration order matters: the constructor's initializer list passes
    // replyTo to createReceiver(), so replyTo must be initialized first.
    Address replyTo;
    Sender sender;
    Receiver receiver;

    /** Sends one method request with the given arguments and reports the reply. */
    void invoke(const std::string& method, const Variant::Map& args);
};

/**
 * Connects to the broker URL given as the first command line argument
 * ("localhost" when absent), then creates the link "my-link" to
 * localhost:5673 and the bridge "my-bridge" between the amq.fanout exchanges.
 *
 * @return 0 on success, 1 when an exception was reported
 */
int main(int argc, char** argv)
{
    Connection c(argc > 1 ? argv[1] : "localhost");
    // Remember whether anything failed so the exit status reflects it.
    int status = 0;
    try {
        c.open();
        Session session = c.createSession();
        Controller controller(session);
        controller.link("my-link", "localhost", 5673);
        controller.bridge("my-bridge", "my-link", "amq.fanout", "amq.fanout");
    } catch(const std::exception& error) {
        std::cout << "ERROR: " << error.what() << std::endl;
        status = 1;
    }
    // Close on both the success and the failure path.
    c.close();
    return status;
}

// Sets up the reply address, the sender used for requests and the receiver
// that listens on the reply address (members are initialized in this order).
Controller::Controller(Session& session)
    : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
      sender(session.createSender("qmf.default.direct/broker")),
      receiver(session.createReceiver(replyTo))
{
}

void Controller::link(const std::string& name, const std::string& host, uint16_t port, const std::string& transport, const std::string& mechanism, const std::string& username, const std::string& password)
{
    Variant::Map args;
    args["type"] = "link";
    args["name"] = name;
    Variant::Map props;
    props["host"] = host;
    props["port"] = port;
    if (!transport.empty()) props["transport"] = transport;
    if (!mechanism.empty()) props["authMechanism"] = mechanism;
    if (!username.empty()) props["username"] = username;
    if (!password.empty())props["password"] = password;
    args["properties"] = props;
    invoke("create", args);
}

void Controller::bridge(const std::string& name, const std::string& link,
                        const std::string& src, const std::string& dest, const std::string& tag)
{
    Variant::Map args;
    args["type"] = "bridge";
    args["name"] = name;
    Variant::Map props;
    props["link"] = link;
    props["src"] = src;
    props["dest"] = dest;
    if (!tag.empty()) props["tag"] = tag;
    args["properties"] = props;
    invoke("create", args);
}

// Sends a QMF2 "_method_request" for the given method to the object named
// "org.apache.qpid.broker:broker:amqp-broker" and waits up to five seconds
// for a reply. Problems (no reply, non-qmf2 reply, "_exception" opcode) are
// written to std::cout; nothing is thrown by this function itself.
void Controller::invoke(const std::string& method, const Variant::Map& args)
{
    // Identify the object the method is invoked on.
    Variant::Map target;
    target["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";

    Variant::Map body;
    body["_object_id"] = target;
    body["_method_name"] = method;
    body["_arguments"] = args;

    Message request;
    request.setReplyTo(replyTo);
    Variant::Map& requestHeaders = request.getProperties();
    requestHeaders["x-amqp-0-10.app-id"] = "qmf2";
    requestHeaders["qmf.opcode"] = "_method_request";
    encode(body, request);

    sender.send(request);

    Message reply;
    if (!receiver.fetch(reply, Duration::SECOND*5)) {
        std::cout << "No response received" << std::endl;
        return;
    }

    Variant::Map& replyHeaders = reply.getProperties();
    if (!(replyHeaders["x-amqp-0-10.app-id"] == "qmf2")) {
        std::cout << "Invalid response received, not a qmfv2 message: app-id="
                  << replyHeaders["x-amqp-0-10.app-id"] << std::endl;
        return;
    }

    std::string opcode = replyHeaders["qmf.opcode"];
    if (opcode == "_exception") {
        // Only exception replies are decoded; other opcodes are accepted silently.
        Variant::Map decoded;
        decode(reply, decoded);
        std::cout << "Error: " << decoded["_values"] << std::endl;
    }
}
#include <iostream>
#include <string>
#include <sstream>
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/exceptions.h>
#include <qpid/types/Variant.h>
#include <qpid/Msg.h>

using namespace qpid::messaging;
using namespace qpid::types;

/**
 * Invokes QMF2 management methods on broker objects over a session.
 *
 * Requests are sent to "qmf.default.direct/broker"; replies are fetched from
 * the reply address created in the constructor.
 */
class MethodInvoker
{
  public:
    /**
     * Creates the request sender and reply receiver on the given session.
     * Declared explicit so that a Session is never implicitly converted
     * into a MethodInvoker.
     */
    explicit MethodInvoker(Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
                                               sender(session.createSender("qmf.default.direct/broker")),
                                               receiver(session.createReceiver(replyTo)) {}

    /**
     * Invokes the broker's "connect" method to create a link.
     *
     * @param host      value sent as the "host" argument
     * @param port      value sent as the "port" argument
     * @param durable   value sent as the "durable" argument
     * @param mechanism optional "authMechanism" argument; omitted when empty
     * @param username  optional "username" argument; omitted when empty
     * @param password  optional "password" argument; omitted when empty
     * @param transport optional "transport" argument; omitted when empty
     * @param name      value sent as the "name" argument; when empty the
     *                  name "<host>,<port>" is used
     * @throws Exception as raised by methodRequest()
     */
    void createLink(const std::string& host, uint32_t port=5672, bool durable=false,
                    const std::string& mechanism=std::string(),
                    const std::string& username=std::string(),
                    const std::string& password=std::string(),
                    const std::string& transport=std::string(),
                    const std::string& name=std::string())
    {
        // The original referenced an undeclared 'name'. Default it to
        // "host,port", the form this file's main() passes to createBridge().
        std::string linkName(name);
        if (linkName.empty()) {
            std::ostringstream derived;
            derived << host << "," << port;
            linkName = derived.str();
        }

        Variant::Map params;
        params["name"]=linkName;
        params["host"]=host;
        params["port"]=port;
        params["durable"]=durable;
        if (!mechanism.empty()) params["authMechanism"]=mechanism;
        if (!username.empty()) params["username"]=username;
        if (!password.empty()) params["password"]=password;
        if (!transport.empty()) params["transport"]=transport;
        methodRequest("connect", params);
    }

    /**
     * Invokes the "bridge" method on the link object called linkName.
     *
     * @param linkName    name of the link object the method is invoked on
     * @param source      value sent as the "src" argument
     * @param destination value sent as the "dest" argument
     * @param key         optional "key" argument; omitted when empty
     * @param durable     value sent as the "durable" argument
     * @throws Exception as raised by methodRequest()
     */
    void createBridge(const std::string& linkName,
                      const std::string& source, const std::string& destination,
                      const std::string& key=std::string(), bool durable=false)
    {
        Variant::Map params;
        params["durable"]=durable;
        params["src"]=source;
        params["dest"]=destination;
        if (!key.empty()) params["key"]=key;
        // These three flags are always sent as false by this example.
        params["dynamic"]=false;
        params["srcIsLocal"]=false;
        params["srcIsQueue"]=false;
        methodRequest("bridge", params, linkName, "link");
    }

    /**
     * Sends a "_method_request" for method to the object named
     * "org.apache.qpid.broker:<objectType>:<objectName>" and waits up to
     * five seconds for the reply.
     *
     * @param method     name of the method to invoke
     * @param inParams   arguments sent with the request
     * @param objectName name part of the target object id
     * @param objectType type part of the target object id
     * @param outParams  when non-null, receives the "_arguments" map of a
     *                   "_method_response" reply
     * @throws Exception when no reply arrives, the reply is not a qmf2
     *         message, the reply opcode is "_exception", or the opcode is
     *         anything other than "_method_response"
     */
    void methodRequest(const std::string& method, const Variant::Map& inParams, const std::string& objectName="amqp-broker", const std::string& objectType="broker", Variant::Map* outParams = 0)
    {
        Variant::Map content;
        Variant::Map objectId;
        std::stringstream name;
        name << "org.apache.qpid.broker:" << objectType << ":" << objectName;
        objectId["_object_name"] = name.str();
        content["_object_id"] = objectId;
        content["_method_name"] = method;
        content["_arguments"] = inParams;

        Message request;
        request.setReplyTo(replyTo);
        request.getProperties()["x-amqp-0-10.app-id"] = "qmf2";
        request.getProperties()["qmf.opcode"] = "_method_request";
        encode(content, request);

        sender.send(request);

        Message response;
        if (receiver.fetch(response, Duration::SECOND*5)) {
            if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") {
                std::string opcode = response.getProperties()["qmf.opcode"];
                if (opcode == "_method_response") {
                    // The reply body is decoded only when the caller asked for it.
                    if (outParams) {
                        Variant::Map m;
                        decode(response, m);
                        *outParams = m["_arguments"].asMap();
                    }
                } else if (opcode == "_exception") {
                    Variant::Map m;
                    decode(response, m);
                    throw Exception(QPID_MSG("Error: " << m["_values"]));
                } else {
                    throw Exception(QPID_MSG("Invalid response received, unexpected opcode: " << opcode));
                }
            } else {
                throw Exception(QPID_MSG("Invalid response received, not a qmfv2 message: app-id="
                                         << response.getProperties()["x-amqp-0-10.app-id"]));
            }
        } else {
            throw Exception(QPID_MSG("No response received"));
        }
    }
  private:
    // Declaration order matters: the initializer list passes replyTo to
    // createReceiver(), so replyTo must be initialized before receiver.
    Address replyTo;
    Sender sender;
    Receiver receiver;
};

/**
 * Connects to "localhost:5672", creates a durable link to localhost:5673 and
 * a durable bridge between the amq.direct exchanges using key "my-key".
 *
 * @return 0 on success, 1 when an exception was reported
 */
int main()
{
    // Constructed outside the try block so the handler can still close it.
    Connection conn("localhost:5672");
    try
    {
        conn.open();
        Session session = conn.createSession();

        MethodInvoker control(session);
        control.createLink("localhost", 5673, true);
        control.createBridge("localhost,5673", "amq.direct", "amq.direct", "my-key", true);
        conn.close();
        return 0;
    }
    catch(const std::exception& e)
    {
        std::cerr << e.what() << std::endl;
        // Close on the failure path too, and report the failure to the caller.
        conn.close();
        return 1;
    }
}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to