Hi Sergey,
I think Gordon Sim was going to add a small patch to qpid-config for
qpid 0.14. I don't know if he did, though, as I've not had time to play
with 0.14.
To tide you over I've attached a patched version of qpid-config based on
the 0.8 version of qpid-config
do
./qpid-config-patched -b queues
Look in QueueListRecurse and ExchangeListRecurse to see the differences
if you want to tweak a later version of qpid-config
It's only about four lines in total
HTH
Frase
On 10/02/12 12:14, Zhemzhitsky Sergey wrote:
Hi gurus,
Is there any way to view all the binding properties of the headers exchange?
I need to retrieve all the headers and their values along with ‘x-match’
(all/any) property by means of api or somehow else.
Best Regards,
Sergey
_______________________________________________________
The information contained in this message may be privileged and confidential
and protected from disclosure. If you are not the original intended recipient,
you are hereby notified that any review, retransmission, dissemination, or
other use of, or taking of any action in reliance upon, this information is
prohibited. If you have received this communication in error, please notify the
sender immediately by replying to this message and delete it from your
computer. Thank you for your cooperation. Troika Dialog, Russia.
If you need assistance please contact our Contact Center (+7495) 258 0500 or
go to www.troika.ru/eng/Contacts/system.wbp
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os
import getopt
import sys
import locale
from qmf.console import Session
# ---- command-line option state (overwritten by the option loop in the
# ---- main program; the values here are the defaults) ----

# connection
_host = "localhost"          # -a / --broker-addr
_connTimeout = 10            # --timeout, in seconds

# listing
_recursive = False           # -b / --bindings

# shared by "add exchange" and "add queue"
_altern_ex = None            # --alternate-exchange
_passive = False             # --passive
_durable = False             # --durable

# "add queue"
_clusterDurable = False      # --cluster-durable
_fileCount = 8               # --file-count
_fileSize = 24               # --file-size
_maxQueueSize = _maxQueueCount = None   # --max-queue-size / --max-queue-count
_limitPolicy = None          # --limit-policy
_order = None                # --order
_eventGeneration = None      # --generate-queue-events

# "add exchange"
_msgSequence = False         # --sequence
_ive = False                 # --ive

# "del queue": both conditions are enforced unless a --force* option is given
_if_empty = _if_unused = True

# "bind" to an xml exchange
_file = None                 # -f / --file

# ---- keys used in the arguments map of queue/exchange declarations ----
FILECOUNT = "qpid.file_count"
FILESIZE = "qpid.file_size"
MAX_QUEUE_SIZE = "qpid.max_size"
MAX_QUEUE_COUNT = "qpid.max_count"
POLICY_TYPE = "qpid.policy_type"
CLUSTER_DURABLE = "qpid.persist_last_node"
LVQ = "qpid.last_value_queue"
LVQNB = "qpid.last_value_queue_no_browse"
MSG_SEQUENCE = "qpid.msg_sequence"
IVE = "qpid.ive"
QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"
def Usage(short=False):
    """Print the command-line help for qpid-config.

    short -- when true, print only the command synopsis and return to the
             caller.  Otherwise print the synopsis followed by the full
             option reference and terminate the process via sys.exit(1).

    The help strings that had been hard-wrapped in the middle of a literal
    are rejoined with a single space; no other text is changed.
    NOTE(review): the strings carry only single spaces between columns --
    compare with an upstream qpid-config if aligned help output is wanted.
    """
    # An empty entry stands for a blank output line.  print() is always
    # called with exactly one argument so the statement behaves the same
    # under the Python 2 print statement and the Python 3 print function.
    synopsis = [
        "Usage: qpid-config [OPTIONS]",
        " qpid-config [OPTIONS] exchanges [filter-string]",
        " qpid-config [OPTIONS] queues [filter-string]",
        " qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]",
        " qpid-config [OPTIONS] del exchange <name>",
        " qpid-config [OPTIONS] add queue <name> [AddQueueOptions]",
        " qpid-config [OPTIONS] del queue <name> [DelQueueOptions]",
        " qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]",
        " <for type xml> [-f -|filename]",
        " <for type header> [all|any] k1=v1 [, k2=v2...]",
        " qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]",
        "",
    ]
    for text in synopsis:
        print(text)
    if short:
        return

    reference = [
        "Options:",
        " --timeout seconds (10) Maximum time to wait for broker connection",
        " -b [ --bindings ] Show bindings in queue or exchange list",
        " -a [ --broker-addr ] Address (localhost) Address of qpidd broker",
        " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]",
        " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost",
        "",
        "Add Queue Options:",
        " --alternate-exchange [name of the alternate exchange]",
        " The alternate-exchange field specifies how messages on this queue should",
        " be treated when they are rejected by a subscriber, or when they are",
        " orphaned by queue deletion. When present, rejected or orphaned messages",
        " MUST be routed to the alternate-exchange. In all cases the messages MUST",
        " be removed from the queue.",
        " --passive Do not actually change the broker state (queue will not be created)",
        " --durable Queue is durable",
        " --cluster-durable Queue becomes durable if there is only one functioning cluster node",
        " --file-count N (8) Number of files in queue's persistence journal",
        " --file-size N (24) File size in pages (64Kib/page)",
        " --max-queue-size N Maximum in-memory queue size as bytes",
        " --max-queue-count N Maximum in-memory queue size as a number of messages",
        " --limit-policy [none | reject | flow-to-disk | ring | ring-strict]",
        " Action taken when queue limit is reached:",
        " none (default) - Use broker's default policy",
        " reject - Reject enqueued messages",
        " flow-to-disk - Page messages to disk",
        " ring - Replace oldest unacquired message with new",
        " ring-strict - Replace oldest message, reject if oldest is acquired",
        " --order [fifo | lvq | lvq-no-browse]",
        " Set queue ordering policy:",
        " fifo (default) - First in, first out",
        " lvq - Last Value Queue ordering, allows queue browsing",
        " lvq-no-browse - Last Value Queue ordering, browsing clients may lose data",
        " --generate-queue-events N",
        " If set to 1, every enqueue will generate an event that can be processed by",
        " registered listeners (e.g. for replication). If set to 2, events will be",
        " generated for enqueues and dequeues",
        "",
        "Del Queue Options:",
        " --force Force delete of queue even if it's currently used or it's not empty",
        " --force-if-not-empty Force delete of queue even if it's not empty",
        " --force-if-used Force delete of queue even if it's currently used",
        "",
        "Add Exchange <type> values:",
        " direct Direct exchange for point-to-point communication",
        " fanout Fanout exchange for broadcast communication",
        " topic Topic exchange that routes messages using binding keys with wildcards",
        " headers Headers exchange that matches header fields against the binding keys",
        "",
        "Add Exchange Options:",
        " --alternate-exchange [name of the alternate exchange]",
        " In the event that a message cannot be routed, this is the name of the exchange to",
        " which the message will be sent. Messages transferred using message.transfer will",
        " be routed to the alternate-exchange only if they are sent with the \"none\"",
        " accept-mode, and the discard-unroutable delivery property is set to false, and",
        " there is no queue to route to for the given message according to the bindings",
        " on this exchange.",
        " --passive Do not actually change the broker state (exchange will not be created)",
        " --durable Exchange is durable",
        " --sequence Exchange will insert a 'qpid.msg_sequence' field in the message header",
        " with a value that increments for each message forwarded.",
        " --ive Exchange will behave as an 'initial-value-exchange', keeping a reference",
        " to the last message forwarded and enqueuing that message to newly bound",
        " queues.",
        "",
    ]
    for text in reference:
        print(text)
    sys.exit(1)
#
# helpers for the arg parsing in bind(). return multiple values; "ok"
# followed by the resultant args
#
# accept -f followed by either
# a filename or "-", for stdin. pull the bits into a string, to be
# passed to the xml binding.
#
def snarf_xquery_args():
    """Load the XQuery text selected by the -f/--file option.

    Reads the module-level _file setting: "-" means standard input, any
    other non-empty value is opened as a file name.

    Returns [True, text] on success.  When no file was given, prints an
    error message and returns the one-element list [False].
    An unreadable or missing file raises the exception from open()/read().
    """
    if not _file:
        print("Invalid args to bind xml: need an input file or stdin")
        return [False]
    if _file == "-":
        return [True, sys.stdin.read()]
    f = open(_file)  # let this signal if it can't find it
    try:
        # try/finally so the handle is released even if read() fails
        res = f.read()
    finally:
        f.close()
    return [True, res]
#
# look for "any"/"all" and grok the rest of argv into a map
#
def snarf_header_args(cargs):
    """Parse the trailing arguments of a headers-exchange bind.

    cargs -- list of strings: the match mode ("all" or "any") followed by
             one or more key=value conditions.

    Returns [True, mode, conditions] where conditions is a dict mapping
    each key to its value.  On any problem an error message is printed and
    the one-element list [False] is returned.
    """
    if len(cargs) < 2:
        print("Invalid args to bind headers: need 'any'/'all' plus conditions")
        return [False]

    op = cargs[0]
    if op not in ("all", "any"):
        print("Invalid condition arg to bind headers, need 'any' or 'all', not '" + op + "'")
        return [False]

    kv = {}
    for thing in cargs[1:]:
        # Split on the first "=" only, so a value may itself contain "=".
        k_and_v = thing.split("=", 1)
        if len(k_and_v) != 2:
            # A condition without "=" used to raise IndexError here.
            print("Invalid condition arg to bind headers, need key=value, not '" + thing + "'")
            return [False]
        kv[k_and_v[0]] = k_and_v[1]
    return [True, op, kv]
class BrokerManager:
    """Runs the qpid-config commands against one broker.

    Listing commands read exchange/queue/binding objects through a QMF
    console Session; modifying commands go through the AMQP session of the
    connected broker.  Option values are read from the module-level
    settings (_durable, _passive, _altern_ex, ...).

    NOTE(review): block nesting was reconstructed from the statements'
    meaning because the pasted copy carried no indentation -- confirm
    against a pristine qpid-config before relying on corner cases.
    """

    def __init__(self):
        self.brokerName = None
        self.url = None
        self.qmf = None
        self.broker = None
        # Set by SetBroker(); initialised here so the attribute always exists.
        self.brokerAgent = None

    def SetBroker(self, brokerUrl):
        """Connect to brokerUrl and remember the agent whose bank is '0'."""
        self.url = brokerUrl
        self.qmf = Session()
        self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
        for agent in self.qmf.getAgents():
            if agent.getAgentBank() == '0':
                self.brokerAgent = agent

    def Disconnect(self):
        """Drop the broker connection, if one was made."""
        if self.broker:
            self.qmf.delBroker(self.broker)

    def _objects(self, cls):
        """Return all QMF objects of class cls seen through the broker agent."""
        return self.qmf.getObjects(_class=cls, _agent=self.brokerAgent)

    def _printBinding(self, bind, peer):
        """Print one binding line; the arguments map is appended when non-empty."""
        if bind.arguments:
            print(" bind [%s] => %s %s" % (bind.bindingKey, peer, bind.arguments))
        else:
            print(" bind [%s] => %s" % (bind.bindingKey, peer))

    def Overview(self):
        """Print exchange counts per type and durable/non-durable queue counts."""
        exchanges = self._objects("exchange")
        queues = self._objects("queue")

        # One row format for every line so the labels right-align with the
        # per-type rows ("Total Exchanges" is exactly 15 characters wide).
        row = "%15s: %d"
        print(row % ("Total Exchanges", len(exchanges)))
        etype = {}
        for ex in exchanges:
            etype[ex.type] = etype.get(ex.type, 0) + 1
        for typ in etype:
            print(row % (typ, etype[typ]))

        print("")
        durableCount = len([q for q in queues if q.durable])
        print(row % ("Total Queues", len(queues)))
        print(row % ("durable", durableCount))
        print(row % ("non-durable", len(queues) - durableCount))

    def ExchangeList(self, filter):
        """Print a table of exchanges whose name contains filter."""
        shown = [ex for ex in self._objects("exchange")
                 if self.match(ex.name, filter)]

        # The type caption is padded to the width of the "%-10s" type column
        # used for the rows, so header and rows line up.
        caption1 = "%-10s" % "Type"
        caption2 = "Exchange Name"
        maxNameLen = max([len(caption2)] + [len(ex.name) for ex in shown])
        # Two spaces before "Attributes": rows end in one blank and the
        # attributes are joined on with another.
        print("%s%-*s  Attributes" % (caption1, maxNameLen, caption2))
        print("=====" * (((maxNameLen + len(caption1)) // 5) + 5))

        for ex in shown:
            parts = ["%-10s%-*s " % (ex.type, maxNameLen, ex.name)]
            args = ex.arguments
            if ex.durable:
                parts.append("--durable")
            if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1:
                parts.append("--sequence")
            if IVE in args and args[IVE] == 1:
                parts.append("--ive")
            if ex.altExchange:
                parts.append("--alternate-exchange=%s" % ex._altExchange_.name)
            print(" ".join(parts))

    def ExchangeListRecurse(self, filter):
        """Print matching exchanges, each followed by its bindings.

        Every binding shows its key, the bound queue's name and, when
        present, the binding arguments (e.g. the headers-exchange match
        conditions).
        """
        exchanges = self._objects("exchange")
        bindings = self._objects("binding")
        queues = self._objects("queue")
        for ex in exchanges:
            if not self.match(ex.name, filter):
                continue
            print("Exchange '%s' (%s)" % (ex.name, ex.type))
            for bind in bindings:
                if bind.exchangeRef != ex.getObjectId():
                    continue
                qname = "<unknown>"
                queue = self.findById(queues, bind.queueRef)
                if queue is not None:
                    qname = queue.name
                self._printBinding(bind, qname)

    def QueueList(self, filter):
        """Print a table of queues whose name contains filter."""
        shown = [q for q in self._objects("queue")
                 if self.match(q.name, filter)]

        caption = "Queue Name"
        maxNameLen = max([len(caption)] + [len(q.name) for q in shown])
        # Two spaces before "Attributes" to line up with the rows below.
        print("%-*s  Attributes" % (maxNameLen, caption))
        print("=====" * ((maxNameLen // 5) + 5))

        for q in shown:
            parts = ["%-*s " % (maxNameLen, q.name)]
            args = q.arguments
            if q.durable:
                parts.append("--durable")
            if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1:
                parts.append("--cluster-durable")
            if q.autoDelete:
                parts.append("auto-del")
            if q.exclusive:
                parts.append("excl")
            if FILESIZE in args:
                parts.append("--file-size=%d" % args[FILESIZE])
            if FILECOUNT in args:
                parts.append("--file-count=%d" % args[FILECOUNT])
            if MAX_QUEUE_SIZE in args:
                parts.append("--max-queue-size=%d" % args[MAX_QUEUE_SIZE])
            if MAX_QUEUE_COUNT in args:
                parts.append("--max-queue-count=%d" % args[MAX_QUEUE_COUNT])
            if POLICY_TYPE in args:
                parts.append("--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"))
            if LVQ in args and args[LVQ] == 1:
                parts.append("--order lvq")
            if LVQNB in args and args[LVQNB] == 1:
                parts.append("--order lvq-no-browse")
            if QUEUE_EVENT_GENERATION in args:
                parts.append("--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION])
            if q.altExchange:
                parts.append("--alternate-exchange=%s" % q._altExchange_.name)
            print(" ".join(parts))

    def QueueListRecurse(self, filter):
        """Print matching queues, each followed by its bindings.

        Every binding shows its key, the exchange's name ('' is shown for
        the exchange with an empty name) and, when present, the binding
        arguments.
        """
        exchanges = self._objects("exchange")
        bindings = self._objects("binding")
        queues = self._objects("queue")
        for queue in queues:
            if not self.match(queue.name, filter):
                continue
            print("Queue '%s'" % queue.name)
            for bind in bindings:
                if bind.queueRef != queue.getObjectId():
                    continue
                ename = "<unknown>"
                ex = self.findById(exchanges, bind.exchangeRef)
                if ex is not None:
                    ename = ex.name
                    if ename == "":
                        ename = "''"
                self._printBinding(bind, ename)

    def AddExchange(self, args):
        """Declare an exchange; args is [type, name]."""
        if len(args) < 2:
            Usage()
        etype = args[0]
        ename = args[1]
        declArgs = {}
        if _msgSequence:
            declArgs[MSG_SEQUENCE] = 1
        if _ive:
            declArgs[IVE] = 1

        # alternate_exchange is only passed when the option was given.
        declare = dict(exchange=ename, type=etype, passive=_passive,
                       durable=_durable, arguments=declArgs)
        if _altern_ex is not None:
            declare["alternate_exchange"] = _altern_ex
        self.broker.getAmqpSession().exchange_declare(**declare)

    def DelExchange(self, args):
        """Delete the exchange named args[0]."""
        if len(args) < 1:
            Usage()
        ename = args[0]
        self.broker.getAmqpSession().exchange_delete(exchange=ename)

    def AddQueue(self, args):
        """Declare the queue named args[0] using the module-level options."""
        if len(args) < 1:
            Usage()
        qname = args[0]
        declArgs = {}
        if _durable:
            declArgs[FILECOUNT] = _fileCount
            declArgs[FILESIZE] = _fileSize

        if _maxQueueSize:
            declArgs[MAX_QUEUE_SIZE] = _maxQueueSize
        if _maxQueueCount:
            declArgs[MAX_QUEUE_COUNT] = _maxQueueCount

        # Command-line spelling -> broker spelling.  "none" and any value
        # not listed add no policy argument at all.
        policies = {"reject": "reject",
                    "flow-to-disk": "flow_to_disk",
                    "ring": "ring",
                    "ring-strict": "ring_strict"}
        if _limitPolicy in policies:
            declArgs[POLICY_TYPE] = policies[_limitPolicy]

        if _clusterDurable:
            declArgs[CLUSTER_DURABLE] = 1

        # "fifo" needs no argument.
        if _order == "lvq":
            declArgs[LVQ] = 1
        elif _order == "lvq-no-browse":
            declArgs[LVQNB] = 1

        if _eventGeneration:
            declArgs[QUEUE_EVENT_GENERATION] = _eventGeneration

        # alternate_exchange is only passed when the option was given.
        declare = dict(queue=qname, passive=_passive, durable=_durable,
                       arguments=declArgs)
        if _altern_ex is not None:
            declare["alternate_exchange"] = _altern_ex
        self.broker.getAmqpSession().queue_declare(**declare)

    def DelQueue(self, args):
        """Delete the queue named args[0], honouring the --force* options."""
        if len(args) < 1:
            Usage()
        qname = args[0]
        self.broker.getAmqpSession().queue_delete(queue=qname,
                                                  if_empty=_if_empty,
                                                  if_unused=_if_unused)

    def Bind(self, args):
        """Bind a queue to an exchange.

        args -- [exchange-name, queue-name, [binding-key, [extra...]]].

        The exchange is queried for its type first: an "xml" exchange gets
        the XQuery loaded via -f as its "xquery" argument; a "headers"
        exchange takes the extra arguments ("all"/"any" followed by
        key=value conditions) and passes them with "x-match" set to the
        mode.  For any other type the extra arguments are ignored.
        Exits with status 1 when those extra arguments are invalid.
        """
        if len(args) < 2:
            Usage()
        ename = args[0]
        qname = args[1]
        key = ""
        if len(args) > 2:
            key = args[2]

        # query the exchange to determine its type.
        res = self.broker.getAmqpSession().exchange_query(ename)

        # The helpers return a one-element [False] on failure, so test the
        # flag before picking the remaining elements apart.
        bindArgs = None
        if res.type == "xml":
            # this checks/imports the -f arg
            parsed = snarf_xquery_args()
            if not parsed[0]:
                sys.exit(1)
            bindArgs = {"xquery": parsed[1]}
        elif res.type == "headers":
            # Everything after exchange, queue and key; taken from our own
            # argument list rather than the global command line.
            parsed = snarf_header_args(args[3:])
            if not parsed[0]:
                sys.exit(1)
            bindArgs = parsed[2]
            bindArgs["x-match"] = parsed[1]

        self.broker.getAmqpSession().exchange_bind(queue=qname,
                                                   exchange=ename,
                                                   binding_key=key,
                                                   arguments=bindArgs)

    def Unbind(self, args):
        """Remove a binding; args is [exchange-name, queue-name, [binding-key]]."""
        if len(args) < 2:
            Usage()
        ename = args[0]
        qname = args[1]
        key = ""
        if len(args) > 2:
            key = args[2]
        self.broker.getAmqpSession().exchange_unbind(queue=qname,
                                                     exchange=ename,
                                                     binding_key=key)

    def findById(self, items, id):
        """Return the first item whose getObjectId() equals id, else None."""
        for item in items:
            if item.getObjectId() == id:
                return item
        return None

    def match(self, name, filter):
        """Return True when filter is empty or occurs anywhere in name."""
        if filter == "":
            return True
        return name.find(filter) != -1
def YN(bool):
    """Return 'Y' when the argument is truthy, otherwise 'N'."""
    if bool:
        return 'Y'
    return 'N'
##
## Main Program
##
# Parse the command line.  Long options taking a value end in "=".
try:
    longOpts = ("help", "durable", "cluster-durable", "bindings",
                "broker-addr=", "file-count=", "file-size=",
                "max-queue-size=", "max-queue-count=", "limit-policy=",
                "order=", "sequence", "ive", "generate-queue-events=",
                "force", "force-if-not-empty",
                # "force-if-used" is the documented and handled spelling; the
                # underscore form was the only one registered before and is
                # kept so existing scripts continue to parse.
                "force-if-used", "force_if_used",
                "alternate-exchange=", "passive", "timeout=", "file=")
    (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "ha:bf:", longOpts)
except getopt.GetoptError as e:
    Usage(short=True)
    # make output match optparse-based tools' output, for consistent scripting
    msg = str(e).replace('option', 'no such option:').replace('not recognized', '')
    print("qpid-config: error: %s" % msg)
    sys.exit(1)

# Best effort: decode the positional arguments with the locale's encoding
# and fall back to the raw arguments when that is not possible (including
# when the arguments are already text and have no decode()).
try:
    encoding = locale.getpreferredencoding()
    cargs = [a.decode(encoding) for a in encArgs]
except Exception:
    cargs = encArgs

for optName, optValue in optlist:
    if optName in ("-h", "--help"):
        Usage()  # prints the full help and exits
        sys.exit(1)
    elif optName in ("-b", "--bindings"):
        _recursive = True
    elif optName in ("-a", "--broker-addr"):
        _host = optValue
    elif optName in ("-f", "--file"):
        _file = optValue
    elif optName == "--timeout":
        _connTimeout = int(optValue)
        if _connTimeout == 0:
            _connTimeout = None
    elif optName == "--alternate-exchange":
        _altern_ex = optValue
    elif optName == "--passive":
        _passive = True
    elif optName == "--durable":
        _durable = True
    elif optName == "--cluster-durable":
        _clusterDurable = True
    elif optName == "--file-count":
        _fileCount = int(optValue)
    elif optName == "--file-size":
        _fileSize = int(optValue)
    elif optName == "--max-queue-size":
        _maxQueueSize = int(optValue)
    elif optName == "--max-queue-count":
        _maxQueueCount = int(optValue)
    elif optName == "--limit-policy":
        _limitPolicy = optValue
        if _limitPolicy not in ("none", "reject", "flow-to-disk", "ring",
                                "ring-strict"):
            print("Error: Invalid --limit-policy argument")
            sys.exit(1)
    elif optName == "--order":
        _order = optValue
        if _order not in ("fifo", "lvq", "lvq-no-browse"):
            print("Error: Invalid --order argument")
            sys.exit(1)
    elif optName == "--sequence":
        _msgSequence = True
    elif optName == "--ive":
        _ive = True
    elif optName == "--generate-queue-events":
        _eventGeneration = int(optValue)
    elif optName == "--force":
        _if_empty = False
        _if_unused = False
    elif optName == "--force-if-not-empty":
        _if_empty = False
    elif optName in ("--force-if-used", "--force_if_used"):
        _if_unused = False

nargs = len(cargs)
bm = BrokerManager()

try:
    bm.SetBroker(_host)
    if nargs == 0:
        bm.Overview()
    else:
        cmd = cargs[0]
        modifier = ""
        if nargs > 1:
            modifier = cargs[1]
        if cmd == "exchanges":
            if _recursive:
                bm.ExchangeListRecurse(modifier)
            else:
                bm.ExchangeList(modifier)
        elif cmd == "queues":
            if _recursive:
                bm.QueueListRecurse(modifier)
            else:
                bm.QueueList(modifier)
        elif cmd == "add":
            if modifier == "exchange":
                bm.AddExchange(cargs[2:])
            elif modifier == "queue":
                bm.AddQueue(cargs[2:])
            else:
                Usage()
        elif cmd == "del":
            if modifier == "exchange":
                bm.DelExchange(cargs[2:])
            elif modifier == "queue":
                bm.DelQueue(cargs[2:])
            else:
                Usage()
        elif cmd == "bind":
            bm.Bind(cargs[1:])
        elif cmd == "unbind":
            bm.Unbind(cargs[1:])
        else:
            Usage()
except KeyboardInterrupt:
    print("")
except IOError as e:
    print(e)
    bm.Disconnect()
    sys.exit(1)
except SystemExit:
    # Raised by Usage() and by the commands' own sys.exit(); disconnect
    # before leaving.
    bm.Disconnect()
    sys.exit(1)
except Exception as e:
    if e.__class__.__name__ != "Timeout":
        # ignore Timeout exception, handle in the loop below
        print("Failed: %s: %s" % (e.__class__.__name__, e))
        bm.Disconnect()
        sys.exit(1)

while True:
    # some commands take longer than the default amqp timeout to complete,
    # so attempt to disconnect until successful, ignoring Timeouts
    try:
        bm.Disconnect()
        break
    except Exception as e:
        if e.__class__.__name__ != "Timeout":
            print("Failed: %s: %s" % (e.__class__.__name__, e))
            sys.exit(1)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:users-subscr...@qpid.apache.org