On 05/17/2013 08:37 AM, Christian Fromme wrote:
In Qpid, broker federation is usually done via the `qpid-route`
command line utility. I wonder if there is a way to add and control
routes via the C++ API? If possible, an example would help!
All the management is done by sending specially formatted messages (the
QMF protocol). This can be done with any client library (though in raw
form it is a little verbose).
The recommended approach for federation is to use the 'create' method[1]
to create objects of type 'link' (i.e. connections between two brokers)
and associated 'bridges' (i.e. subscriptions over those connections).
These can be removed using the 'delete' method.
Attached is a simple example showing what this would look like using the
C++ qpid::messaging API. It creates a link between two brokers and then
establishes a flow of messages between the amq.fanout exchanges.
Feel free to ask any questions as regrettably this is not terribly well
documented.
--Gordon.
[1] The qpid-route utility predates the availability of the generic
create method and uses Broker::connect() to create a Link object then
Link::bridge() to create the bridge. I've attached an old example that
does that as well, but I would recommend the create/delete approach as
it is simpler.
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>
#include <iostream>
#include <sstream>
using namespace qpid::messaging;
using namespace qpid::types;
using std::string;
/**
 * Sends QMF2 method requests to a broker in order to create federation
 * links and bridges.
 *
 * Requests go out through a sender on "qmf.default.direct/broker"; replies
 * are read from a receiver created on this object's own reply address.
 */
class Controller
{
  public:
    /** Creates the request sender and the reply receiver on the given session. */
    Controller(Session&);

    /**
     * Invokes the broker's 'create' method for an object of type 'link'.
     *
     * @param name      name given to the new link
     * @param host      sent as the "host" property
     * @param port      sent as the "port" property
     * @param transport sent as "transport" only when non-empty
     * @param mechanism sent as "authMechanism" only when non-empty
     * @param username  sent as "username" only when non-empty
     * @param password  sent as "password" only when non-empty
     */
    void link(const std::string& name, const std::string& host, uint16_t port,
              const std::string& transport=std::string(), const std::string& mechanism=std::string(),
              const std::string& username=std::string(), const std::string& password=std::string());

    /**
     * Invokes the broker's 'create' method for an object of type 'bridge'.
     *
     * @param name name given to the new bridge
     * @param link sent as the "link" property
     * @param src  sent as the "src" property
     * @param dest sent as the "dest" property
     * @param tag  sent as "tag" only when non-empty
     */
    void bridge(const std::string& name, const std::string& link,
                const std::string& src, const std::string& dest, const std::string& tag=std::string());

  private:
    // Declaration order matters: the constructor builds 'receiver' from
    // 'replyTo', so 'replyTo' has to be initialised first.
    Address replyTo;
    Sender sender;
    Receiver receiver;

    /** Sends one method request with the given arguments and prints any error reply. */
    void invoke(const std::string& method, const Variant::Map& args);
};
/**
 * Opens a connection to the broker given as the first command line argument
 * ("localhost" when none is given), creates a link named "my-link" to
 * localhost:5673 and then a bridge named "my-bridge" from amq.fanout to
 * amq.fanout over that link.
 *
 * @return 0 when every step completed, 1 when an exception was caught.
 */
int main(int argc, char** argv)
{
    Connection c(argc > 1 ? argv[1] : "localhost");
    int status = 0;
    try {
        c.open();
        Session session = c.createSession();
        Controller controller(session);
        controller.link("my-link", "localhost", 5673);
        controller.bridge("my-bridge", "my-link", "amq.fanout", "amq.fanout");
    } catch(const std::exception& error) {
        // Report failures on stderr and through the exit status so that a
        // calling script can tell the setup did not complete.
        std::cerr << "ERROR: " << error.what() << std::endl;
        status = 1;
    }
    c.close();
    return status;
}
namespace {
// Address string used for the reply node; it carries the create:always and
// auto-delete:true options.
const char* const CONTROLLER_REPLY_ADDRESS =
    "#; {create:always, node:{x-declare:{auto-delete:true}}}";
}

/**
 * Sets up the messaging endpoints used by invoke(): a sender on
 * "qmf.default.direct/broker" for requests and a receiver on the reply
 * address for the broker's responses.
 */
Controller::Controller(Session& session)
    : replyTo(CONTROLLER_REPLY_ADDRESS),
      sender(session.createSender("qmf.default.direct/broker")),
      receiver(session.createReceiver(replyTo))
{
}
void Controller::link(const std::string& name, const std::string& host, uint16_t port, const std::string& transport, const std::string& mechanism, const std::string& username, const std::string& password)
{
Variant::Map args;
args["type"] = "link";
args["name"] = name;
Variant::Map props;
props["host"] = host;
props["port"] = port;
if (!transport.empty()) props["transport"] = transport;
if (!mechanism.empty()) props["authMechanism"] = mechanism;
if (!username.empty()) props["username"] = username;
if (!password.empty())props["password"] = password;
args["properties"] = props;
invoke("create", args);
}
void Controller::bridge(const std::string& name, const std::string& link,
const std::string& src, const std::string& dest, const std::string& tag)
{
Variant::Map args;
args["type"] = "bridge";
args["name"] = name;
Variant::Map props;
props["link"] = link;
props["src"] = src;
props["dest"] = dest;
if (!tag.empty()) props["tag"] = tag;
args["properties"] = props;
invoke("create", args);
}
/**
 * Sends a QMF2 method request addressed to the object
 * "org.apache.qpid.broker:broker:amqp-broker" and waits up to five seconds
 * for a reply on the reply address.
 *
 * Problems are reported on stdout rather than thrown: a missing reply, a
 * reply that is not marked as qmf2, an "_exception" reply (its "_values"
 * entry is printed) and a reply with any opcode other than
 * "_method_response".
 *
 * @param method name of the method to invoke
 * @param args   map sent as the method's "_arguments"
 */
void Controller::invoke(const std::string& method, const Variant::Map& args)
{
    Variant::Map content;
    Variant::Map objectId;
    objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
    content["_object_id"] = objectId;
    content["_method_name"] = method;
    content["_arguments"] = args;

    Message request;
    request.setReplyTo(replyTo);
    request.getProperties()["x-amqp-0-10.app-id"] = "qmf2";
    request.getProperties()["qmf.opcode"] = "_method_request";
    encode(content, request);
    sender.send(request);

    Message response;
    if (receiver.fetch(response, Duration::SECOND*5)) {
        if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") {
            std::string opcode = response.getProperties()["qmf.opcode"];
            if (opcode == "_exception") {
                Variant::Map m;
                decode(response, m);
                std::cout << "Error: " << m["_values"] << std::endl;
            } else if (opcode != "_method_response") {
                // Anything that is neither a normal response nor an
                // exception is unexpected; say so instead of treating it
                // as success.
                std::cout << "Invalid response received, unexpected opcode: " << opcode << std::endl;
            }
        } else {
            std::cout << "Invalid response received, not a qmfv2 message: app-id="
                      << response.getProperties()["x-amqp-0-10.app-id"] << std::endl;
        }
    } else {
        std::cout << "No response received" << std::endl;
    }
}
#include <iostream>
#include <string>
#include <sstream>
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/exceptions.h>
#include <qpid/types/Variant.h>
#include <qpid/Msg.h>
using namespace qpid::messaging;
using namespace qpid::types;
/**
 * Invokes QMF2 methods on broker management objects.
 *
 * Requests are sent through a sender on "qmf.default.direct/broker" and
 * replies are read from a receiver on this object's own reply address.
 * Failures are reported by throwing Exception.
 */
class MethodInvoker
{
  public:
    /** Creates the request sender and the reply receiver on the given session. */
    MethodInvoker(Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
                                      sender(session.createSender("qmf.default.direct/broker")),
                                      receiver(session.createReceiver(replyTo)) {}

    /**
     * Invokes the 'connect' method on the broker object.
     *
     * host, port and durable are always sent; mechanism (under the key
     * "authMechanism"), username, password and transport are sent only when
     * non-empty.
     */
    void createLink(const std::string& host, uint32_t port=5672, bool durable=false,
                    const std::string& mechanism=std::string(),
                    const std::string& username=std::string(),
                    const std::string& password=std::string(),
                    const std::string& transport=std::string())
    {
        Variant::Map params;
        // NOTE(review): the previous version also set params["name"] from an
        // identifier 'name' that is not declared anywhere in this class, so
        // it did not compile. The assignment has been removed; confirm
        // against the broker's management schema that 'connect' takes no
        // name argument.
        params["host"]=host;
        params["port"]=port;
        params["durable"]=durable;
        if (mechanism.size()) params["authMechanism"]=mechanism;
        if (username.size()) params["username"]=username;
        if (password.size()) params["password"]=password;
        if (transport.size()) params["transport"]=transport;
        methodRequest("connect", params);
    }

    /**
     * Invokes the 'bridge' method on the link object named linkName.
     *
     * source and destination are sent as "src" and "dest"; key is sent only
     * when non-empty. "dynamic", "srcIsLocal" and "srcIsQueue" are always
     * sent as false.
     */
    void createBridge(const std::string& linkName,
                      const std::string& source, const std::string& destination,
                      const std::string& key=std::string(), bool durable=false)
    {
        Variant::Map params;
        params["durable"]=durable;
        params["src"]=source;
        params["dest"]=destination;
        if (key.size()) params["key"]=key;
        params["dynamic"]=false;
        params["srcIsLocal"]=false;
        params["srcIsQueue"]=false;
        methodRequest("bridge", params, linkName, "link");
    }

    /**
     * Sends a method request to the object
     * "org.apache.qpid.broker:<objectType>:<objectName>" and waits up to
     * five seconds for the reply.
     *
     * @param method     name of the method to invoke
     * @param inParams   map sent as the method's "_arguments"
     * @param objectName name part of the target object id
     * @param objectType type part of the target object id
     * @param outParams  when non-null, receives the "_arguments" map of a
     *                   "_method_response" reply
     * @throws Exception when no reply arrives, when the reply is not marked
     *                   as qmf2, when the reply is an "_exception", or when
     *                   its opcode is anything other than "_method_response"
     */
    void methodRequest(const std::string& method, const Variant::Map& inParams, const std::string& objectName="amqp-broker", const std::string& objectType="broker", Variant::Map* outParams = 0)
    {
        Variant::Map content;
        Variant::Map objectId;
        std::stringstream name;
        name << "org.apache.qpid.broker:" << objectType << ":" << objectName;
        objectId["_object_name"] = name.str();
        content["_object_id"] = objectId;
        content["_method_name"] = method;
        content["_arguments"] = inParams;

        Message request;
        request.setReplyTo(replyTo);
        request.getProperties()["x-amqp-0-10.app-id"] = "qmf2";
        request.getProperties()["qmf.opcode"] = "_method_request";
        encode(content, request);
        sender.send(request);

        Message response;
        if (receiver.fetch(response, Duration::SECOND*5)) {
            if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") {
                std::string opcode = response.getProperties()["qmf.opcode"];
                if (opcode == "_method_response") {
                    if (outParams) {
                        Variant::Map m;
                        decode(response, m);
                        *outParams = m["_arguments"].asMap();
                    }
                } else if (opcode == "_exception") {
                    Variant::Map m;
                    decode(response, m);
                    throw Exception(QPID_MSG("Error: " << m["_values"]));
                } else {
                    throw Exception(QPID_MSG("Invalid response received, unexpected opcode: " << opcode));
                }
            } else {
                throw Exception(QPID_MSG("Invalid response received, not a qmfv2 message: app-id="
                                         << response.getProperties()["x-amqp-0-10.app-id"]));
            }
        } else {
            throw Exception(QPID_MSG("No response received"));
        }
    }
  private:
    // Declaration order matters: 'receiver' is built from 'replyTo' in the
    // constructor's initializer list.
    Address replyTo;
    Sender sender;
    Receiver receiver;
};
/**
 * Connects to "localhost:5672", asks that broker to create a durable link
 * to localhost:5673 and then a durable bridge from amq.direct to amq.direct
 * with key "my-key" on the link object named "localhost,5673".
 *
 * @return 0 when every step completed, 1 when an exception was caught.
 */
int main()
{
    try
    {
        Connection conn("localhost:5672");
        conn.open();
        Session session = conn.createSession();
        MethodInvoker control(session);
        control.createLink("localhost", 5673, true);
        control.createBridge("localhost,5673", "amq.direct", "amq.direct", "my-key", true);
        conn.close();
    }
    catch(const std::exception& e)
    {
        // Signal the failure through the exit status as well as on stderr.
        std::cerr << e.what() << std::endl;
        return 1;
    }
    return 0;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]