Hi,
Long post follows, hope it's useful....
Cheers,
Frase
IIRC C++ brokers from 0.10 onwards have both QMF1 and QMF2 enabled by
default, the 0.8 broker needs to have it explicitly enabled with
"--mgmt-qmf2 yes"
if you do qpidd -h you'll see all the options.
Re "where can I get the list of messages that I could send to broker for
configurations/stats?" the best place to look is here:
https://cwiki.apache.org/qpid/qmf-map-message-protocol.html
The QMF2 API is specified here:
https://cwiki.apache.org/qpid/qmfv2-api-proposal.html
The latter is what I've just finished writing a Java implementation for.
My implementation contains "the full works" including all the
subscription stuff. I keep promising to release it but unfortunately I
keep getting sidetracked - the other week I got bogged down trying to
build the 0.12 broker 'cause I wanted to test against that...
I'm nearly there, the main thing holding me up at the moment is that I'm
writing a bunch of Java QMF2 demo tools, so I've written Java ports of
qpid-config, qpid-printevents and qpid-queue-stats (that was written to
demo QMF2 subscriptions) I've also written a port of qpid-ctrl so you
can send QMF2 methods to the broker from the command line. I'm currently
working on qpid-fuse, which is a demo that intercepts the
queueThresholdExceeded Event and remove bindings from the queue to
protect producers. It's nearly there, but not quite.
I really ought to finish it this weekend or next (the day job means I
can only really do this at weekends :-( ).
How often do you need to find the number of messages in your queue BTW?
I'm not aware of a way to find the number of messages in a queue other
than via QMF (you could *possibly* browse the queue, but IIRC the JMS
browse API doesn't give size so I think you'd have to iterate and count,
which isn't efficient).
With QMF2 there's two options, one is to subscribe to data indications
from the broker (this is actually what I do to emulate QMF2 query
subscriptions on the ManagementAgent as that doesn't currently support
real query subscriptions). The problem with that (and why I asked how
often you need to find the number of messages) is that the broker only
pushes data every 10 seconds (though you can change this via a config
option). The other option is to use the QMF2 "_query_request" described
earlier in this thread to "poll" the issues with that is that for a
*specific* queue you'd need to first retrieve the list of all queues
using the _schema_id query then find the queue you care about by
iterating and looking up the "name" property. However once you've found
the queue object that you want you can continually get updates by doing
an _object_id query.
_object_id queries are "fairly" efficient as you only get a response for
the object that you want also on the broker side it's a Map lookup keyed
by _object_id (the _schema_id lookups iterate to find matching objects).
However I say "fairly" efficient as it's still a message request and
response over the network to get the info from the broker so clearly
that's something to bear in mind. In the QMF2 API the procedure I've
described is quite simple - you just need to invoke the "refresh()"
method on the QmfConsoleData object that represents the queue you care
about.
My code for this is as follows (this code is actually delegated to from
my QMFConsoleData class cause it's my Console class that does all the
JMS stuff):
The following should have most of the stuff that you need in lieu of me
finally actually publishing the API. Clearly it's a bit disjoint. The
_reply_address and _async_reply_address etc are created using the
following (just snippets) - pretty standard JMS really. I use two
sessions so I can mix synchronous calls with async stuff from my
MessageListener thread safely.
.........
{
String topicBase = "qmf." + _domain + ".topic";
_asyncSession = _connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
_syncSession = _connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
// Create a MessageProducer for the QMF topic address used
to broadcast requests
Destination topicAddress = _syncSession.createQueue(topicBase);
_broadcaster = _syncSession.createProducer(topicAddress);
// Set up MessageListener on the Event Address
Destination eventAddress =
_asyncSession.createQueue(topicBase + "/agent.ind.#");
_eventConsumer = _asyncSession.createConsumer(eventAddress);
_eventConsumer.setMessageListener(this);
// Create the asynchronous JMSReplyTo _replyAddress and
MessageConsumer
_asyncReplyAddress = _asyncSession.createQueue(_address +
".async");
_asyncResponder =
_asyncSession.createConsumer(_asyncReplyAddress);
_asyncResponder.setMessageListener(this);
// I've extended the synchronized block to include creating
the _requester and _responder. I don't believe
// that this is strictly necessary, but it stops findbugs
moaning about inconsistent synchronization
// so makes sense if only to get that warm and fuzzy feeling
of keeping findbugs happy :-)
synchronized(this)
{
// Create a MessageProducer for the QMF direct address,
mainly used for request/response
Destination directAddress =
_syncSession.createQueue("qmf." + _domain + ".direct");
_requester = _syncSession.createProducer(directAddress);
// Create the JMSReplyTo _replyAddress and MessageConsumer
_replyAddress = _syncSession.createQueue(_address);
_responder = _syncSession.createConsumer(_replyAddress);
_connection.start();
.............
/**
* Request that the Agent update the value of an object's contents.
*
* @param agent the Agent to get the refresh from.
* @param objectId the ObjectId being queried for
* @param replyHandle the correlation handle used to tie
asynchronous method requests with responses
* @param timeout the time to wait for a reply from the Agent, a
value of -1 means use the default timeout
* @return the refreshed object
*/
public QmfConsoleData refresh(final Agent agent, final ObjectId
objectId, final String replyHandle, final int timeout)
{
List<QmfConsoleData> objects = getObjects(agent, objectId,
replyHandle, timeout);
return (objects.size() == 0) ? null : objects.get(0);
}
/**
* Perform a query for QmfConsoleData objects. Returns a list
(possibly empty) of matching objects.
* If replyHandle is null this method will block until the agent
replies, or the timeout expires.
* Once the timeout expires, all data retrieved to date is returned.
If replyHandle is non-null an
* asynchronous request is performed
*
* @param agent the Agent being queried
* @param query the ObjectId or SchemaClassId being queried for.
* @param replyHandle the correlation handle used to tie
asynchronous method requests with responses
* @param timeout the time to wait for a reply from the Agent, a
value of -1 means use the default timeout
* @return a List of QMF Objects describing that class
*/
private List<QmfConsoleData> getObjects(final Agent agent, final
QmfData query,
final String replyHandle,
int timeout)
{
String agentName = agent.getName();
timeout = (timeout < 1) ? _replyTimeout : timeout;
List<QmfConsoleData> results = Collections.emptyList();
try
{
Destination destination = (replyHandle == null) ?
_replyAddress : _asyncReplyAddress;
MapMessage request = _syncSession.createMapMessage();
request.setJMSReplyTo(destination);
request.setJMSCorrelationID(replyHandle);
request.setStringProperty("x-amqp-0-10.app-id", "qmf2");
request.setStringProperty("method", "request");
request.setStringProperty("qmf.opcode", "_query_request");
request.setStringProperty("qpid.subject", agentName);
// Create a QMF Query for an "OBJECT" target using either a
schema ID or object ID
String queryType = (query instanceof SchemaClassId) ?
"_schema_id" : "_object_id";
request.setObject("_what", "OBJECT");
request.setObject(queryType, query.mapEncode());
// Wrap request & response in synchronized block in case any
other threads invoke a request
// it would be somewhat unfortunate if their response got
interleaved with ours!!
synchronized(this)
{
_requester.send(request);
if (replyHandle == null)
{
Message response = _responder.receive(timeout*1000);
if (response == null)
{
log.info("No response received in getObjects()");
return Collections.emptyList();
}
if (response instanceof MapMessage)
{ // Error responses are returned as MapMessages
//QmfData exception = new
QmfData(AMQPMessage.getMap(response));
//System.out.println(agentName + " " +
exception.getStringValue("error_text"));
}
else
{
List<Map> mapResults =
AMQPMessage.getList(response);
results = new
ArrayList<QmfConsoleData>(mapResults.size());
for (Map content : mapResults)
{
results.add(new QmfConsoleData(content, agent));
}
}
}
}
}
catch (JMSException jmse)
{
log.info("JMSException {} caught in getObjects()",
jmse.getMessage());
}
return results;
}
/**
* JMS QMF returns amqp/list types as a BytesMessage this method
decodes that into a java.util.List
* <p>
* Taken from Gordon Sim's initial JMS QMF Example using the BBDecoder
* <p>
* Trivia: This block of code from Gordon Sim is the seed that
spawned the whole of this Java QMF2 API
* implementation - cheers Gordon.
*
* @param message amqp/list encoded JMS Message
* @return a java.util.List decoded from Message
*/
@SuppressWarnings("unchecked")
public static <T> List<T> getList(final Message message) throws
JMSException
{
if (message == null)
{
throw new MessageFormatException("Attempting to do
AMQPMessage.getList() on null Message");
}
else if (message instanceof BytesMessage)
{
BytesMessage msg = (BytesMessage)message;
//only handles responses up to 2^31-1 bytes long
byte[] data = new byte[(int) msg.getBodyLength()];
msg.readBytes(data);
BBDecoder decoder = new BBDecoder();
decoder.init(ByteBuffer.wrap(data));
return (List<T>)decoder.readList();
}
else
{
return null;
}
}
dmounessa wrote:
Thanks -- do I need to start the broker with any special module (qmf?) or
does the standard C++ broker will support the QMF messages?
Also, where can I get the list of messages that I could send to broker for
configurations/stats?
Thanks for your help?
--
View this message in context:
http://apache-qpid-users.2158936.n2.nabble.com/Jave-Client-with-C-Broker-Messages-in-Queue-size-tp7055301p7067715.html
Sent from the Apache Qpid users mailing list archive at Nabble.com.
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]