On 05/19/2011 02:51 PM, Phil Brown wrote:
I am fairly new to qpid and trying to learn the ropes.
From what I understand, there is no way, through the C++ Messaging API, to
get the number of messages in a queue. How can this be done? Is there any
sample code?
Any help would be greatly appreciated!!!
Querying and managing the broker is achieved by sending specially
formatted map messages to a special address and then handling the
response(s) if necessary.
The attached example shows a simple query of a queue by name. Only the
message depth is printed, but there is a range of other stats and properties
in the response.
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>
#include <iostream>
using namespace qpid::messaging;
using namespace qpid::types;
using std::string;
int main(int argc, char** argv)
{
if (argc == 1) {
std::cout << "Please specify a queue name." << std::endl;
return 1;
}
std::string queue(argv[1]);
Connection c(argc > 2 ? argv[2] : "localhost");
try {
c.open();
Session session = c.createSession();
//Prior to Qpid 0.10 the response queue had to be bound to
//qmf.default.direct; now no longer needed:
//Address responses("qmf.default.direct/my-name; {node: {type: topic}}");
Address responses("#; {create: always, node: {x-declare: {auto-delete:True}}}");
Receiver r = session.createReceiver(responses);
Sender s = session.createSender("qmf.default.direct/broker");
Message request;
request.setReplyTo(responses);
request.setContentType("amqp/map");
request.setProperty("x-amqp-0-10.app-id", "qmf2");
request.setProperty("qmf.opcode", "_query_request");
Variant::Map oid;
oid["_object_name"] = std::string("org.apache.qpid.broker:queue:") + queue;
Variant::Map content;
content["_what"] = "OBJECT";
content["_object_id"] = oid;
encode(content, request);
s.send(request);
Message response = r.fetch();
Variant::List contentIn;
decode(response, contentIn);
if (contentIn.size() == 1) {
Variant::Map details = contentIn.front().asMap()["_values"].asMap();
std::cout << "Message depth for " << queue << " is " << details["msgDepth"] << std::endl;
} else if (contentIn.size() == 0) {
std::cout << "No such queue: " << queue << std::endl;
} else {
std::cout << "Unexpected number of entries: " << contentIn << std::endl;
}
session.acknowledge();
} catch(const std::exception& error) {
std::cout << "ERROR: " << error.what() << std::endl;
}
c.close();
return 0;
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]