On 11/01/13 13:11, Lance D. wrote:
Wow. Thanks for the huge amount of detail!
No worries, hope it helps. One of the best things about Qpid is the
strong user community, I've got a lot out of it so it's nice to be able
to give a little back.
You are correct: I am using the C++ broker. As far as language for my
tool, I'm pretty agnostic; I want a tool that suits my needs and I've got
the resources to do it in any viable language (though I am partial to C++).
Unfortunately C++ has the poorest QMF support IMHO :-( There is an API,
but it bears no resemblance to the QMF2 API that I linked previously :-(
I've not done much with that I'm afraid. I keep promising myself that
I'll port my Java implementation to C++, but until I get more feedback
on my Java stuff it's hard to find the motivation.....
The is an example using the C++ QMF API in
qpid/cpp/bindings/qmf2/examples/cpp but as I say I'm not overly familiar
with that API and haven't used it myself.
I've attached a few examples that Gordon Sim put together that use the
QMF2 protocol directly from C++ which you might find useful.
Java and python currently have the best QMF support, though as I said
previously I'll shortly be releasing a REST API for QMF2 and also
JavaScript support :-)
I'm going to follow up on one paragraph in particular:
"To more directly answer your question "However, I’m having a hard time
making that same mapping between the connection stuff and a sender to an
exchange. " yeah there's some nuance there :-) the main thing is to think
in terms of UML associations, so for example a binding object has queueRef
and exchangeRef properties which hold ObjectIds that enable the associated
queue and exchange to be indexed (binding behaves like a UML association
class really) what this means though if you want to work out how a queue
links to an exchange you have to do it from the perspective of the binding."
My problem is that my publishers establish a 'send only' connection, so
they link to the exchange directly (without a queue). To put it another
way, if I have n publishers sending to exchange 'myExchange', when I run
'qpid-stat -q' I don't see n queues bound to myExchange. So, without a
queueRef, I have no binding object for those publishers, and no way to get,
for example, the remote PID. Am I missing something?
Ahhh so what I think you're saying is that you want to figure out
linkage from producer connections.
Unfortunately I don't think that you're missing something. So as I said
previously it's possible (though not trivial) to work out associations
between consumer connections and the queues that they are getting their
data from, but there isn't any good way to do what might reasonably be
considered the converse which is to work out the exchange that a
producer connection is publishing to.
I asked about that *ages* ago - I might be able to find the post in my
sent box but it'll probably take a while. IIRC the response that I got
from Gordon Sim was that it related to the AMQP spec and that there
wasn't a mechanism to do this.
*So basically I don't think that there's any way to look up an exchange
associated with a connection.*
The closest that I've got to that is to at least flag up that a
connection relates to a "producer only" client by checking for
connections that have sessions but no subscribers.
The remotePid is a property of the connection management object.
if you only have producer clients (isn't that a bit useless with nothing
consuming the messages??) all you will see in that part of the graph is
connections and sessions.
I wish that I could give a happier answer, but I'm close to certain that
you aren't missing something. I've had users ask for this sort of thing
myself and have had to say it's not possible, I've not even worked out a
reliable way to even *infer* the exchange. As I say the closest I got
was to at least work out which of my connections only related to
producers so I could work out the remote host and pid of those.
Perhaps Gordon or one of the others on the core Qpid team has some more
insight, but I suspect I've probably done more with QMF than most.
Sorry I can't give the answer you'd like,
Frase
Thanks again,
-Lance
On Fri, Jan 11, 2013 at 5:44 AM, Fraser Adams <[email protected]
wrote:
Howdy Lance,
You mention qpid-tool which uses QMF so I'm assuming that you are talking
about the C++ broker. The Java broker uses JMX and I'm not so familiar with
how that does instrumentation so the following is only really applicable to
the C++ broker.
Unfortunately QMF can be a bit non-trivial, it's not helped by the fact
that there are two variants - QMF1 which is a binary protocol and
accompanying API and QMF2 which is built on top of Map messages.
The python tools bundled with qpid used to use QMF1, though I *think* that
as of qpid 0.18 they've started to use QMF2 (trying 0.18 qpid-config on a
0.8 broker gets a method failed error which suggests it's using QMF2 create
method - though I've not actually looked at the 0.18 source).
If you are writing your tools in python there's a whole lot more stuff
available for QMF.
If you are writing in Java a year back I wrote a complete Java
implementation of the QMF2 API:
https://cwiki.apache.org/qpid/**qmfv2-api-proposal.html<https://cwiki.apache.org/qpid/qmfv2-api-proposal.html>
based on the protocol:
https://cwiki.apache.org/qpid/**qmf-map-message-protocol.html<https://cwiki.apache.org/qpid/qmf-map-message-protocol.html>
The link to my code is here:
https://issues.apache.org/**jira/browse/QPID-3675<https://issues.apache.org/jira/browse/QPID-3675>
Slightly frustratingly there hasn't been much feedback on this - I'd
really like to see it make it into the main code base - I put a huge amount
of effort into it. The code in that Jira link also contains tests/tools
that illustrate how to use QMF2 in Java in some non-trivial scenarios.
As it happens in a few days time I'm about to release a major uplift to
that, which will include a QMF2 REST API and most significantly a complete
HTML5 based web UI that will enable inspection of all QMF properties and
also queue/exchange/binding adding/deletion so that may be just the sort of
thing you're interested in - it's pretty close, I'm just doing some last
minute de-snagging trying to get it looking nice on small real-estate
mobile browsers.
To more directly answer your question "However, I’m having a hard time
making that same mapping between the connection stuff and a sender to an
exchange. " yeah there's some nuance there :-) the main thing is to think
in terms of UML associations, so for example a binding object has queueRef
and exchangeRef properties which hold ObjectIds that enable the associated
queue and exchange to be indexed (binding behaves like a UML association
class really) what this means though if you want to work out how a queue
links to an exchange you have to do it from the perspective of the binding.
Similarly it's quite involved to find the subscriptions associated with a
queue because the navigation goes "the wrong way" that is to say
subscription has a queueRef property.
What I've found is that in general the best (and by far in a way most
efficient) way to do non-trivial QMF stuff is to to the getObject() stuff
once for each of the objects that you care about then dereference those
objects into Maps indexed by objectId - if you do that then you can index
everything you want easily and cheaply.
If you look at the code for things like qpid-config it's pretty easy to
get into the trap of doing getObjects() calls on inner loops, which is
heinously inefficient :-) because each getObjects() call does a messaging
request/response under the hood.
To get you up and running and give a bit of insight (I hope!!) I've
attached a couple of python programs I wrote last year (these use QMF1)
connection-audit checks when connections are made and looks up the queues
being subscribed to against a whitelist, basically if the queue being
subscribed to is not a queue that's in the whitelist it generates a log
message.
connection-audit-dom is the same but uses DOM based XML parsing (I needed
to do that when I had to use it against an oldish python version)
connection-logger does a lot of what I think you care about, basically it
logs all connections to a broker and provides useful info on them a la
qpid-config -b queues.
connection-logger-orig was my original version of this which was somewhat
erm "rushed out" :-) at face value it looks OK for a few queues etc. but if
you ramp up a load of connections - well try and add a couple of hundred
queues and you'll see what I meant by heinously inefficient above :-D.
connection-logger and connection-logger-orig are functionally equivalent
so it's worth looking through both to give a bit of insight into the right
way and wrong way to go about things.
I'm not a python programmer by any stretch of the imagination, but I think
that they are at least fair examples.
Keep an eye out for my QMF2/UI update over the next few days but hopefully
this response has given you a decent leg-up.
Best regards,
Frase
On 11/01/13 02:24, Lance D. wrote:
Hello all.
I’m in the process of building QPID inspection tools for a project I’m
working on. Now, I’m looking at the results of the qpid-tool. I see that
the connection schema has a bunch of useful info (like remote pid,
process,
etc). I’ve also found ways to link that info up with the queues to figure
out who’s subscribed to what. However, I’m having a hard time making that
same mapping between the connection stuff and a sender to an exchange.
My question is, is there a way to do that mapping (either with the
existing
tools, or by tapping into the qmf API?
Thanks,
-Lance
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
#include <iostream>
#include <string>
#include <sstream>
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Session.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/exceptions.h>
#include <qpid/types/Variant.h>
#include <qpid/Msg.h>
using namespace qpid::messaging;
using namespace qpid::types;
class MethodInvoker
{
public:
MethodInvoker(Session& session) : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
sender(session.createSender("qmf.default.direct/broker")),
receiver(session.createReceiver(replyTo)) {}
void createLink(const std::string& host, uint32_t port=5672, bool durable=false,
const std::string& mechanism=std::string(),
const std::string& username=std::string(),
const std::string& password=std::string(),
const std::string& transport=std::string())
{
Variant::Map params;
params["host"]=host;
params["port"]=port;
params["durable"]=durable;
if (mechanism.size()) params["authMechanism"]=mechanism;
if (username.size()) params["username"]=username;
if (password.size()) params["password"]=password;
if (transport.size()) params["transport"]=transport;
methodRequest("connect", params);
}
void createBridge(const std::string& linkName,
const std::string& source, const std::string& destination,
const std::string& key=std::string(), bool durable=false)
{
Variant::Map params;
params["durable"]=durable;
params["src"]=source;
params["dest"]=destination;
if (key.size()) params["key"]=key;
params["dynamic"]=false;
params["srcIsLocal"]=false;
params["srcIsQueue"]=false;
methodRequest("bridge", params, linkName, "link");
}
void methodRequest(const std::string& method, const Variant::Map& inParams, const std::string& objectName="amqp-broker", const std::string& objectType="broker", Variant::Map* outParams = 0)
{
Variant::Map content;
Variant::Map objectId;
std::stringstream name;
name << "org.apache.qpid.broker:" << objectType << ":" << objectName;
objectId["_object_name"] = name.str();
content["_object_id"] = objectId;
content["_method_name"] = method;
content["_arguments"] = inParams;
Message request;
request.setReplyTo(replyTo);
request.getProperties()["x-amqp-0-10.app-id"] = "qmf2";
request.getProperties()["qmf.opcode"] = "_method_request";
encode(content, request);
sender.send(request);
Message response;
if (receiver.fetch(response, Duration::SECOND*5)) {
if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") {
std::string opcode = response.getProperties()["qmf.opcode"];
if (opcode == "_method_response") {
if (outParams) {
Variant::Map m;
decode(response, m);
*outParams = m["_arguments"].asMap();
}
} else if (opcode == "_exception") {
Variant::Map m;
decode(response, m);
throw Exception(QPID_MSG("Error: " << m["_values"]));
} else {
throw Exception(QPID_MSG("Invalid response received, unexpected opcode: " << opcode));
}
} else {
throw Exception(QPID_MSG("Invalid response received, not a qmfv2 message: app-id="
<< response.getProperties()["x-amqp-0-10.app-id"]));
}
} else {
throw Exception(QPID_MSG("No response received"));
}
}
private:
Address replyTo;
Sender sender;
Receiver receiver;
};
int main()
{
try
{
Connection conn("localhost:5672");
conn.open();
Session session = conn.createSession();
MethodInvoker control(session);
control.createLink("localhost", 5673, true);
control.createBridge("localhost,5673", "amq.direct", "amq.direct", "my-key", true);
conn.close();
}
catch(std::exception& e)
{
std::cerr << e.what() << std::endl;
}
return 0;
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>
#include <iostream>
using namespace qpid::messaging;
using namespace qpid::types;
using std::string;
int main(int argc, char** argv)
{
if (argc == 1) {
std::cout << "Please specify a queue name." << std::endl;
return 1;
}
std::string queue(argv[1]);
Connection c(argc > 2 ? argv[2] : "localhost");
try {
c.open();
Session session = c.createSession();
//Prior to Qpid 0.10 the response queue had to be bound to
//qmf.default.direct; now no longer needed:
//Address responses("qmf.default.direct/my-name; {node: {type: topic}}");
Address responses("#; {create: always, node: {x-declare: {auto-delete:True}}}");
Receiver r = session.createReceiver(responses);
Sender s = session.createSender("qmf.default.direct/broker");
Message request;
request.setReplyTo(responses);
request.setContentType("amqp/map");
request.setProperty("x-amqp-0-10.app-id", "qmf2");
request.setProperty("qmf.opcode", "_query_request");
Variant::Map oid;
oid["_object_name"] = std::string("org.apache.qpid.broker:queue:") + queue;
Variant::Map content;
content["_what"] = "OBJECT";
content["_object_id"] = oid;
encode(content, request);
s.send(request);
Message response = r.fetch();
Variant::List contentIn;
decode(response, contentIn);
if (contentIn.size() == 1) {
Variant::Map details = contentIn.front().asMap()["_values"].asMap();
std::cout << "Message depth for " << queue << " is " << details["msgDepth"] << std::endl;
} else if (contentIn.size() == 0) {
std::cout << "No such queue: " << queue << std::endl;
} else {
std::cout << "Unexpected number of entries: " << contentIn << std::endl;
}
session.acknowledge();
} catch(const std::exception& error) {
std::cout << "ERROR: " << error.what() << std::endl;
}
c.close();
return 0;
}
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>
#include <iostream>
using namespace qpid::messaging;
using namespace qpid::types;
using std::string;
int main(int argc, char** argv)
{
if (argc == 1) {
std::cout << "Please specify a queue name." << std::endl;
return 1;
}
std::string queue(argv[1]);
Connection c(argc > 2 ? argv[2] : "localhost");
try {
c.open();
Session session = c.createSession();
//Prior to Qpid 0.10 the response queue had to be bound to
//qmf.default.direct; now no longer needed:
//Address responses("qmf.default.direct/my-name; {node: {type: topic}}");
Address responses("#; {create: always, node: {x-declare: {auto-delete:True}}}");
Receiver r = session.createReceiver(responses);
Sender s = session.createSender("qmf.default.direct/broker");
Message request;
request.setReplyTo(responses);
request.setContentType("amqp/map");
request.setProperty("x-amqp-0-10.app-id", "qmf2");
request.setProperty("qmf.opcode", "_method_request");
Variant::Map oid;
oid["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
Variant::Map content;
content["_object_id"] = oid;
content["_method_name"] = "delete";
Variant::Map arguments;
arguments["type"] = "queue";
arguments["name"] = queue;
content["_arguments"] = arguments;
encode(content, request);
s.send(request);
Message response = r.fetch();
if (response.getProperties()["qmf.opcode"] == "_exception") {
Variant::Map result;
decode(response, result);
std::cerr << "Error on delete: " << result["_values"].asMap()["error_text"] << std::endl;
} else if (response.getProperties()["qmf.opcode"] == "_method_response") {
std::cout << "Queue deleted." << std::endl;
} else {
std::cerr << "Unexpected opcode: " << response.getProperties()["qmf.opcode"] << std::endl;
}
session.acknowledge();
} catch(const std::exception& error) {
std::cout << "ERROR: " << error.what() << std::endl;
}
c.close();
return 0;
}
The broker (qpidd) is managed via specially formatted messages sent
to- and received from- special addresses. This approach can be used to
list, create and delete queues and exchanges and to bind them
together.
This approach is described as part of the Qpid Management Framework
(version 2).
Command messages are simply map messages that are sent to address
qmf.default.direct/broker (i.e. to the exchange named
'qmf.default.direct', with a routing key or subject of 'broker'). The
message should contain a reply-to address from which the sender can
receiver responses.
The map used as the content for commands follows a particular
pattern.
There must always be an entry with key _object_id whose value
is a nested map identifying the target of the command. For the
commands considered here the target is always the broker itself. Thus
the _object_id map contains a single value with key _object_name and
value org.apache.qpid.broker:broker:amqp-broker.
There are two further top-level entries in the map. One has
_method_name as the key and the name of the command as its value. The
second, with key _arguments, contains a nested map in which the named
arguments for the command are contained.
In addition to correctly formatted content, there are two message
properties that must also be set. These are 'x-amqp-0-10.app-id',
which should always have the value 'qmf2' and 'qmf.opcode' which for
commands should always have the value '_method_request'.
After we have correctly constructed a command message and sent it to
the correct address, we can wait for the response to arrive from the
reply-to address we specified.
Wehn it arrives it should also have the 'x-amqp-0-10.app-id' property
set to 'qmf2' and the 'qmf.opcode' should be '_method_response' if all
went well or '_exception' if an error was encountered. In both cases
the content is again a map. In the case of a valid response, any
return values (or 'out parameters') will be present as a nested map
against the key '_arguments'. In the case of an exception, the details
of the exception will be given in a nested map against the key
'_values'.
Given the basic mechanism described, the commands used to create and
delete queues and exchanges are named 'create' and 'delete'
respectively. The create command takes four arguments:
(1) type specifies the type of object to create and can be queue,
exchange or binding
(2) name specifies the name of the object to create
(3) properties is a nested map in which specific properties for the
object to be created can be requested
(4) the strict argument takes a boolean value which at present is
ignored, but which is intended to indicate whether the command should
fail if any unrecognised properties have been specified
The delete method takes three arguments:
(1) as for create the type specifies the type of object to be delete -
valid values are again queue, exchange or binding - and is needed here
to handle the different namespaces that each object inhabits
(2) the name identifies the object to delete
(3) the final argument is a nested map with key options that at
present is unused
The naming of queues and exchanges is simple - a queue named my-queue
would set the name argument to a string of that value. The naming of
bindings uses the pattern <exchange>/<queue>/<key>
(e.g. amq.topic/my-queue/my-key identifies a binding between my-queue
and the exchange amq.topic with the binding key my-key)
The following python code shows by example the creation of a queue
named 'my-queue' that is set to be auto-deleted after 10 seconds.
conn = Connection(opts.broker)
try:
conn.open()
ssn = conn.session()
snd = ssn.sender("qmf.default.direct/broker")
reply_to = "#; {create:always, node:{x-declare:{auto-delete:true}}}"
rcv = ssn.receiver(reply_to)
content = {
"_object_id": {"_object_name":
"org.apache.qpid.broker:broker:amqp-broker"},
"_method_name": "create",
"_arguments": {"type":"queue", "name":"my-queue",
properties:{"auto-delete":True, "qpid.auto_delete_timeout":10}}
}
request = Message(reply_to=reply_to, content=content)
request.properties["x-amqp-0-10.app-id"] = "qmf2"
request.properties["qmf.opcode"] = "_method_request"
snd.send(request)
try:
response = rcv.fetch(timeout=opts.timeout)
if response.properties['x-amqp-0-10.app-id'] == 'qmf2':
if response.properties['qmf.opcode'] == '_method_response':
return response.content['_arguments']
elif response.properties['qmf.opcode'] == '_exception':
raise Exception("Error: %s" % response.content['_values'])
else: raise Exception("Invalid response received, unexpected opcode: %s"
% m)
else: raise Exception("Invalid response received, not a qmfv2 method: %s" %
m)
except Empty:
print "No response received!"
except Exception, e:
print e
except ReceiverError, e:
print e
except KeyboardInterrupt:
pass
conn.close()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]