Hi Sergey,
Gordon Sim was, I think, going to put a small patch to qpid-config into qpid 0.14. I don't know if he did, though, as I've not had time to play with 0.14.

To tide you over I've attached a patched version of qpid-config based on the 0.8 version of qpid-config


./qpid-config-patched -b queues

Look in QueueListRecurse and ExchangeListRecurse to see the differences if you want to tweak a later version of qpid-config.

It's only about four lines in total


On 10/02/12 12:14, Zhemzhitsky Sergey wrote:
Hi gurus,

Is there any way to view all the binding properties of the headers exchange?
I need to retrieve all the headers and their values, along with the ‘x-match’ 
(all/any) property, by means of an API or in some other way.

Best Regards,


The information contained in this message may be privileged and confidential 
and protected from disclosure. If you are not the original intended recipient, 
you are hereby notified that any review, retransmission, dissemination, or 
other use of, or taking of any action in reliance upon, this information is 
prohibited. If you have received this communication in error, please notify the 
sender immediately by replying to this message and delete it from your 
computer. Thank you for your cooperation. Troika Dialog, Russia.
If you need assistance please contact our Contact Center  (+7495) 258 0500 or 
go to www.troika.ru/eng/Contacts/system.wbp

#!/usr/bin/env python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#   http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import os
import getopt
import sys
import locale
from qmf.console import Session

# Run-time settings.  These are module-level defaults that the option-parsing
# loop in the main program overwrites, and that the BrokerManager methods and
# the snarf_* helpers read directly.
_recursive         = False        # -b/--bindings: list bindings along with queues/exchanges
_host              = "localhost"  # -a/--broker-addr: broker address handed to SetBroker
_connTimeout       = 10           # --timeout: passed to addBroker; 0 on the command line becomes None
_altern_ex         = None         # --alternate-exchange: name used by add exchange / add queue
_passive           = False        # --passive
_durable           = False        # --durable
_clusterDurable    = False        # --cluster-durable
_if_empty          = True         # cleared by --force / --force-if-not-empty
_if_unused         = True         # cleared by --force / --force-if-used
_fileCount         = 8            # --file-count: only sent when the queue is durable
_fileSize          = 24           # --file-size: only sent when the queue is durable
_maxQueueSize      = None         # --max-queue-size
_maxQueueCount     = None         # --max-queue-count
_limitPolicy       = None         # --limit-policy
_order             = None         # --order
_msgSequence       = False        # --sequence
_ive               = False        # --ive
_eventGeneration   = None         # --generate-queue-events
_file              = None         # -f/--file: xquery source for xml bindings ("-" = stdin)

# Keys used in the ``arguments`` map of queue/exchange declarations, and looked
# up again when listing existing queues and exchanges.
FILECOUNT = "qpid.file_count"
FILESIZE  = "qpid.file_size"
MAX_QUEUE_SIZE  = "qpid.max_size"
MAX_QUEUE_COUNT  = "qpid.max_count"
POLICY_TYPE  = "qpid.policy_type"
CLUSTER_DURABLE = "qpid.persist_last_node"
LVQ = "qpid.last_value_queue"
LVQNB = "qpid.last_value_queue_no_browse"
MSG_SEQUENCE = "qpid.msg_sequence"
IVE = "qpid.ive"
QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"

def Usage (short=False):
    """Print the command-line help.

    With ``short`` true only the synopsis is printed and the function returns,
    so the caller can append its own error message.  Otherwise the full option
    reference follows and the process exits with status 1.

    NOTE(review): several help lines were truncated or hard-wrapped in the
    archived copy of this script.  Wrapped lines were re-joined as found; the
    missing tails ("[AddExchangeOptions]", "[binding-key]", the broker-addr
    examples, "journal", "messages", "ring-strict]", "policy", "empty") and
    the blank separator lines are reconstructions -- confirm against a
    pristine qpid-config.
    """
    synopsis = (
        "Usage:  qpid-config [OPTIONS]",
        "        qpid-config [OPTIONS] exchanges [filter-string]",
        "        qpid-config [OPTIONS] queues    [filter-string]",
        "        qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]",
        "        qpid-config [OPTIONS] del exchange <name>",
        "        qpid-config [OPTIONS] add queue <name> [AddQueueOptions]",
        "        qpid-config [OPTIONS] del queue <name> [DelQueueOptions]",
        "        qpid-config [OPTIONS] bind   <exchange-name> <queue-name> [binding-key]",
        "                  <for type xml>     [-f -|filename]",
        "                  <for type header>  [all|any] k1=v1 [, k2=v2...]",
        "        qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]",
        "",
    )
    details = (
        "Options:",
        "    --timeout seconds (10)                    Maximum time to wait for broker connection",
        "    -b [ --bindings ]                         Show bindings in queue or exchange list",
        "    -a [ --broker-addr ] Address (localhost)  Address of qpidd broker",
        "         broker-addr is in the form:   [username/password@] hostname | ip-address [:<port>]",
        "         ex:  localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost",
        "",
        "Add Queue Options:",
        "    --alternate-exchange [name of the alternate exchange]",
        "                         The alternate-exchange field specifies how messages on this queue should",
        "                         be treated when they are rejected by a subscriber, or when they are",
        "                         orphaned by queue deletion. When present, rejected or orphaned messages",
        "                         MUST be routed to the alternate-exchange. In all cases the messages MUST",
        "                         be removed from the queue.",
        "    --passive            Do not actually change the broker state (queue will not be created)",
        "    --durable            Queue is durable",
        "    --cluster-durable    Queue becomes durable if there is only one functioning cluster node",
        "    --file-count N (8)   Number of files in queue's persistence journal",
        "    --file-size  N (24)  File size in pages (64Kib/page)",
        "    --max-queue-size N   Maximum in-memory queue size as bytes",
        "    --max-queue-count N  Maximum in-memory queue size as a number of messages",
        "    --limit-policy [none | reject | flow-to-disk | ring | ring-strict]",
        "                         Action taken when queue limit is reached:",
        "                             none (default) - Use broker's default policy",
        "                             reject         - Reject enqueued messages",
        "                             flow-to-disk   - Page messages to disk",
        "                             ring           - Replace oldest unacquired message with new",
        "                             ring-strict    - Replace oldest message, reject if oldest is acquired",
        "    --order [fifo | lvq | lvq-no-browse]",
        "                         Set queue ordering policy:",
        "                             fifo (default) - First in, first out",
        "                             lvq            - Last Value Queue ordering, allows queue browsing",
        "                             lvq-no-browse  - Last Value Queue ordering, browsing clients may lose data",
        "    --generate-queue-events N",
        "                         If set to 1, every enqueue will generate an event that can be processed by",
        "                         registered listeners (e.g. for replication). If set to 2, events will be",
        "                         generated for enqueues and dequeues",
        "",
        "Del Queue Options:",
        "    --force               Force delete of queue even if it's currently used or it's not empty",
        "    --force-if-not-empty  Force delete of queue even if it's not empty",
        "    --force-if-used       Force delete of queue even if it's currently used",
        "",
        "Add Exchange <type> values:",
        "    direct     Direct exchange for point-to-point communication",
        "    fanout     Fanout exchange for broadcast communication",
        "    topic      Topic exchange that routes messages using binding keys with wildcards",
        "    headers    Headers exchange that matches header fields against the binding keys",
        "",
        "Add Exchange Options:",
        "    --alternate-exchange [name of the alternate exchange]",
        "                         In the event that a message cannot be routed, this is the name of the exchange to",
        "                         which the message will be sent. Messages transferred using message.transfer will",
        "                         be routed to the alternate-exchange only if they are sent with the \"none\"",
        "                         accept-mode, and the discard-unroutable delivery property is set to false, and",
        "                         there is no queue to route to for the given message according to the bindings",
        "                         on this exchange.",
        "    --passive            Do not actually change the broker state (exchange will not be created)",
        "    --durable            Exchange is durable",
        "    --sequence           Exchange will insert a 'qpid.msg_sequence' field in the message header",
        "                         with a value that increments for each message forwarded.",
        "    --ive                Exchange will behave as an 'initial-value-exchange', keeping a reference",
        "                         to the last message forwarded and enqueuing that message to newly bound",
        "                         queues.",
        "",
    )

    # print(x) with a single string behaves the same as the print statement,
    # so this stays valid for the Python 2 interpreter the script targets.
    for text in synopsis:
        print(text)
    if short:
        # The option-parsing error path calls Usage(short=True) and then
        # prints its own message, so this must return rather than exit.
        return
    for text in details:
        print(text)
    sys.exit (1)

# helpers for the arg parsing in bind().  return multiple values; "ok"
# followed by the resultant args

# accept -f followed by either
# a filename or "-", for stdin.  pull the bits into a string, to be 
# passed to the xml binding.
def snarf_xquery_args():
    """Load the xquery text for a binding to an xml exchange.

    The source is taken from the module-level ``_file`` setting (the ``-f``
    option): ``"-"`` means read standard input, any other value is opened as
    a file.

    Returns ``[False]`` (after printing a message) when ``_file`` is unset,
    otherwise ``[True, text]``.  A path that cannot be opened raises the
    error from ``open`` unchanged.
    """
    if not _file:
        print("Invalid args to bind xml:  need an input file or stdin")
        return [False]
    if _file == "-":
        res = sys.stdin.read()
    else:
        # Without this branch "-" fell through to open("-") and a real
        # filename never assigned ``res`` at all.
        f = open(_file)   # let this signal if it can't find it
        try:
            res = f.read()
        finally:
            f.close()
    return [True, res]
# look for "any"/"all" and grok the rest of argv into a map
def snarf_header_args(cargs):
    """Parse the trailing arguments of a bind to a headers exchange.

    ``cargs`` is expected to be ``['all'|'any', 'k1=v1', 'k2=v2', ...]``.

    Returns ``[True, op, kv]`` where ``op`` is the match mode and ``kv`` maps
    each key to its (string) value, or ``[False]`` after printing a message
    when the arguments are unusable.  Callers must check the first element
    before unpacking the rest.
    """
    if len(cargs) < 2:
        print("Invalid args to bind headers:  need 'any'/'all' plus conditions")
        return [False]
    op = cargs[0]
    if op not in ("all", "any"):
        print("Invalid condition arg to bind headers, need 'any' or 'all', not '" + op + "'")
        return [False]
    kv = {}
    for thing in cargs[1:]:
        # Split on the first '=' only, so values may themselves contain '='.
        key, sep, value = thing.partition("=")
        if not sep:
            # A bare word used to surface as an IndexError traceback.
            print("Invalid condition arg to bind headers, need key=value, not '" + thing + "'")
            return [False]
        kv[key] = value
    return [True, op, kv]

class BrokerManager:
    """Inspect and reconfigure one broker through a QMF console session.

    The listing methods print to standard output; the add/del/bind/unbind
    methods issue commands on the AMQP session of the connected broker.  Most
    option values (durability, limits, alternate exchange, ...) are read from
    the module-level ``_xxx`` settings rather than passed in.

    NOTE(review): many lines of this class were truncated or dropped in the
    archived copy (call tails, ``else:`` lines, terminating ``print``
    statements, the body of ``Disconnect``).  They are reconstructed here from
    the surrounding code -- confirm against a pristine qpid-config.
    """

    def __init__ (self):
        self.brokerName = None
        self.qmf        = None
        self.broker     = None

    def SetBroker (self, brokerUrl):
        """Open a QMF session to ``brokerUrl`` and remember the agent with bank '0'."""
        self.url = brokerUrl
        self.qmf = Session()
        self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
        for agent in self.qmf.getAgents():
            if agent.getAgentBank() == '0':
                self.brokerAgent = agent

    def Disconnect(self):
        """Drop the broker connection if one was established."""
        if self.broker:
            # NOTE(review): reconstructed; delBroker is the counterpart of the
            # addBroker call made in SetBroker.
            self.qmf.delBroker(self.broker)

    def _objects (self, className):
        """Fetch all objects of ``className`` from the remembered broker agent."""
        return self.qmf.getObjects(_class=className, _agent=self.brokerAgent)

    def _bindingLine (self, bind, target):
        """Format one binding, appending its arguments map when it has one."""
        if bind.arguments:
            return "    bind [%s] => %s %s" % (bind.bindingKey, target, bind.arguments)
        return "    bind [%s] => %s" % (bind.bindingKey, target)

    def Overview (self):
        """Print exchange counts per type and durable/non-durable queue counts."""
        exchanges = self._objects("exchange")
        queues    = self._objects("queue")
        print("Total Exchanges: %d" % len (exchanges))
        etype = {}
        for ex in exchanges:
            # The lost ``else:`` made the first exchange of each type count twice.
            etype[ex.type] = etype.get(ex.type, 0) + 1
        for typ in etype:
            print("%15s: %d" % (typ, etype[typ]))

        print("")
        print("   Total Queues: %d" % len (queues))
        durableCount = 0
        for queue in queues:
            if queue.durable:
                durableCount = durableCount + 1
        print("        durable: %d" % durableCount)
        print("    non-durable: %d" % (len (queues) - durableCount))

    def ExchangeList (self, filter):
        """Print a table of exchanges whose name contains ``filter``, with their attributes."""
        exchanges = self._objects("exchange")
        caption1 = "Type      "
        caption2 = "Exchange Name"
        maxNameLen = len(caption2)
        for ex in exchanges:
            if self.match(ex.name, filter):
                if len(ex.name) > maxNameLen:  maxNameLen = len(ex.name)
        print("%s%-*s  Attributes" % (caption1, maxNameLen, caption2))
        print("=====" * (((maxNameLen + len(caption1)) // 5) + 5))

        for ex in exchanges:
            if not self.match (ex.name, filter):
                continue
            # Collect the pieces and emit them space-separated on one line,
            # which is what the chain of trailing-comma prints produced.
            parts = ["%-10s%-*s " % (ex.type, maxNameLen, ex.name)]
            args = ex.arguments
            if ex.durable:
                parts.append("--durable")
            if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1:
                parts.append("--sequence")
            if IVE in args and args[IVE] == 1:
                parts.append("--ive")
            if ex.altExchange:
                parts.append("--alternate-exchange=%s" % ex._altExchange_.name)
            print(" ".join(parts))

    def ExchangeListRecurse (self, filter):
        """Print each matching exchange followed by its bindings (key, queue, arguments)."""
        exchanges = self._objects("exchange")
        bindings  = self._objects("binding")
        queues    = self._objects("queue")
        for ex in exchanges:
            if not self.match (ex.name, filter):
                continue
            print("Exchange '%s' (%s)" % (ex.name, ex.type))
            exId = ex.getObjectId()
            for bind in bindings:
                if bind.exchangeRef == exId:
                    qname = "<unknown>"
                    queue = self.findById (queues, bind.queueRef)
                    if queue is not None:
                        qname = queue.name
                    print(self._bindingLine(bind, qname))

    def QueueList (self, filter):
        """Print a table of queues whose name contains ``filter``, with their attributes."""
        queues = self._objects("queue")

        caption = "Queue Name"
        maxNameLen = len(caption)
        for q in queues:
            if self.match (q.name, filter):
                if len(q.name) > maxNameLen:  maxNameLen = len(q.name)
        print("%-*s  Attributes" % (maxNameLen, caption))
        print("=====" * ((maxNameLen // 5) + 5))

        for q in queues:
            if not self.match (q.name, filter):
                continue
            parts = ["%-*s " % (maxNameLen, q.name)]
            args = q.arguments
            if q.durable:
                parts.append("--durable")
            if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1:
                parts.append("--cluster-durable")
            if q.autoDelete:
                parts.append("auto-del")
            if q.exclusive:
                parts.append("excl")
            if FILESIZE in args:
                parts.append("--file-size=%d" % args[FILESIZE])
            if FILECOUNT in args:
                parts.append("--file-count=%d" % args[FILECOUNT])
            if MAX_QUEUE_SIZE in args:
                parts.append("--max-queue-size=%d" % args[MAX_QUEUE_SIZE])
            if MAX_QUEUE_COUNT in args:
                parts.append("--max-queue-count=%d" % args[MAX_QUEUE_COUNT])
            if POLICY_TYPE in args:
                parts.append("--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"))
            if LVQ in args and args[LVQ] == 1:
                parts.append("--order lvq")
            if LVQNB in args and args[LVQNB] == 1:
                parts.append("--order lvq-no-browse")
            if QUEUE_EVENT_GENERATION in args:
                parts.append("--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION])
            if q.altExchange:
                parts.append("--alternate-exchange=%s" % q._altExchange_.name)
            print(" ".join(parts))

    def QueueListRecurse (self, filter):
        """Print each matching queue followed by its bindings (key, exchange, arguments)."""
        exchanges = self._objects("exchange")
        bindings  = self._objects("binding")
        queues    = self._objects("queue")
        for queue in queues:
            if not self.match (queue.name, filter):
                continue
            print("Queue '%s'" % queue.name)
            queueId = queue.getObjectId()
            for bind in bindings:
                if bind.queueRef == queueId:
                    ename = "<unknown>"
                    ex    = self.findById (exchanges, bind.exchangeRef)
                    if ex is not None:
                        ename = ex.name
                        if ename == "":
                            # show the unnamed exchange as ''
                            ename = "''"
                    print(self._bindingLine(bind, ename))

    def AddExchange (self, args):
        """Declare an exchange; ``args`` is ``[type, name]``."""
        if len (args) < 2:
            Usage ()
        etype = args[0]
        ename = args[1]
        declArgs = {}
        if _msgSequence:
            declArgs[MSG_SEQUENCE] = 1
        if _ive:
            declArgs[IVE] = 1
        declare = dict(exchange=ename, type=etype, passive=_passive,
                       durable=_durable, arguments=declArgs)
        # alternate_exchange is only sent when one was requested
        if _altern_ex is not None:
            declare["alternate_exchange"] = _altern_ex
        self.broker.getAmqpSession().exchange_declare (**declare)

    def DelExchange (self, args):
        """Delete the exchange named by ``args[0]``."""
        if len (args) < 1:
            Usage ()
        ename = args[0]
        self.broker.getAmqpSession().exchange_delete (exchange=ename)

    def AddQueue (self, args):
        """Declare the queue named by ``args[0]`` using the module-level queue options."""
        if len (args) < 1:
            Usage ()
        qname    = args[0]
        declArgs = {}
        if _durable:
            declArgs[FILECOUNT] = _fileCount
            declArgs[FILESIZE]  = _fileSize

        if _maxQueueSize:
            declArgs[MAX_QUEUE_SIZE]  = _maxQueueSize
        if _maxQueueCount:
            declArgs[MAX_QUEUE_COUNT]  = _maxQueueCount

        # "none" (and anything unrecognised) sends no policy at all
        policyNames = {"reject": "reject", "flow-to-disk": "flow_to_disk",
                       "ring": "ring", "ring-strict": "ring_strict"}
        if _limitPolicy in policyNames:
            declArgs[POLICY_TYPE] = policyNames[_limitPolicy]

        if _clusterDurable:
            declArgs[CLUSTER_DURABLE] = 1

        # "fifo" needs no argument
        if _order == "lvq":
            declArgs[LVQ] = 1
        elif _order == "lvq-no-browse":
            declArgs[LVQNB] = 1

        if _eventGeneration:
            declArgs[QUEUE_EVENT_GENERATION]  = _eventGeneration

        declare = dict(queue=qname, passive=_passive, durable=_durable,
                       arguments=declArgs)
        if _altern_ex is not None:
            declare["alternate_exchange"] = _altern_ex
        self.broker.getAmqpSession().queue_declare (**declare)

    def DelQueue (self, args):
        """Delete the queue named by ``args[0]``, honouring the --force* settings."""
        if len (args) < 1:
            Usage ()
        qname = args[0]
        self.broker.getAmqpSession().queue_delete (queue=qname,
                                                   if_empty=_if_empty,
                                                   if_unused=_if_unused)

    def Bind (self, args):
        """Bind a queue to an exchange.

        ``args`` is ``[exchange, queue, key, extra...]``; the key is optional.
        The exchange is queried for its type: an xml exchange takes its xquery
        from the -f option, a headers exchange takes ``all|any k=v ...`` from
        the arguments after the key, and other types ignore extra arguments.
        Exits with status 1 when those extra arguments are unusable.
        """
        if len (args) < 2:
            Usage ()
        ename = args[0]
        qname = args[1]
        key   = ""
        if len (args) > 2:
            key = args[2]

        # query the exchange to determine its type.
        res = self.broker.getAmqpSession().exchange_query(ename)

        # The helpers return [False] on failure, so check the flag before
        # reading the remaining elements instead of unpacking blindly.
        bindArgs = None
        if res.type == "xml":
            parsed = snarf_xquery_args()
            if not parsed[0]:
                sys.exit(1)
            bindArgs = { "xquery" : parsed[1] }
        elif res.type == "headers":
            # args[3:] is what the original reached through the global
            # cargs[4:], since the main program passes cargs[1:] here.
            parsed = snarf_header_args(args[3:])
            if not parsed[0]:
                sys.exit(1)
            bindArgs = parsed[2]
            bindArgs["x-match"] = parsed[1]

        self.broker.getAmqpSession().exchange_bind (queue=qname,
                                                    exchange=ename,
                                                    binding_key=key,
                                                    arguments=bindArgs)

    def Unbind (self, args):
        """Remove a binding; ``args`` is ``[exchange, queue, key]`` with the key optional."""
        if len (args) < 2:
            Usage ()
        ename = args[0]
        qname = args[1]
        key   = ""
        if len (args) > 2:
            key = args[2]
        self.broker.getAmqpSession().exchange_unbind (queue=qname,
                                                      exchange=ename,
                                                      binding_key=key)

    def findById (self, items, id):
        """Return the first item whose ``getObjectId()`` equals ``id``, or None."""
        for item in items:
            if item.getObjectId() == id:
                return item
        return None

    def match (self, name, filter):
        """Return True when ``filter`` is empty or occurs as a substring of ``name``."""
        if filter == "":
            return True
        return name.find (filter) != -1

def YN (bool):
    """Map a truthy value to 'Y' and a falsy one to 'N'."""
    return 'Y' if bool else 'N'

## Main Program
#
# Parses the command line into the module-level settings, connects to the
# broker, dispatches the requested sub-command and finally disconnects.
#
# NOTE(review): the ``try:``/``else:`` lines, several handler bodies and the
# tail of the longOpts tuple were missing from the archived copy; they are
# reconstructed here from the option checks and handlers that survived --
# confirm against a pristine qpid-config.

try:
    # Every long option tested in the loop below has to be registered here.
    # "force_if_used" was the only spelling registered originally, which made
    # --force-if-used unusable; it is kept so existing scripts still parse.
    longOpts = ("help", "durable", "cluster-durable", "bindings", "broker-addr=", "file-count=",
                "file-size=", "max-queue-size=", "max-queue-count=", "limit-policy=",
                "order=", "sequence", "ive", "generate-queue-events=", "force", "force-if-not-empty",
                "force-if-used", "force_if_used", "alternate-exchange=", "passive", "timeout=",
                "file=")
    (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "ha:bf:", longOpts)
except Exception as e:
    Usage (short=True)
    # make output match optparse-based tools' output, for consistent scripting
    msg = str(e).replace('option', 'no such option:').replace('not recognized', '')
    print("qpid-config: error: %s" % msg)
    sys.exit (1)

try:
    encoding = locale.getpreferredencoding()
    cargs = [a.decode(encoding) for a in encArgs]
except Exception:
    # fall back to the undecoded arguments
    cargs = encArgs

for opt in optlist:
    name, value = opt
    if name == "-h" or name == "--help":
        Usage ()
    elif name == "-b" or name == "--bindings":
        _recursive = True
    elif name == "-a" or name == "--broker-addr":
        _host = value
    elif name == "-f" or name == "--file":
        _file = value
    elif name == "--timeout":
        _connTimeout = int(value)
        if _connTimeout == 0:
            _connTimeout = None
    elif name == "--alternate-exchange":
        _altern_ex = value
    elif name == "--passive":
        _passive = True
    elif name == "--durable":
        _durable = True
    elif name == "--cluster-durable":
        _clusterDurable = True
    elif name == "--file-count":
        _fileCount = int (value)
    elif name == "--file-size":
        _fileSize = int (value)
    elif name == "--max-queue-size":
        _maxQueueSize = int (value)
    elif name == "--max-queue-count":
        _maxQueueCount = int (value)
    elif name == "--limit-policy":
        _limitPolicy = value
        if _limitPolicy not in ("none", "reject", "flow-to-disk", "ring", "ring-strict"):
            print("Error: Invalid --limit-policy argument")
            sys.exit(1)
    elif name == "--order":
        _order = value
        if _order not in ("fifo", "lvq", "lvq-no-browse"):
            print("Error: Invalid --order argument")
            sys.exit(1)
    elif name == "--sequence":
        _msgSequence = True
    elif name == "--ive":
        _ive = True
    elif name == "--generate-queue-events":
        _eventGeneration = int (value)
    elif name == "--force":
        _if_empty  = False
        _if_unused = False
    elif name == "--force-if-not-empty":
        _if_empty  = False
    elif name == "--force-if-used" or name == "--force_if_used":
        _if_unused = False

nargs = len (cargs)
bm    = BrokerManager ()
exitCode = 0

try:
    bm.SetBroker (_host)
    if nargs == 0:
        bm.Overview ()
    else:
        cmd = cargs[0]
        modifier = ""
        if nargs > 1:
            modifier = cargs[1]
        if cmd == "exchanges":
            if _recursive:
                bm.ExchangeListRecurse (modifier)
            else:
                bm.ExchangeList (modifier)
        elif cmd == "queues":
            if _recursive:
                bm.QueueListRecurse (modifier)
            else:
                bm.QueueList (modifier)
        elif cmd == "add":
            if modifier == "exchange":
                bm.AddExchange (cargs[2:])
            elif modifier == "queue":
                bm.AddQueue (cargs[2:])
            else:
                Usage ()
        elif cmd == "del":
            if modifier == "exchange":
                bm.DelExchange (cargs[2:])
            elif modifier == "queue":
                bm.DelQueue (cargs[2:])
            else:
                Usage ()
        elif cmd == "bind":
            bm.Bind (cargs[1:])
        elif cmd == "unbind":
            bm.Unbind (cargs[1:])
        else:
            Usage ()
except KeyboardInterrupt:
    print("")
except IOError as e:
    print(e)
    exitCode = 1
except SystemExit:
    # Usage() and the bind helpers leave through sys.exit(1); remember the
    # failure but still disconnect cleanly below.
    exitCode = 1
except Exception as e:
    if e.__class__.__name__ != "Timeout":
        # ignore Timeout exception, handle in the loop below
        print("Failed: %s: %s" % (e.__class__.__name__, e))
        exitCode = 1

while True:
    # some commands take longer than the default amqp timeout to complete,
    # so attempt to disconnect until successful, ignoring Timeouts
    try:
        bm.Disconnect ()
        break
    except Exception as e:
        if e.__class__.__name__ != "Timeout":
            print("Failed: %s: %s" % (e.__class__.__name__, e))
            sys.exit (1)

if exitCode:
    sys.exit (exitCode)

Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:users-subscr...@qpid.apache.org

Reply via email to