On 06/14/2011 07:35 AM, Jiri Krutil wrote:
> Hi
> Few questions regarding the (new) C++ client:
> 1. Is there a way how to programmatically delete queues, exchanges and
> bindings from the C++ client?

As of the 0.10 release, you can do that via the map-message based
management protocol. I've attached some notes on that and a rudimentary
example of deleting via the C++ messaging API.

> 2. What happens if I re-declare an existing queue or exchange from the
> C++ client, specifying in the address different attributes than what the
> object currently has?

Generally the differences are ignored. However when using addresses you
can include an assert option to ensure you get what you asked for.
You can also create queues using the management protocol. In that case,
if the queue already exists you will get back an error message informing
you of that.
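
To illustrate the assert option, here is a minimal sketch of an address string that asks for a durable queue and asserts that the node it resolves to matches. The queue name and the durable property are only illustrative assumptions; substitute whatever attributes you actually care about.

```python
# Hypothetical queue name and node properties, chosen only for illustration.
queue_name = "my-queue"
node = "{type: queue, durable: True}"

# create: always -> create the queue if it does not exist yet
# assert: always -> address resolution fails if the existing node does
#                   not match the properties given under 'node'
address = "%s; {create: always, assert: always, node: %s}" % (queue_name, node)

print(address)
```

Passing a string like this when creating a sender or receiver should then produce an error if an existing my-queue is not durable, rather than the difference being silently ignored.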

> 3. I am re-declaring an existing queue from the C++ client. I am
> providing a list of bindings in x-bindings part of the address. What
> happens if my binding list differs from the bindings that currently
> exist for the queue? Will the extra bindings be added and the missing
> bindings be deleted from the broker?

The extra bindings will be added, but no pre-existing bindings will be
removed.

The broker (qpidd) is managed via specially formatted messages sent
to, and received from, special addresses. This approach can be used to
list, create and delete queues and exchanges and to bind them
together.
This approach is described as part of the Qpid Management Framework
(version 2).
Command messages are simply map messages that are sent to the address
qmf.default.direct/broker (i.e. to the exchange named
'qmf.default.direct', with a routing key or subject of 'broker'). The
message should contain a reply-to address from which the sender can
receive responses.
The map used as the content for commands follows a particular
pattern.
There must always be an entry with key _object_id whose value
is a nested map identifying the target of the command. For the
commands considered here the target is always the broker itself. Thus
the _object_id map contains a single value with key _object_name and
value org.apache.qpid.broker:broker:amqp-broker.
There are two further top-level entries in the map. One has
_method_name as the key and the name of the command as its value. The
second, with key _arguments, contains a nested map in which the named
arguments for the command are contained.
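
As a rough sketch of the layout just described, the command map could be built in Python as below. The helper name build_command is made up for illustration and is not part of any Qpid API; only the keys and the object name come from the notes.

```python
# Name of the broker object that all commands discussed here target.
BROKER_OBJECT_NAME = "org.apache.qpid.broker:broker:amqp-broker"


def build_command(method_name, arguments):
    """Return the map content for a command aimed at the broker.

    build_command is a hypothetical helper, not a Qpid function.
    """
    return {
        # Nested map identifying the target of the command.
        "_object_id": {"_object_name": BROKER_OBJECT_NAME},
        # Name of the command to invoke.
        "_method_name": method_name,
        # Named arguments for the command.
        "_arguments": dict(arguments),
    }


command = build_command("delete", {"type": "queue", "name": "my-queue"})
print(command)
```

The resulting dictionary is what would be used as the content of the map message.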
In addition to correctly formatted content, there are two message
properties that must also be set. These are 'x-amqp-0-10.app-id',
which should always have the value 'qmf2', and 'qmf.opcode', which for
commands should always have the value '_method_request'.
After we have correctly constructed a command message and sent it to
the correct address, we can wait for the response to arrive from the
reply-to address we specified.
When it arrives it should also have the 'x-amqp-0-10.app-id' property
set to 'qmf2' and the 'qmf.opcode' should be '_method_response' if all
went well or '_exception' if an error was encountered. In both cases
the content is again a map. In the case of a valid response, any
return values (or 'out parameters') will be present as a nested map
against the key '_arguments'. In the case of an exception, the details
of the exception will be given in a nested map against the key
'_values'.
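
The response handling described above might be sketched as follows. This works on plain dictionaries standing in for the message properties and the decoded map content, so it can be tried without a broker; handle_response and QmfError are hypothetical names, and the sample response at the end is invented purely to exercise the function.

```python
class QmfError(Exception):
    """Raised when the broker replies with the '_exception' opcode."""


def handle_response(properties, content):
    """Return the out parameters of a response, or raise on error."""
    if properties.get("x-amqp-0-10.app-id") != "qmf2":
        raise ValueError("not a qmf2 message: %r" % (properties,))
    opcode = properties.get("qmf.opcode")
    if opcode == "_method_response":
        # Return values are nested under '_arguments'.
        return content.get("_arguments", {})
    if opcode == "_exception":
        # Details of the error are nested under '_values'.
        raise QmfError(content.get("_values"))
    raise ValueError("unexpected opcode: %r" % (opcode,))


# Invented sample of a successful response with no return values.
ok = handle_response(
    {"x-amqp-0-10.app-id": "qmf2", "qmf.opcode": "_method_response"},
    {"_arguments": {}},
)
print(ok)
```

Raising a dedicated exception type for '_exception' replies keeps broker-side failures distinguishable from malformed or unexpected messages.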
Given the basic mechanism described, the commands used to create and
delete queues and exchanges are named 'create' and 'delete'
respectively. The create command takes four arguments:
(1) type specifies the type of object to create and can be queue,
exchange or binding
(2) name specifies the name of the object to create
(3) properties is a nested map in which specific properties for the
object to be created can be requested
(4) the strict argument takes a boolean value which at present is
ignored, but which is intended to indicate whether the command should
fail if any unrecognised properties have been specified
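
For illustration, the four create arguments could be assembled like this. The helper create_arguments is a made-up name, and defaulting strict to True is my own assumption (the notes say the broker currently ignores it); the property names are those used in the queue example in these notes.

```python
VALID_TYPES = ("queue", "exchange", "binding")


def create_arguments(obj_type, name, properties=None, strict=True):
    """Build the '_arguments' map for the create command (hypothetical helper)."""
    if obj_type not in VALID_TYPES:
        raise ValueError("type must be one of %s" % (VALID_TYPES,))
    return {
        "type": obj_type,                      # kind of object to create
        "name": name,                          # name of the new object
        "properties": dict(properties or {}),  # requested properties
        "strict": strict,                      # currently ignored by the broker
    }


args = create_arguments(
    "queue", "my-queue",
    {"auto-delete": True, "qpid.auto_delete_timeout": 10},
)
print(args)
```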
The delete method takes three arguments:
(1) as for create the type specifies the type of object to be deleted -
valid values are again queue, exchange or binding - and is needed here
to handle the different namespaces that each object inhabits
(2) the name identifies the object to delete
(3) the final argument is a nested map with key options that at
present is unused
The naming of queues and exchanges is simple - a queue named my-queue
would set the name argument to a string of that value. The naming of
bindings uses the pattern <exchange>/<queue>/<key>
(e.g. amq.topic/my-queue/my-key identifies a binding between my-queue
and the exchange amq.topic with the binding key my-key).
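
A small sketch of the binding naming pattern, used here to build the arguments for deleting a binding. binding_name is a hypothetical helper; the exchange, queue and key are the ones from the example above.

```python
def binding_name(exchange, queue, key):
    """Compose a binding name using the <exchange>/<queue>/<key> pattern."""
    return "%s/%s/%s" % (exchange, queue, key)


# Arguments for a delete command that targets a binding.
delete_binding_args = {
    "type": "binding",
    "name": binding_name("amq.topic", "my-queue", "my-key"),
    "options": {},  # present for completeness; unused at present
}

print(delete_binding_args["name"])
```

Note that a simple join like this assumes none of the three parts itself contains a '/' character; how the broker treats such names is not covered in these notes.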
The following Python code shows by example the creation of a queue
named 'my-queue' that is set to be auto-deleted after 10 seconds.

from qpid.messaging import Connection, Message, Empty, ReceiverError

# 'opts' is assumed to hold parsed command-line options (broker, timeout).
conn = Connection(opts.broker)
try:
    conn.open()
    ssn = conn.session()
    snd = ssn.sender("qmf.default.direct/broker")
    # Temporary, auto-deleted queue on which to receive the response.
    reply_to = "#; {create:always, node:{x-declare:{auto-delete:true}}}"
    rcv = ssn.receiver(reply_to)
    content = {
        "_object_id": {"_object_name":
                       "org.apache.qpid.broker:broker:amqp-broker"},
        "_method_name": "create",
        "_arguments": {"type": "queue", "name": "my-queue",
                       "properties": {"auto-delete": True,
                                      "qpid.auto_delete_timeout": 10}}
    }
    request = Message(reply_to=reply_to, content=content)
    request.properties["x-amqp-0-10.app-id"] = "qmf2"
    request.properties["qmf.opcode"] = "_method_request"
    snd.send(request)
    try:
        response = rcv.fetch(timeout=opts.timeout)
        if response.properties['x-amqp-0-10.app-id'] == 'qmf2':
            if response.properties['qmf.opcode'] == '_method_response':
                # Any return values are found under '_arguments'.
                print(response.content['_arguments'])
            elif response.properties['qmf.opcode'] == '_exception':
                raise Exception("Error: %s" % response.content['_values'])
            else:
                raise Exception("Invalid response received, unexpected opcode: %s"
                                % response)
        else:
            raise Exception("Invalid response received, not a qmfv2 method: %s"
                            % response)
    except Empty:
        print("No response received!")
except ReceiverError as e:
    print(e)
except KeyboardInterrupt:
    pass
except Exception as e:
    print(e)
conn.close()

/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>
#include <iostream>
#include <string>

using namespace qpid::messaging;
using namespace qpid::types;

using std::string;

int main(int argc, char** argv)
{
    if (argc == 1) {
        std::cout << "Please specify a queue name." << std::endl;
        return 1;
    }
    std::string queue(argv[1]);
    Connection c(argc > 2 ? argv[2] : "localhost");
    try {
        c.open();
        Session session = c.createSession();
        //Prior to Qpid 0.10 the response queue had to be bound to
        //qmf.default.direct; now no longer needed:
        //Address responses("qmf.default.direct/my-name; {node: {type: topic}}");
        Address responses("#; {create: always, node: {x-declare: {auto-delete:True}}}");
        Receiver r = session.createReceiver(responses);
        Sender s = session.createSender("qmf.default.direct/broker");

        //Set the reply-to address and the properties required for a command
        Message request;
        request.setReplyTo(responses);
        request.setContentType("amqp/map");
        request.setProperty("x-amqp-0-10.app-id", "qmf2");
        request.setProperty("qmf.opcode", "_method_request");

        //The target of the command is always the broker itself
        Variant::Map oid;
        oid["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";

        Variant::Map content;
        content["_object_id"] = oid;
        content["_method_name"] = "delete";

        Variant::Map arguments;
        arguments["type"] = "queue";
        arguments["name"] = queue;
        content["_arguments"] = arguments;

        encode(content, request);
        s.send(request);

        //Wait for the response and check its opcode
        Message response = r.fetch();
        if (response.getProperties()["qmf.opcode"] == "_exception") {
            Variant::Map result;
            decode(response, result);
            std::cerr << "Error on delete: " << result["_values"].asMap()["error_text"] << std::endl;
        } else if (response.getProperties()["qmf.opcode"] == "_method_response") {
            std::cout << "Queue deleted." << std::endl;
        } else {
            std::cerr << "Unexpected opcode: " << response.getProperties()["qmf.opcode"] << std::endl;
        }
        session.acknowledge();
    } catch(const std::exception& error) {
        std::cout << "ERROR: " << error.what() << std::endl;
    }
    c.close();
    return 0;
}