Hi,
Long post follows, hope it's useful....

Cheers,
Frase


IIRC C++ brokers from 0.10 onwards have both QMF1 and QMF2 enabled by default; the 0.8 broker needs QMF2 to be explicitly enabled with "--mgmt-qmf2 yes".

If you run qpidd -h you'll see all the options.

Re "where can I get the list of messages that I could send to broker for configurations/stats?" the best place to look is here: https://cwiki.apache.org/qpid/qmf-map-message-protocol.html

The QMF2 API is specified here: https://cwiki.apache.org/qpid/qmfv2-api-proposal.html

The latter is what I've just finished writing a Java implementation for. My implementation contains "the full works" including all the subscription stuff. I keep promising to release it but unfortunately I keep getting sidetracked - the other week I got bogged down trying to build the 0.12 broker 'cause I wanted to test against that...

I'm nearly there. The main thing holding me up at the moment is that I'm writing a bunch of Java QMF2 demo tools: I've written Java ports of qpid-config, qpid-printevents and qpid-queue-stats (which was written to demo QMF2 subscriptions), and I've also written a port of qpid-ctrl so you can send QMF2 methods to the broker from the command line. I'm currently working on qpid-fuse, which is a demo that intercepts the queueThresholdExceeded Event and removes bindings from the queue to protect producers. It's nearly there, but not quite.

I really ought to finish it this weekend or next (the day job means I can only really do this at weekends :-( ).


How often do you need to find the number of messages in your queue BTW?

I'm not aware of a way to find the number of messages in a queue other than via QMF (you could *possibly* browse the queue, but IIRC the JMS browse API doesn't give size so I think you'd have to iterate and count, which isn't efficient).
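Roughly, the "iterate and count" approach would look like the sketch below. It works on a plain java.util.Enumeration, which is what the JMS QueueBrowser.getEnumeration() call returns; the BrowseCount and countMessages names are made up for illustration, and the Vector in main() is only a stand-in for a real browser.

```java
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Vector;

// Hypothetical helper: counts messages by walking a browse enumeration.
final class BrowseCount
{
    // In real JMS code you would pass session.createBrowser(queue).getEnumeration().
    // Every message is visited, so the cost grows linearly with the queue depth.
    static int countMessages(final Enumeration<?> browse)
    {
        int count = 0;
        while (browse.hasMoreElements())
        {
            browse.nextElement();
            count++;
        }
        return count;
    }

    public static void main(String[] args)
    {
        // Stand-in for a QueueBrowser enumeration, just for the demo.
        Vector<String> fake = new Vector<String>(Arrays.asList("m1", "m2", "m3"));
        System.out.println(countMessages(fake.elements()));
    }
}
```

This is why browsing is a poor fit for frequent polling: the whole queue gets walked on every call.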

With QMF2 there are two options. One is to subscribe to data indications from the broker (this is actually what I do to emulate QMF2 query subscriptions on the ManagementAgent, as that doesn't currently support real query subscriptions). The problem with that (and why I asked how often you need to find the number of messages) is that the broker only pushes data every 10 seconds (though you can change this via a config option). The other option is to use the QMF2 "_query_request" described earlier in this thread to "poll". The issue with that is that for a *specific* queue you'd first need to retrieve the list of all queues using a _schema_id query, then find the queue you care about by iterating and looking up the "name" property. However, once you've found the queue object that you want you can continually get updates by doing an _object_id query.
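The "iterate and look up the name property" step could be sketched like this, working on the decoded query response as plain Maps. It assumes each object map carries its properties under "_values" and its identifier under "_object_id", which is my reading of the QMF map message protocol page linked above, and that the name arrives as a java.lang.String; QueueFinder and findObjectId are hypothetical names.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper: scans the result of a _schema_id query for one queue.
final class QueueFinder
{
    // Returns the "_object_id" map of the first object whose "_values" map has the
    // given "name", or null if there is no match (key names are assumptions).
    @SuppressWarnings("unchecked")
    static Map<String, Object> findObjectId(final List<Map<String, Object>> queues, final String queueName)
    {
        for (Map<String, Object> queue : queues)
        {
            Object values = queue.get("_values");
            if (values instanceof Map && queueName.equals(((Map<String, Object>) values).get("name")))
            {
                Object id = queue.get("_object_id");
                return (id instanceof Map) ? (Map<String, Object>) id : null;
            }
        }
        return null;
    }
}
```

The returned map is what would then go into subsequent _object_id queries for that queue.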

_object_id queries are "fairly" efficient, as you only get a response for the object that you want; also, on the broker side it's a Map lookup keyed by _object_id (the _schema_id lookups iterate to find matching objects). However, I say "fairly" efficient as it's still a message request and response over the network to get the info from the broker, so clearly that's something to bear in mind. In the QMF2 API the procedure I've described is quite simple: you just need to invoke the "refresh()" method on the QmfConsoleData object that represents the queue you care about.
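Since every refresh is a network round trip, a polling client probably wants to control how often it asks. A minimal sketch, assuming the refresh call is wrapped in a java.util.function.Supplier that returns the object's properties as a Map; DepthPoller is a hypothetical name, and the property name you pass in (for example the queue's message depth) depends on the broker schema.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical polling helper; the Supplier stands in for a call such as refresh().
final class DepthPoller
{
    // Calls refresher 'samples' times, sleeping intervalMillis between calls, and
    // collects the named property from each refreshed snapshot (null if unavailable).
    static List<Object> poll(final Supplier<Map<String, Object>> refresher, final String property,
                             final int samples, final long intervalMillis)
    {
        List<Object> readings = new ArrayList<Object>();
        for (int i = 0; i < samples; i++)
        {
            Map<String, Object> snapshot = refresher.get(); // one request/response per call in real use
            readings.add(snapshot == null ? null : snapshot.get(property));
            if (i < samples - 1 && intervalMillis > 0)
            {
                try
                {
                    Thread.sleep(intervalMillis);
                }
                catch (InterruptedException ie)
                {
                    Thread.currentThread().interrupt(); // keep the interrupt status and stop early
                    break;
                }
            }
        }
        return readings;
    }
}
```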


My code for this is as follows (this code is actually delegated to from my QmfConsoleData class, 'cause it's my Console class that does all the JMS stuff).

The following should have most of the stuff that you need in lieu of me finally actually publishing the API. Clearly it's a bit disjointed. The _replyAddress and _asyncReplyAddress etc. are created using the following (just snippets) - pretty standard JMS really. I use two sessions so I can mix synchronous calls with async stuff from my MessageListener thread safely.

.........
       {
           String topicBase  = "qmf." + _domain + ".topic";

           _asyncSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
           _syncSession = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

           // Create a MessageProducer for the QMF topic address used to broadcast requests
           Destination topicAddress = _syncSession.createQueue(topicBase);
           _broadcaster = _syncSession.createProducer(topicAddress);

           // Set up MessageListener on the Event Address
           Destination eventAddress = _asyncSession.createQueue(topicBase + "/agent.ind.#");
           _eventConsumer = _asyncSession.createConsumer(eventAddress);
           _eventConsumer.setMessageListener(this);

           // Create the asynchronous JMSReplyTo _replyAddress and MessageConsumer
           _asyncReplyAddress = _asyncSession.createQueue(_address + ".async");
           _asyncResponder = _asyncSession.createConsumer(_asyncReplyAddress);
           _asyncResponder.setMessageListener(this);

           // I've extended the synchronized block to include creating the _requester and _responder. I don't believe
           // that this is strictly necessary, but it stops findbugs moaning about inconsistent synchronization
           // so makes sense if only to get that warm and fuzzy feeling of keeping findbugs happy :-)
           synchronized(this)
           {
               // Create a MessageProducer for the QMF direct address, mainly used for request/response
               Destination directAddress = _syncSession.createQueue("qmf." + _domain + ".direct");
               _requester = _syncSession.createProducer(directAddress);

               // Create the JMSReplyTo _replyAddress and MessageConsumer
               _replyAddress = _syncSession.createQueue(_address);
               _responder = _syncSession.createConsumer(_replyAddress);

               _connection.start();

.............
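To make the address strings in that snippet easier to see at a glance, here is the same concatenation pulled out into a tiny helper. QmfAddresses is a hypothetical class, and "default" is assumed as the QMF domain name purely for the demo; the reply address itself is elided in the snippet above, so the value used in main() is made up.

```java
// Hypothetical helper that mirrors the string concatenation in the snippet above.
final class QmfAddresses
{
    // Topic address used to broadcast requests.
    static String topic(final String domain)
    {
        return "qmf." + domain + ".topic";
    }

    // Direct address, mainly used for request/response.
    static String direct(final String domain)
    {
        return "qmf." + domain + ".direct";
    }

    // Event address: the topic address plus a subject pattern for agent indications.
    static String events(final String domain)
    {
        return topic(domain) + "/agent.ind.#";
    }

    // Asynchronous reply address derived from the synchronous one.
    static String asyncReply(final String replyAddress)
    {
        return replyAddress + ".async";
    }

    public static void main(String[] args)
    {
        String domain = "default"; // assumption: the usual domain name
        System.out.println(topic(domain));
        System.out.println(direct(domain));
        System.out.println(events(domain));
        System.out.println(asyncReply("my-reply-address")); // made-up reply address
    }
}
```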







   /**
    * Request that the Agent update the value of an object's contents.
    *
    * @param agent the Agent to get the refresh from.
    * @param objectId the ObjectId being queried for
    * @param replyHandle the correlation handle used to tie asynchronous method requests with responses
    * @param timeout the time to wait for a reply from the Agent, a value of -1 means use the default timeout
    * @return the refreshed object
    */
   public QmfConsoleData refresh(final Agent agent, final ObjectId objectId, final String replyHandle, final int timeout)
   {
       List<QmfConsoleData> objects = getObjects(agent, objectId, replyHandle, timeout);
       return (objects.size() == 0) ? null : objects.get(0);
   }


   /**
    * Perform a query for QmfConsoleData objects. Returns a list (possibly empty) of matching objects.
    * If replyHandle is null this method will block until the agent replies, or the timeout expires.
    * Once the timeout expires, all data retrieved to date is returned. If replyHandle is non-null an
    * asynchronous request is performed
    *
    * @param agent the Agent being queried
    * @param query the ObjectId or SchemaClassId being queried for.
    * @param replyHandle the correlation handle used to tie asynchronous method requests with responses
    * @param timeout the time to wait for a reply from the Agent, a value of -1 means use the default timeout
    * @return a List of QMF Objects describing that class
    */
   private List<QmfConsoleData> getObjects(final Agent agent, final QmfData query, final String replyHandle, int timeout)
   {
       String agentName = agent.getName();
       timeout = (timeout < 1) ? _replyTimeout : timeout;
       List<QmfConsoleData> results = Collections.emptyList();
       try
       {
           Destination destination = (replyHandle == null) ? _replyAddress : _asyncReplyAddress;
           MapMessage request = _syncSession.createMapMessage();
           request.setJMSReplyTo(destination);
           request.setJMSCorrelationID(replyHandle);
           request.setStringProperty("x-amqp-0-10.app-id", "qmf2");
           request.setStringProperty("method", "request");
           request.setStringProperty("qmf.opcode", "_query_request");
           request.setStringProperty("qpid.subject", agentName);

           // Create a QMF Query for an "OBJECT" target using either a schema ID or object ID
           String queryType = (query instanceof SchemaClassId) ? "_schema_id" : "_object_id";
           request.setObject("_what", "OBJECT");
           request.setObject(queryType, query.mapEncode());

           // Wrap request & response in synchronized block in case any other threads invoke a request
           // it would be somewhat unfortunate if their response got interleaved with ours!!
           synchronized(this)
           {
               _requester.send(request);
               if (replyHandle == null)
               {
                   Message response = _responder.receive(timeout*1000);
                   if (response == null)
                   {
                       log.info("No response received in getObjects()");
                       return Collections.emptyList();
                   }

                   if (response instanceof MapMessage)
                   { // Error responses are returned as MapMessages
                       //QmfData exception = new QmfData(AMQPMessage.getMap(response));
                       //System.out.println(agentName + " " + exception.getStringValue("error_text"));
                   }
                   else
                   {
                       List<Map> mapResults = AMQPMessage.getList(response);
                       results = new ArrayList<QmfConsoleData>(mapResults.size());
                       for (Map content : mapResults)
                       {
                           results.add(new QmfConsoleData(content, agent));
                       }
                   }
               }
           }
       }
       catch (JMSException jmse)
       {
           log.info("JMSException {} caught in getObjects()", jmse.getMessage());
       }
       return results;
   }

   /**
    * JMS QMF returns amqp/list types as a BytesMessage this method decodes that into a java.util.List
    * <p>
    * Taken from Gordon Sim's initial JMS QMF Example using the BBDecoder
    * <p>
    * Trivia: This block of code from Gordon Sim is the seed that spawned the whole of this Java QMF2 API
    * implementation - cheers Gordon.
    *
    * @param message amqp/list encoded JMS Message
    * @return a java.util.List decoded from Message
    */
   @SuppressWarnings("unchecked")
   public static <T> List<T> getList(final Message message) throws JMSException
   {
       if (message == null)
       {
           throw new MessageFormatException("Attempting to do AMQPMessage.getList() on null Message");
       }
       else if (message instanceof BytesMessage)
       {
           BytesMessage msg = (BytesMessage)message;

           //only handles responses up to 2^31-1 bytes long
           byte[] data = new byte[(int) msg.getBodyLength()];
           msg.readBytes(data);
           BBDecoder decoder = new BBDecoder();
           decoder.init(ByteBuffer.wrap(data));
           return (List<T>)decoder.readList();
       }
       else
       {
           return null;
       }
   }
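For reference, the body of the map message that getObjects() sends boils down to two entries: "_what" and either "_schema_id" or "_object_id". The sketch below builds that body as plain Maps. What mapEncode() actually produces isn't shown in this post, so the "_package_name" / "_class_name" keys and the "org.apache.qpid.broker" / "queue" values in main() are assumptions based on my reading of the QMF map message protocol page; QueryBody is a hypothetical class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that builds the body of a QMF2 _query_request as plain Maps.
final class QueryBody
{
    // Query for all objects of a class, e.g. every queue (key names are assumptions).
    static Map<String, Object> forSchema(final String packageName, final String className)
    {
        Map<String, Object> schemaId = new HashMap<String, Object>();
        schemaId.put("_package_name", packageName);
        schemaId.put("_class_name", className);
        return build("_schema_id", schemaId);
    }

    // Query for one specific object, using an _object_id map taken from an earlier response.
    static Map<String, Object> forObject(final Map<String, Object> objectId)
    {
        return build("_object_id", objectId);
    }

    private static Map<String, Object> build(final String queryType, final Map<String, Object> id)
    {
        Map<String, Object> body = new HashMap<String, Object>();
        body.put("_what", "OBJECT"); // same target as in getObjects()
        body.put(queryType, id);
        return body;
    }

    public static void main(String[] args)
    {
        // Assumed package and class names for broker queues.
        System.out.println(forSchema("org.apache.qpid.broker", "queue"));
    }
}
```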





dmounessa wrote:
Thanks -- do I need to start the broker with any special module (qmf?) or
will the standard C++ broker support the QMF messages?

Also, where can I get the list of messages that I could send to broker for
configurations/stats?

Thanks for your help.


---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]



