Hi Martin
If you are using the python QMF2 API implementation which is the one that lives in qpid/extras/qmf/src/py/qmf2-prototype and implements the QMF2 API specified here https://cwiki.apache.org/qpid/qmfv2-api-proposal.html then what you would normally do would be to use getObjects to return a queue object which would be a QmfConsoleData then call invoke_method on that "invoke_method(name, inArgs{}, [[reply-handle] | [timeout]]): invoke the named method on this instance." the invoke_method call should return a MethodResponse object (see the InvokingMethods subsection in the page I linked) so *if* the method in question has a sensible return value (or needs to deliver an error/exception) it should be contained in that.

class MethodResult:
      <constructor>( QmfData<exception>  |<map of properties>  )
      .succeeded(): returns True if the method call executed without error.
      .get_exception(): returns the QmfData error object if method fails, else 
None
      .get_arguments(): returns a map of "name"=<value>  pairs of all returned 
arguments.
      .get_argument(<name>): returns value of argument named "name".

so get_arguments should give you what you need if it's actually present (and I don't actually know what purge returns).

I actually wrote a little QMF2 application using the purge method which is a "fuse" that intercepts queueThresholdExceeded Events and purges the queue in question of ~10% of the messages. It's actually written in Java (it's one of the demos I included in my Java QMF2 API implementation). I've just looked at it and unfortunately I didn't bother to do anything with the return value :-) but hopefully it'll give you some ideas (though do bear in mind that the program is intended to be just a demo so the exception handling stuff is a bit noddy).


    /**
* Look up a queue object with the given name and if it's not a ring queue invoke the queue's purge method.
     * @param queueName the name of the queue to purge
* @param msgDepth the number of messages on the queue, used to determine how many messages to purge.
     */
    private void purgeQueue(final String queueName, long msgDepth)
    {
        QmfConsoleData queue = _queueCache.get(queueName);

        if (queue == null)
        {
System.out.printf("%s ERROR QueueFuse.disconnectQueue() %s reference couldn't be found\n",
                              new Date().toString(), queueName);
        }
        else
{ // If we've found a queue called queueName we then find the bindings that reference it.

            Map args = (Map)queue.getValue("arguments");
            String policyType = (String)args.get("qpid.policy_type");
            if (policyType != null && policyType.equals("ring"))
            {  // If qpid.policy_type=ring we return.
                return;
            }

            try
            {
                QmfData arguments = new QmfData();
                arguments.setValue("request", (long)(_purge*msgDepth));
                queue.invokeMethod("purge", arguments);
            }
            catch (QmfException e)
            {
                System.out.println(e.getMessage());
            }
        }
    }


HTH
Frase

On 21/03/12 11:50, MartiN Beneš wrote:
Interesting.
If I understand correctly, you are manually sending qmf2 commands to the
broker.
I would very much prefer to use existing qmf library instead of writing a
new one. Specifically to use the library bundled with the qpidd (mrg)
broker.

On Wed, Mar 21, 2012 at 12:35, Pavel Moravec<[email protected]>  wrote:

.. and now also for python client (that I overlooked in email subject):


conn = Connection(broker)
try:
  conn.open()
  ssn = conn.session()
  snd = ssn.sender("qmf.default.direct/broker")
  reply_to = "reply-queue; {create:always,
node:{x-declare:{auto-delete:true}}}"
  rcv = ssn.receiver(reply_to)

  content = {
             "_object_id": {"_object_name":
"org.apache.qpid.broker:broker:amqp-broker"},
             "_method_name": "delete",
             "_arguments": {"type":"queue", "name":queue_name}
            }
  request = Message(reply_to=reply_to, content=content)
  request.properties["x-amqp-0-10.app-id"] = "qmf2"
  request.properties["qmf.opcode"] = "_method_request"
  snd.send(request)

  try:
    response = rcv.fetch(timeout=30)
    if response.properties['x-amqp-0-10.app-id'] == 'qmf2':
      if response.properties['qmf.opcode'] == '_method_response':
        print "Response:"
        print response.content['_arguments']
      elif response.properties['qmf.opcode'] == '_exception':
        raise Exception("Error: %s" % response.content['_values'])
      else: raise Exception("Invalid response received, unexpected opcode:
%s" % m)
    else: raise Exception("Invalid response received, not a qmfv2 method:
%s" % m)
  except Empty:
    print "No response received!"
  except Exception, e:
    print e
except ReceiverError, e:
  print e
except KeyboardInterrupt:
  pass
conn.close()



----- Original Message -----
From: "Pavel Moravec"<[email protected]>
To: [email protected]
Sent: Wednesday, March 21, 2012 12:33:28 PM
Subject: Re: python QMF synchronization

Hi Martin,
you can set up ReplyTo address where qpid shall send its response.
I.e. something like C++ code below (that invokes queue deletion and
fetches response in 30seconds limit):

     Connection connection(url);
     try {
         connection.open();
         Session session = connection.createSession();
         Sender sender =
         session.createSender("qmf.default.direct/broker");
         Address responseQueue("#reply-queue; {create:always,
         node:{x-declare:{auto-delete:true}}}");
         Receiver receiver = session.createReceiver(responseQueue);

         Message message;
         Variant::Map content;
       Variant::Map OID;
       Variant::Map arguments;
       OID["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
       arguments["type"] = "queue";
       arguments["name"] = queue_name;

         content["_object_id"] = OID;
         content["_method_name"] = "delete";
         content["_arguments"] = arguments;

         encode(content, message);
       message.setReplyTo(responseQueue);
       message.setProperty("x-amqp-0-10.app-id", "qmf2");
       message.setProperty("qmf.opcode", "_method_request");

         sender.send(message, true);

       Message response;
       if (receiver.fetch(response,qpid::messaging::Duration(30000)) ==
       true)
       {
               qpid::types::Variant::Map recv_props =
response.getProperties();
               if (recv_props["x-amqp-0-10.app-id"] == "qmf2")
                       if (recv_props["qmf.opcode"] == "_method_response")
                               std::cout<<  "Response: OK"<<  std::endl;
                       else if (recv_props["qmf.opcode"] == "_exception")
                               std::cerr<<  "Error: "<<
response.getContent()<<  std::endl;
                       else
                               std::cerr<<  "Invalid response received!"
<<  std::endl;
               else
                       std::cerr<<  "Invalid response not of qmf2 type
received!"<<
                       std::endl;
       }
       else
               std::cout<<  "Timeout: No response received within 30
seconds!"<<
               std::endl;

         connection.close();
         return 0;
     } catch(const std::exception&  error) {
         std::cout<<  error.what()<<  std::endl;
         connection.close();
     }

Kind regards,
Pavel



----- Original Message -----
From: "MartiN Beneš"<[email protected]>
To: "users"<[email protected]>
Sent: Wednesday, March 21, 2012 12:26:59 PM
Subject: python QMF synchronization

Hi,
I cannot find information about qmf synchronization. Or is every
call
synchronous?
For instance if I purge a queue: queue.purge(0)
how can i tell it was done?

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]



/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
package org.apache.qpid.qmf2.tools;

// JMS Imports
import javax.jms.Connection;

// Misc Imports
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// QMF2 Imports
import org.apache.qpid.qmf2.common.ObjectId;
import org.apache.qpid.qmf2.common.QmfData;
import org.apache.qpid.qmf2.common.QmfEvent;
import org.apache.qpid.qmf2.common.QmfEventListener;
import org.apache.qpid.qmf2.common.QmfException;
import org.apache.qpid.qmf2.common.SchemaClassId;
import org.apache.qpid.qmf2.common.WorkItem;
import org.apache.qpid.qmf2.console.Agent;
import org.apache.qpid.qmf2.console.Console;
import org.apache.qpid.qmf2.console.EventReceivedWorkItem;
import org.apache.qpid.qmf2.console.QmfConsoleData;

import org.apache.qpid.qmf2.util.ConnectionHelper;
import org.apache.qpid.qmf2.util.GetOpt;

/**
 * QueueFuse provides protection to message producers from consumers who can't consume messages fast enough.
 * <p>
 * With the default "reject" limit policy when a queue exceeds its capacity an exception is thrown to the
 * producer. This behaviour is unfortunate, because if there happen to be multiple consumers consuming
 * messages from a given producer it is possible for a single slow consumer to cause message flow to be
 * stopped to <u>all</u> consumers, in other words a de-facto denial of service may take place.
 * <p>
 * In an Enterprise environment it is likely that this sort of behaviour is unwelcome, so QueueFuse makes it
 * possible for queueThresholdExceeded Events to be detected and for the offending queues to have messages
 * purged, thus protecting the other consumers by preventing an exception being thrown to the message producer.
 * <p>
 * The original intention with this class was to unbind bindings to queues that exceed the threshold. This method
 * works, but it has a number of disadvantages. In particular there is no way to unbind from (and thus protect)
 * queues bound to the default direct exchange, in addition in order to unbind it is necessary to retrieve
 * binding and exchange information, both of which require further exchanges with the broker (which is not
 * desirable as when the queueThresholdExceeded occurs we need to act pretty quickly). Finally as it happens
 * it is also necessary to purge some messages after unbinding anyway as if this is not done the queue remains
 * in the flowStopped state and producers will eventually time out and throw an exception if this is not cleared.
 * So all in all simply purging each time we cross the threshold is simpler and has the additional advantage that
 * if and when the consumer speeds up message delivery will eventually return to normal. 
 * 
 * <pre>
 * Usage: QueueFuse [options] [broker-addr]...
 * 
 * Monitors one or more Qpid message brokers for queueThresholdExceeded Events.
 *
 * If a queueThresholdExceeded Event occurs messages are purged from the queue,
 * in other words this class behaves rather like a fuse 'blowing' if the
 * threshold gets exceeded.
 *
 * If no broker-addr is supplied, QueueFuse connects to 'localhost:5672'.
 *
 * [broker-addr] syntax:
 *
 * [username/password@] hostname
 * ip-address [:&lt;port&gt;]
 * 
 * Examples:
 * 
 * $ QueueFuse localhost:5672
 * $ QueueFuse 10.1.1.7:10000
 * $ QueueFuse guest/guest@broker-host:10000
 *
 * Options:
 *   -h, --help            show this help message and exit
 *   -f &lt;filter&gt;, --filter=&lt;filter&gt;
 *                         a list of comma separated queue names (regex are
 *                         accepted) to protect (default is to protect all).
 *   -p &lt;PERCENT&gt;, --purge=&lt;PERCENT&gt;\n" +
 *                         The percentage of messages to purge when the queue\n" +
 *                         threshold gets exceeded (default = 20%).\n" +
 *                         N.B. if this gets set too low the fuse may not blow.\n" +
 *   --sasl-mechanism=&lt;mech&gt;
 *                         SASL mechanism for authentication (e.g. EXTERNAL,
 *                         ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL
 *                         automatically picks the most secure available
 *                         mechanism - use this option to override.
 * </pre>
 * @author Fraser Adams
 */
public final class QueueFuse implements QmfEventListener
{
    private static final String _usage =
    "Usage: QueueFuse [options] [broker-addr]...\n";

    private static final String _description =
    "Monitors one or more Qpid message brokers for queueThresholdExceeded Events.\n" +
    "\n" +
    "If a queueThresholdExceeded Event occurs messages are purged from the queue,\n" +
    "in other words this class behaves rather like a fuse 'blowing' if the\n" +
    "threshold gets exceeded.\n" +
    "\n" +
    "If no broker-addr is supplied, QueueFuse connects to 'localhost:5672'.\n" +
    "\n" +
    "[broker-addr] syntax:\n" +
    "\n" +
    "[username/password@] hostname\n" +
    "ip-address [:<port>]\n" +
    "\n" +
    "Examples:\n" +
    "\n" +
    "$ QueueFuse localhost:5672\n" +
    "$ QueueFuse 10.1.1.7:10000\n" +
    "$ QueueFuse guest/guest@broker-host:10000\n";

    private static final String _options =
    "Options:\n" +
    "  -h, --help            show this help message and exit\n" +
    "  -f <filter>, --filter=<filter>\n" +
    "                        a list of comma separated queue names (regex are\n" +
    "                        accepted) to protect (default is to protect all).\n" +
    "  -p <PERCENT>, --purge=<PERCENT>\n" +
    "                        The percentage of messages to purge when the queue\n" +
    "                        threshold gets exceeded (default = 20%).\n" +
    "                        N.B. if this gets set too low the fuse may not blow.\n" +
    "  --sasl-mechanism=<mech>\n" +
    "                        SASL mechanism for authentication (e.g. EXTERNAL,\n" +
    "                        ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL\n" +
    "                        automatically picks the most secure available\n" +
    "                        mechanism - use this option to override.\n";

    private final String _url;
    private final List<Pattern> _filter;
    private final float _purge;
    private Map<String, QmfConsoleData> _queueCache = new HashMap<String, QmfConsoleData>(50);
    private Console _console;

    /**
     * Basic constructor. Creates JMS Session, Initialises Destinations, Producers & Consumers and starts connection.
     * @param url the connection URL.
     * @param connectionOptions the options String to pass to ConnectionHelper.
     * @param filter a list of regex Patterns used to choose the queues we wish to protect.
     * @param purge the ratio of messages that we wish to purge if the threshold gets exceeded.
     */
    public QueueFuse(final String url, final String connectionOptions, final List<Pattern> filter, final float purge)
    {
        System.out.println("QueueFuse Connecting to " + url);
        if (filter.size() > 0)
        {
            System.out.println("Filter = " + filter);
        }
        _url = url;
        _filter = filter;
        _purge = purge;
        try
        {
            Connection connection = ConnectionHelper.createConnection(url, connectionOptions);        
            _console = new Console(this);
            _console.addConnection(connection);
            updateQueueCache();
        }
        catch (QmfException qmfe)
        {
            System.err.println("QmfException " + qmfe.getMessage() + " caught in QueueFuse constructor");
        }
    }

    /**
     * Looks up queue objects and stores them in _queueCache keyed by the queue name
     */
    private void updateQueueCache()
    {
        _queueCache.clear();
        List<QmfConsoleData> queues = _console.getObjects("org.apache.qpid.broker", "queue");
        for (QmfConsoleData queue : queues)
        {
            String queueName = queue.getStringValue("name");
            _queueCache.put(queueName, queue);
        }
    }

    /**
     * Look up a queue object with the given name and if it's not a ring queue invoke the queue's purge method.
     * @param queueName the name of the queue to purge
     * @param msgDepth the number of messages on the queue, used to determine how many messages to purge.
     */
    private void purgeQueue(final String queueName, long msgDepth)
    {
        QmfConsoleData queue = _queueCache.get(queueName);

        if (queue == null)
        {
            System.out.printf("%s ERROR QueueFuse.disconnectQueue() %s reference couldn't be found\n",
                              new Date().toString(), queueName);
        }
        else
        { // If we've found a queue called queueName we then find the bindings that reference it.

            Map args = (Map)queue.getValue("arguments");
            String policyType = (String)args.get("qpid.policy_type");
            if (policyType != null && policyType.equals("ring"))
            {  // If qpid.policy_type=ring we return.
                return;
            }

            try
            {
                QmfData arguments = new QmfData();
                arguments.setValue("request", (long)(_purge*msgDepth));
                queue.invokeMethod("purge", arguments);
            }
            catch (QmfException e)
            {
                System.out.println(e.getMessage());
            }    
        }
    }

    /**
     * Main Event handler.
     * @param wi a QMF2 WorkItem object
     */
    public void onEvent(final WorkItem wi)
    {
        if (wi instanceof EventReceivedWorkItem)
        {
            EventReceivedWorkItem item = (EventReceivedWorkItem)wi;
            Agent agent = item.getAgent();
            QmfEvent event = item.getEvent();
            String className = event.getSchemaClassId().getClassName();

            if (className.equals("queueDeclare"))
            {
                updateQueueCache();
            }
            else if (className.equals("queueThresholdExceeded"))
            {
                String queueName = event.getStringValue("qName");
                boolean matches = false;
                for (Pattern x : _filter)
                { // Check the queue name against the regexes in the filter List (if any)
                    Matcher m = x.matcher(queueName);
                    if (m.find())
                    {
                        matches = true;
                        break;
                    }
                }

                if (_filter.isEmpty() || matches)
                { // If there's no filter enabled or the filter matches the queue name we call purgeQueue().
                    long msgDepth = event.getLongValue("msgDepth");
                    purgeQueue(queueName, msgDepth);
                }
            }
        }
    }

    /**
     * Runs QueueFuse.
     * @param args the command line arguments.
     */
    public static void main(final String[] args)
    {
        System.setProperty("amqj.logging.level", "FATAL");

        String[] longOpts = {"help", "filter=", "purge=", "sasl-mechanism="};
        try
        {
            boolean includeRingQueues = false;
            String connectionOptions = "{reconnect: true}";
            List<Pattern> filter = new ArrayList<Pattern>();
            float purge = 0.2f;
            GetOpt getopt = new GetOpt(args, "hf:p:", longOpts);
            List<String[]> optList = getopt.getOptList();
            String[] cargs = {};
            cargs = getopt.getEncArgs().toArray(cargs);
            for (String[] opt : optList)
            {
                if (opt[0].equals("-h") || opt[0].equals("--help"))
                {
                    System.out.println(_usage);
                    System.out.println(_description);
                    System.out.println(_options);
                    System.exit(1);
                }
                else if (opt[0].equals("-f") || opt[0].equals("--filter"))
                {
                    String[] split = opt[1].split(",");
                    for (String s : split)
                    {
                        Pattern p = Pattern.compile(s);
                        filter.add(p);
                    }
                }
                else if (opt[0].equals("-p") || opt[0].equals("--purge"))
                {
                    int percent = Integer.parseInt(opt[1]);
                    if (percent < 0 || percent > 100)
                    {
                        System.out.println(_usage);
                        System.exit(1);
                    }
                    purge = percent/100.0f;
                }
                else if (opt[0].equals("--sasl-mechanism"))
                {
                    connectionOptions = "{reconnect: true, sasl_mechs: " + opt[1] + "}";
                }
            }

            int nargs = cargs.length;
            if (nargs == 0)
            {
                cargs = new String[] {"localhost"};
            }

            for (String url : cargs)
            {
                QueueFuse queueFuse = new QueueFuse(url, connectionOptions, filter, purge);
            }
        }
        catch (IllegalArgumentException e)
        {
            System.out.println(_usage);
            System.out.println(e.getMessage());
            System.exit(1);
        }

        try
        {   // Block here
            Thread.currentThread().join();
        }
        catch (InterruptedException ie)
        {
        }
    }
}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to