Hi Sinduja,
The way to do it programmatically is to use QMF (the Qpid Management
Framework). It's not too complicated, but unfortunately it's not trivial
either. Apologies for the long response, but I hope it's enough to get
you started.
There should be enough information here, so the exercise for you is to
put it into practice. If you put together a sample, please do post it
back to the group.
The protocol is basically a series of Map Messages. It is documented here:
https://cwiki.apache.org/qpid/qmf-map-message-protocol.html
and the API is documented here:
https://cwiki.apache.org/qpid/qmfv2-api-proposal.html
It's a shame that you're using C++, as I've recently written a Java
implementation of the QMF2 API, see
https://issues.apache.org/jira/browse/QPID-3675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
There is a C++ API of sorts, however to date it doesn't appear to
conform in any way to the QMF2 API that I linked above :-( If I get time
I might look into writing one, but I need to get a life after doing the
Java one. Plus nobody seems to be biting on the Java one, so I've no idea
whether it's actually worth doing.
For what it's worth the place to look to figure out how to use the
current C++ API is (IIRC) in here "qpid/cpp/src/qmf". I think you'd use
the ConsoleSession to retrieve the broker ManagementObject and invoke
the create method on that. I'm afraid though that I'm not familiar with
this API and have never used it.
Gordon Sim, who's something of a Qpid guru, put together a few examples
of using the QMF2 MapMessage protocol directly in C++. You might actually
get more mileage out of these than the existing C++ API. I've attached
them here, though I can't take any credit for them.
test1.cpp illustrates how to get the message depth from a specified queue.
test2.cpp illustrates how to delete a queue (create uses the "create"
method and you specify the arguments etc. via Variant::Map properties).
example.cpp illustrates how to create links and bridges (essentially how
qpid-route works).
I've also attached some help that Gordon put together.
With respect to creating queues/exchanges via QMF2: my Java QMF2
implementation includes a Java port of qpid-config that happens
to implement QpidConfig add queue/exchange and del queue/exchange via QMF2.
Although I use the API, you still have to pass the parameters to the
create method as Map Messages. I've copied the Java code for creating
queues and exchanges below, so you should be able to use this in
conjunction with Gordon's test2.cpp to get you going (assuming that
you've got some reasonable C++ behind you). BTW QmfData below is just a
wrapper for a Map that provides a number of accessor/mutator methods;
the bit that you care about is the properties Map. In C++ I think you'd
create a Variant::Map, populate it in a similar way to what I've
done below, and pass that Variant::Map as a property called "properties"
in code based on the attached test2.cpp.
HTH
Frase
/**
* Add an exchange using the QMF "create" method.
* @param args the exchange type is the first argument and the
* exchange name is the second argument.
* The remaining QMF method properties are populated from config
* parsed from the command line.
*/
private void addExchange(final String[] args)
{
if (args.length < 2)
{
usage();
}
Map<String, Object> properties = new HashMap<String, Object>();
if (_durable)
{
properties.put("durable", true);
}
properties.put("exchange-type", args[0]);
if (_msgSequence)
{
properties.put(MSG_SEQUENCE, 1l);
}
if (_ive)
{
properties.put(IVE, 1l);
}
if (_altExchange != null)
{
properties.put("alternate-exchange", _altExchange);
}
QmfData arguments = new QmfData();
arguments.setValue("type", "exchange");
arguments.setValue("name", args[1]);
arguments.setValue("properties", properties);
try
{
_broker.invokeMethod("create", arguments);
}
catch (QmfException e)
{
System.out.println(e.getMessage());
}
// passive exchange creation not implemented yet (not sure how
// to do it using QMF2)
}
/**
* Add a queue using the QMF "create" method.
* @param args the queue name is the first argument.
* The remaining QMF method properties are populated from config
* parsed from the command line.
*/
private void addQueue(final String[] args)
{
if (args.length < 1)
{
usage();
}
Map<String, Object> properties = new HashMap<String, Object>();
for (String a : extraArguments)
{
String[] r = a.split("=");
String value = r.length == 2 ? r[1] : null;
properties.put(r[0], value);
}
if (_durable)
{
properties.put("durable", true);
properties.put(FILECOUNT, _fileCount);
properties.put(FILESIZE, _fileSize);
}
if (_maxQueueSize > 0)
{
properties.put(MAX_QUEUE_SIZE, _maxQueueSize);
}
if (_maxQueueCount > 0)
{
properties.put(MAX_QUEUE_COUNT, _maxQueueCount);
}
if (_limitPolicy.equals("reject"))
{
properties.put(POLICY_TYPE, "reject");
}
else if (_limitPolicy.equals("flow-to-disk"))
{
properties.put(POLICY_TYPE, "flow_to_disk");
}
else if (_limitPolicy.equals("ring"))
{
properties.put(POLICY_TYPE, "ring");
}
else if (_limitPolicy.equals("ring-strict"))
{
properties.put(POLICY_TYPE, "ring_strict");
}
if (_clusterDurable)
{
properties.put(CLUSTER_DURABLE, 1l);
}
if (_order.equals("lvq"))
{
properties.put(LVQ, 1l);
}
else if (_order.equals("lvq-no-browse"))
{
properties.put(LVQNB, 1l);
}
if (_eventGeneration > 0)
{
properties.put(QUEUE_EVENT_GENERATION, _eventGeneration);
}
if (_altExchange != null)
{
properties.put("alternate-exchange", _altExchange);
}
if (_flowStopSize > 0)
{
properties.put(FLOW_STOP_SIZE, _flowStopSize);
}
if (_flowResumeSize > 0)
{
properties.put(FLOW_RESUME_SIZE, _flowResumeSize);
}
if (_flowStopCount > 0)
{
properties.put(FLOW_STOP_COUNT, _flowStopCount);
}
if (_flowResumeCount > 0)
{
properties.put(FLOW_RESUME_COUNT, _flowResumeCount);
}
QmfData arguments = new QmfData();
arguments.setValue("type", "queue");
arguments.setValue("name", args[0]);
arguments.setValue("properties", properties);
try
{
_broker.invokeMethod("create", arguments);
}
catch (QmfException e)
{
System.out.println(e.getMessage());
}
// passive queue creation not implemented yet (not sure how to
// do it using QMF2)
}
/**
* Remove an exchange using the QMF "delete" method.
* @param args the exchange name is the first argument.
* The remaining QMF method properties are populated from config
* parsed from the command line.
*/
private void delExchange(final String[] args)
{
if (args.length < 1)
{
usage();
}
QmfData arguments = new QmfData();
arguments.setValue("type", "exchange");
arguments.setValue("name", args[0]);
try
{
_broker.invokeMethod("delete", arguments);
}
catch (QmfException e)
{
System.out.println(e.getMessage());
}
}
On 10/02/12 09:21, sinduja.rama...@gmail.com wrote:
Hi,
Can someone explain how to add and delete queues and exchanges
programmatically? In qpid-0.14, queueDeclare() and exchangeDeclare() seem
to be obsolete. Please correct me if I am wrong.
Moreover, we normally use qpid-config to add and delete queues and
exchanges. But how do we do the same in C++ code? Can someone explain with a
small example?
Regards,
Sinduja.R
#include <iostream>
#include <string>
#include <sstream>
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/exceptions.h>
#include <qpid/types/Variant.h>
#include <qpid/Msg.h>
using namespace qpid::messaging;
using namespace qpid::types;
class MethodInvoker
{
public:
MethodInvoker(Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
sender(session.createSender("qmf.default.direct/broker")),
receiver(session.createReceiver(replyTo)) {}
void createLink(const std::string& host, uint32_t port=5672, bool durable=false,
const std::string& mechanism=std::string(),
const std::string& username=std::string(),
const std::string& password=std::string(),
const std::string& transport=std::string())
{
Variant::Map params;
params["host"]=host;
params["port"]=port;
params["durable"]=durable;
if (mechanism.size()) params["authMechanism"]=mechanism;
if (username.size()) params["username"]=username;
if (password.size()) params["password"]=password;
if (transport.size()) params["transport"]=transport;
methodRequest("connect", params);
}
void createBridge(const std::string& linkName,
const std::string& source, const std::string& destination,
const std::string& key=std::string(), bool durable=false)
{
Variant::Map params;
params["durable"]=durable;
params["src"]=source;
params["dest"]=destination;
if (key.size()) params["key"]=key;
params["dynamic"]=false;
params["srcIsLocal"]=false;
params["srcIsQueue"]=false;
methodRequest("bridge", params, linkName, "link");
}
void methodRequest(const std::string& method, const Variant::Map& inParams, const std::string& objectName="amqp-broker", const std::string& objectType="broker", Variant::Map* outParams = 0)
{
Variant::Map content;
Variant::Map objectId;
std::stringstream name;
name << "org.apache.qpid.broker:" << objectType << ":" << objectName;
objectId["_object_name"] = name.str();
content["_object_id"] = objectId;
content["_method_name"] = method;
content["_arguments"] = inParams;
Message request;
request.setReplyTo(replyTo);
request.getProperties()["x-amqp-0-10.app-id"] = "qmf2";
request.getProperties()["qmf.opcode"] = "_method_request";
encode(content, request);
sender.send(request);
Message response;
if (receiver.fetch(response, Duration::SECOND*5)) {
if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") {
std::string opcode = response.getProperties()["qmf.opcode"];
if (opcode == "_method_response") {
if (outParams) {
Variant::Map m;
decode(response, m);
*outParams = m["_arguments"].asMap();
}
} else if (opcode == "_exception") {
Variant::Map m;
decode(response, m);
throw Exception(QPID_MSG("Error: " << m["_values"]));
} else {
throw Exception(QPID_MSG("Invalid response received, unexpected opcode: " << opcode));
}
} else {
throw Exception(QPID_MSG("Invalid response received, not a qmfv2 message: app-id="
<< response.getProperties()["x-amqp-0-10.app-id"]));
}
} else {
throw Exception(QPID_MSG("No response received"));
}
}
private:
Address replyTo;
Sender sender;
Receiver receiver;
};
int main()
{
try
{
Connection conn("localhost:5672");
conn.open();
Session session = conn.createSession();
MethodInvoker control(session);
control.createLink("localhost", 5673, true);
control.createBridge("localhost,5673", "amq.direct", "amq.direct", "my-key", true);
conn.close();
}
catch(std::exception& e)
{
std::cerr << e.what() << std::endl;
}
return 0;
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>
#include <iostream>
using namespace qpid::messaging;
using namespace qpid::types;
using std::string;
int main(int argc, char** argv)
{
if (argc == 1) {
std::cout << "Please specify a queue name." << std::endl;
return 1;
}
std::string queue(argv[1]);
Connection c(argc > 2 ? argv[2] : "localhost");
try {
c.open();
Session session = c.createSession();
//Prior to Qpid 0.10 the response queue had to be bound to
//qmf.default.direct; now no longer needed:
//Address responses("qmf.default.direct/my-name; {node: {type: topic}}");
Address responses("#; {create: always, node: {x-declare: {auto-delete:True}}}");
Receiver r = session.createReceiver(responses);
Sender s = session.createSender("qmf.default.direct/broker");
Message request;
request.setReplyTo(responses);
request.setContentType("amqp/map");
request.setProperty("x-amqp-0-10.app-id", "qmf2");
request.setProperty("qmf.opcode", "_query_request");
Variant::Map oid;
oid["_object_name"] = std::string("org.apache.qpid.broker:queue:") + queue;
Variant::Map content;
content["_what"] = "OBJECT";
content["_object_id"] = oid;
encode(content, request);
s.send(request);
Message response = r.fetch();
Variant::List contentIn;
decode(response, contentIn);
if (contentIn.size() == 1) {
Variant::Map details = contentIn.front().asMap()["_values"].asMap();
std::cout << "Message depth for " << queue << " is " << details["msgDepth"] << std::endl;
} else if (contentIn.size() == 0) {
std::cout << "No such queue: " << queue << std::endl;
} else {
std::cout << "Unexpected number of entries: " << contentIn << std::endl;
}
session.acknowledge();
} catch(const std::exception& error) {
std::cout << "ERROR: " << error.what() << std::endl;
}
c.close();
return 0;
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>
#include <iostream>
using namespace qpid::messaging;
using namespace qpid::types;
using std::string;
int main(int argc, char** argv)
{
if (argc == 1) {
std::cout << "Please specify a queue name." << std::endl;
return 1;
}
std::string queue(argv[1]);
Connection c(argc > 2 ? argv[2] : "localhost");
try {
c.open();
Session session = c.createSession();
//Prior to Qpid 0.10 the response queue had to be bound to
//qmf.default.direct; now no longer needed:
//Address responses("qmf.default.direct/my-name; {node: {type: topic}}");
Address responses("#; {create: always, node: {x-declare: {auto-delete:True}}}");
Receiver r = session.createReceiver(responses);
Sender s = session.createSender("qmf.default.direct/broker");
Message request;
request.setReplyTo(responses);
request.setContentType("amqp/map");
request.setProperty("x-amqp-0-10.app-id", "qmf2");
request.setProperty("qmf.opcode", "_method_request");
Variant::Map oid;
oid["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
Variant::Map content;
content["_object_id"] = oid;
content["_method_name"] = "delete";
Variant::Map arguments;
arguments["type"] = "queue";
arguments["name"] = queue;
content["_arguments"] = arguments;
encode(content, request);
s.send(request);
Message response = r.fetch();
if (response.getProperties()["qmf.opcode"] == "_exception") {
Variant::Map result;
decode(response, result);
std::cerr << "Error on delete: " << result["_values"].asMap()["error_text"] << std::endl;
} else if (response.getProperties()["qmf.opcode"] == "_method_response") {
std::cout << "Queue deleted." << std::endl;
} else {
std::cerr << "Unexpected opcode: " << response.getProperties()["qmf.opcode"] << std::endl;
}
session.acknowledge();
} catch(const std::exception& error) {
std::cout << "ERROR: " << error.what() << std::endl;
}
c.close();
return 0;
}
The broker (qpidd) is managed via specially formatted messages sent
to, and received from, special addresses. This approach can be used to
list, create and delete queues and exchanges and to bind them
together.
This approach is described as part of the Qpid Management Framework
(version 2).
Command messages are simply map messages that are sent to address
qmf.default.direct/broker (i.e. to the exchange named
'qmf.default.direct', with a routing key or subject of 'broker'). The
message should contain a reply-to address from which the sender can
receive responses.
The map used as the content for commands follows a particular
pattern.
There must always be an entry with key _object_id whose value
is a nested map identifying the target of the command. For the
commands considered here the target is always the broker itself. Thus
the _object_id map contains a single value with key _object_name and
value org.apache.qpid.broker:broker:amqp-broker.
There are two further top-level entries in the map. One has
_method_name as the key and the name of the command as its value. The
second, with key _arguments, contains a nested map in which the named
arguments for the command are contained.
In addition to correctly formatted content, there are two message
properties that must also be set. These are 'x-amqp-0-10.app-id',
which should always have the value 'qmf2' and 'qmf.opcode' which for
commands should always have the value '_method_request'.
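The layout described above can be sketched in Python without any Qpid library, since the content is just a nested map. This is only an illustration: the helper name build_method_request is hypothetical, and the returned maps would still have to be attached to a real message (content and properties) and sent to qmf.default.direct/broker with a reply-to address set.

```python
# Object name of the broker itself, as given in the description above.
BROKER_OBJECT_NAME = "org.apache.qpid.broker:broker:amqp-broker"


def build_method_request(method_name, arguments):
    """Return (content, properties) for a QMF2 command aimed at the broker.

    Hypothetical helper: it only builds plain dicts and does not talk
    to a broker.
    """
    content = {
        # Nested map identifying the target of the command.
        "_object_id": {"_object_name": BROKER_OBJECT_NAME},
        # Name of the command, e.g. "create" or "delete".
        "_method_name": method_name,
        # Named arguments for the command (copied so the caller's map
        # is not shared with the message content).
        "_arguments": dict(arguments),
    }
    properties = {
        "x-amqp-0-10.app-id": "qmf2",
        "qmf.opcode": "_method_request",
    }
    return content, properties


content, properties = build_method_request(
    "delete", {"type": "queue", "name": "my-queue"})
```

Keeping the message construction separate from the connection code makes it easy to inspect or log the exact map that would be sent.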
After we have correctly constructed a command message and sent it to
the correct address, we can wait for the response to arrive from the
reply-to address we specified.
When it arrives it should also have the 'x-amqp-0-10.app-id' property
set to 'qmf2' and the 'qmf.opcode' should be '_method_response' if all
went well or '_exception' if an error was encountered. In both cases
the content is again a map. In the case of a valid response, any
return values (or 'out parameters') will be present as a nested map
against the key '_arguments'. In the case of an exception, the details
of the exception will be given in a nested map against the key
'_values'.
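The response rules above might be checked along the following lines. This is a sketch that takes the response's properties and content as plain dicts; the names interpret_response and QmfMethodError are hypothetical and not part of any Qpid API.

```python
class QmfMethodError(Exception):
    """Hypothetical exception carrying the '_values' map of an '_exception' reply."""

    def __init__(self, values):
        Exception.__init__(self, "Error: %s" % (values,))
        self.values = values


def interpret_response(properties, content):
    """Return the out parameters of a method response, or raise on failure."""
    app_id = properties.get("x-amqp-0-10.app-id")
    if app_id != "qmf2":
        raise ValueError("not a qmf2 message: app-id=%r" % (app_id,))
    opcode = properties.get("qmf.opcode")
    if opcode == "_method_response":
        # Return values, if any, are nested under '_arguments'.
        return content.get("_arguments", {})
    if opcode == "_exception":
        # Exception details are nested under '_values'.
        raise QmfMethodError(content.get("_values"))
    raise ValueError("unexpected opcode: %r" % (opcode,))


ok = interpret_response(
    {"x-amqp-0-10.app-id": "qmf2", "qmf.opcode": "_method_response"},
    {"_arguments": {}})
```

Raising a dedicated exception for '_exception' replies lets calling code tell a broker-side failure apart from a malformed or unrelated message.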
Given the basic mechanism described, the commands used to create and
delete queues and exchanges are named 'create' and 'delete'
respectively. The create command takes four arguments:
(1) type specifies the type of object to create and can be queue,
exchange or binding
(2) name specifies the name of the object to create
(3) properties is a nested map in which specific properties for the
object to be created can be requested
(4) the strict argument takes a boolean value which at present is
ignored, but which is intended to indicate whether the command should
fail if any unrecognised properties have been specified
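As an illustration, the four create arguments could be assembled like this. The function name create_arguments is hypothetical, and leaving out strict unless the caller supplies it is an assumption made here (the text says the broker currently ignores it).

```python
# Object types accepted by create, as listed above.
VALID_TYPES = ("queue", "exchange", "binding")


def create_arguments(object_type, name, properties=None, strict=None):
    """Build the '_arguments' map for the 'create' command (sketch only)."""
    if object_type not in VALID_TYPES:
        raise ValueError("type must be one of %s, got %r"
                         % (", ".join(VALID_TYPES), object_type))
    arguments = {
        "type": object_type,
        "name": name,
        # Nested map of object-specific properties; empty if none requested.
        "properties": dict(properties or {}),
    }
    if strict is not None:
        # Assumption: only send 'strict' when the caller asked for it.
        arguments["strict"] = bool(strict)
    return arguments


# A queue that is auto-deleted after 10 seconds.
args = create_arguments(
    "queue", "my-queue",
    {"auto-delete": True, "qpid.auto_delete_timeout": 10})
```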
The delete method takes three arguments:
(1) as for create the type specifies the type of object to be deleted -
valid values are again queue, exchange or binding - and is needed here
to handle the different namespaces that each object inhabits
(2) the name identifies the object to delete
(3) the final argument is a nested map with key options that at
present is unused
The naming of queues and exchanges is simple - a queue named my-queue
would set the name argument to a string of that value. The naming of
bindings uses the pattern <exchange>/<queue>/<key>
(e.g. amq.topic/my-queue/my-key identifies a binding between my-queue
and the exchange amq.topic with the binding key my-key)
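The binding naming pattern can be sketched as a pair of helpers. Both function names are hypothetical, and the parsing assumes that exchange and queue names contain no '/' character, which the text above does not state.

```python
def binding_name(exchange, queue, key):
    """Format a binding name as <exchange>/<queue>/<key>."""
    return "/".join((exchange, queue, key))


def parse_binding_name(name):
    """Split a binding name into (exchange, queue, key).

    Assumption: only the first two '/' characters are separators, so a
    binding key may itself contain '/'.
    """
    parts = name.split("/", 2)
    if len(parts) != 3:
        raise ValueError("expected <exchange>/<queue>/<key>, got %r" % (name,))
    return tuple(parts)


# Arguments for deleting the binding used as the example above.
delete_binding_args = {
    "type": "binding",
    "name": binding_name("amq.topic", "my-queue", "my-key"),
}
```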
The following Python code shows by example the creation of a queue
named 'my-queue' that is set to be auto-deleted after 10 seconds.
from qpid.messaging import Connection, Message, Empty, ReceiverError

# opts is assumed to hold the parsed command-line options (broker, timeout)
conn = Connection(opts.broker)
try:
    conn.open()
    ssn = conn.session()
    snd = ssn.sender("qmf.default.direct/broker")
    reply_to = "#; {create:always, node:{x-declare:{auto-delete:true}}}"
    rcv = ssn.receiver(reply_to)
    content = {
        "_object_id": {"_object_name":
                       "org.apache.qpid.broker:broker:amqp-broker"},
        "_method_name": "create",
        "_arguments": {"type": "queue", "name": "my-queue",
                       "properties": {"auto-delete": True,
                                      "qpid.auto_delete_timeout": 10}}
    }
    request = Message(reply_to=reply_to, content=content)
    request.properties["x-amqp-0-10.app-id"] = "qmf2"
    request.properties["qmf.opcode"] = "_method_request"
    snd.send(request)
    try:
        response = rcv.fetch(timeout=opts.timeout)
        if response.properties['x-amqp-0-10.app-id'] == 'qmf2':
            opcode = response.properties['qmf.opcode']
            if opcode == '_method_response':
                # the original snippet returned this value from an enclosing function
                print(response.content['_arguments'])
            elif opcode == '_exception':
                raise Exception("Error: %s" % response.content['_values'])
            else:
                raise Exception("Invalid response received, unexpected opcode: %s"
                                % opcode)
        else:
            raise Exception("Invalid response received, not a qmfv2 method: %s"
                            % response)
    except Empty:
        print("No response received!")
    except Exception as e:
        print(e)
except ReceiverError as e:
    print(e)
except KeyboardInterrupt:
    pass
conn.close()