Howdy Lance,
You mention qpid-tool which uses QMF so I'm assuming that you are
talking about the C++ broker. The Java broker uses JMX and I'm not so
familiar with how that does instrumentation so the following is only
really applicable to the C++ broker.
Unfortunately QMF can be a bit non-trivial, and it's not helped by the
fact that there are two variants: QMF1, which is a binary protocol with
an accompanying API, and QMF2, which is built on top of Map messages.
The python tools bundled with qpid used to use QMF1, though I *think*
that as of qpid 0.18 they've started to use QMF2 (running the 0.18
qpid-config against a 0.8 broker gives a "method failed" error, which
suggests it's using the QMF2 create method, though I've not actually
looked at the 0.18 source).
If you are writing your tools in python there's a whole lot more stuff
available for QMF.
If you are writing in Java, about a year ago I wrote a complete Java
implementation of the QMF2 API:
https://cwiki.apache.org/qpid/qmfv2-api-proposal.html
based on the protocol:
https://cwiki.apache.org/qpid/qmf-map-message-protocol.html
The link to my code is here:
https://issues.apache.org/jira/browse/QPID-3675
Slightly frustratingly there hasn't been much feedback on this - I'd
really like to see it make it into the main code base - I put a huge
amount of effort into it. The code in that Jira link also contains
tests/tools that illustrate how to use QMF2 in Java in some non-trivial
scenarios.
As it happens, in a few days' time I'm about to release a major uplift
to that, which will include a QMF2 REST API and, most significantly, a
complete HTML5 based web UI that will enable inspection of all QMF
properties and also adding/deleting queues, exchanges and bindings, so
that may be just the sort of thing you're interested in. It's pretty
close; I'm just doing some last minute de-snagging, trying to get it
looking nice on small real-estate mobile browsers.
To more directly answer your question ("However, I’m having a hard time
making that same mapping between the connection stuff and a sender to an
exchange."): yeah, there's some nuance there :-) The main thing is to
think in terms of UML associations. For example, a binding object has
queueRef and exchangeRef properties which hold the ObjectIds of the
associated queue and exchange, so that they can be looked up (binding
really behaves like a UML association class). What this means, though,
is that if you want to work out how a queue links to an exchange you
have to do it from the perspective of the binding.
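As a rough sketch of that idea (plain Python stand-ins rather than real
QMF objects, so the ObjectIds, names and the exchanges_for_queue helper
below are all made up for illustration):

```python
from collections import namedtuple

# Hypothetical stand-in for a QMF binding object; real ones come from the broker.
Binding = namedtuple("Binding", "queueRef exchangeRef bindingKey")

# ObjectId -> name, standing in for the queue and exchange objects.
queue_names = {"q1": "orders", "q2": "audit"}
exchange_names = {"e1": "amq.topic", "e2": "amq.direct"}

bindings = [
    Binding("q1", "e1", "orders.#"),
    Binding("q1", "e2", "orders"),
    Binding("q2", "e1", "audit.#"),
]


def exchanges_for_queue(queue_id):
    """Find a queue's exchanges by walking the bindings (the association class)."""
    return sorted(exchange_names[b.exchangeRef]
                  for b in bindings if b.queueRef == queue_id)


for queue_id in sorted(queue_names):
    print("%s -> %s" % (queue_names[queue_id], exchanges_for_queue(queue_id)))
```

Neither the queue nor the exchange knows about the other here; the only
place the link exists is the binding.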
Similarly, it's quite involved to find the subscriptions associated with
a queue because the navigation goes "the wrong way": the subscription
holds a queueRef property, but the queue holds no reference back to its
subscriptions.
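One way round that is to invert the reference yourself. A minimal
sketch, again with made-up stand-in objects (the Subscription tuple and
the subscriptions_by_queue helper are illustrative, not part of QMF):

```python
from collections import defaultdict, namedtuple

# Hypothetical stand-in for a QMF subscription object.
Subscription = namedtuple("Subscription", "name queueRef sessionRef")

subscriptions = [
    Subscription("sub-a", "q1", "s1"),
    Subscription("sub-b", "q1", "s2"),
    Subscription("sub-c", "q2", "s1"),
]


def subscriptions_by_queue(subs):
    """Turn the subscription -> queue reference into a queue -> subscriptions map."""
    index = defaultdict(list)
    for sub in subs:
        index[sub.queueRef].append(sub)
    return index


by_queue = subscriptions_by_queue(subscriptions)
print([sub.name for sub in by_queue["q1"]])
```

After one pass over the subscriptions, "which subscriptions does this
queue have?" becomes a single dictionary lookup.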
What I've found is that in general the best (and far and away the most
efficient) way to do non-trivial QMF stuff is to do the getObjects()
call once for each of the object classes that you care about, then
store those objects in Maps indexed by ObjectId. If you do that you can
look up everything you want easily and cheaply.
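To illustrate the pattern without needing a broker, here is a small
sketch where a dictionary called snapshot plays the part of the results
of one getObjects() call per class (the data, the "id" field and the
helper names are assumptions for the example, not the QMF API):

```python
# Hypothetical snapshot, as if each class had been fetched from the broker once.
snapshot = {
    "connection": [{"id": "c1", "address": "10.0.0.5:41000"},
                   {"id": "c2", "address": "10.0.0.9:52000"}],
    "session": [{"id": "s1", "connectionRef": "c1"},
                {"id": "s2", "connectionRef": "c2"}],
    "queue": [{"id": "q1", "name": "orders"}],
    "subscription": [{"id": "sub1", "sessionRef": "s1", "queueRef": "q1"},
                     {"id": "sub2", "sessionRef": "s2", "queueRef": "q1"}],
}


def index_by_id(objects):
    """Build an id -> object map so references can be followed in O(1)."""
    return dict((obj["id"], obj) for obj in objects)


connections_by_id = index_by_id(snapshot["connection"])
sessions_by_id = index_by_id(snapshot["session"])
queues_by_id = index_by_id(snapshot["queue"])


def who_consumes():
    """Follow subscription -> session -> connection and subscription -> queue."""
    result = []
    for sub in snapshot["subscription"]:
        session = sessions_by_id[sub["sessionRef"]]
        connection = connections_by_id[session["connectionRef"]]
        queue = queues_by_id[sub["queueRef"]]
        result.append((queue["name"], connection["address"]))
    return result


for queue_name, address in who_consumes():
    print("%s is consumed from %s" % (queue_name, address))
```

Each reference is followed with a dictionary lookup rather than a scan
of the whole list.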
If you look at the code for things like qpid-config it's pretty easy to
get into the trap of doing getObjects() calls on inner loops, which is
heinously inefficient :-) because each getObjects() call does a
messaging request/response under the hood.
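To put a number on it, here's a toy comparison. CountingSession is a
made-up stand-in that only counts how often getObjects() is called (the
real call goes to the broker), and the data is generated locally:

```python
class CountingSession(object):
    """Hypothetical stand-in for a QMF session that counts round trips."""

    def __init__(self, objects_by_class):
        self.objects_by_class = objects_by_class
        self.calls = 0

    def getObjects(self, _class):
        self.calls += 1
        return list(self.objects_by_class[_class])


def make_broker_data(queue_count):
    """Generate queue_count queues with one binding each."""
    queues = [{"id": "q%d" % i, "name": "queue-%d" % i} for i in range(queue_count)]
    bindings = [{"queueRef": q["id"], "bindingKey": q["name"]} for q in queues]
    return {"queue": queues, "binding": bindings}


def bindings_naive(session):
    """Re-fetch the bindings for every queue: one extra request per queue."""
    result = {}
    for queue in session.getObjects(_class="queue"):
        bindings = session.getObjects(_class="binding")
        result[queue["name"]] = [b["bindingKey"] for b in bindings
                                 if b["queueRef"] == queue["id"]]
    return result


def bindings_indexed(session):
    """Fetch each class once and index the bindings by queueRef."""
    by_queue = {}
    for binding in session.getObjects(_class="binding"):
        by_queue.setdefault(binding["queueRef"], []).append(binding["bindingKey"])
    result = {}
    for queue in session.getObjects(_class="queue"):
        result[queue["name"]] = by_queue.get(queue["id"], [])
    return result


naive_session = CountingSession(make_broker_data(200))
indexed_session = CountingSession(make_broker_data(200))
same_answer = bindings_naive(naive_session) == bindings_indexed(indexed_session)
print("same answer: %s, naive calls: %d, indexed calls: %d"
      % (same_answer, naive_session.calls, indexed_session.calls))
```

With 200 queues the naive version makes 201 calls against 2 for the
indexed one, and on top of that it scans every binding for every queue.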
To get you up and running and give a bit of insight (I hope!!) I've
attached a couple of python programs I wrote last year (these use QMF1).
connection-audit checks when connections are made and looks up the
queues being subscribed to against a whitelist. Basically, if the queue
being subscribed to is not in the whitelist it generates a log message.
connection-audit-dom is the same but uses DOM based XML parsing (I
needed to do that when I had to use it against an oldish python version).
connection-logger does a lot of what I think you care about. Basically
it logs all connections to a broker and provides useful info on them, a
la qpid-config -b queues.
connection-logger-orig was my original version of this which was
somewhat erm "rushed out" :-) at face value it looks OK for a few queues
etc. but if you ramp up a load of connections - well try and add a
couple of hundred queues and you'll see what I meant by heinously
inefficient above :-D.
connection-logger and connection-logger-orig are functionally equivalent
so it's worth looking through both to give a bit of insight into the
right way and wrong way to go about things.
I'm not a python programmer by any stretch of the imagination, but I
think that they are at least fair examples.
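For connection-audit, the whitelist check itself boils down to
something like the sketch below. The exchangeWhitelist/exchange and
queueWhitelist/queue paths are the ones the attached script searches
for; the root element name, the sample entries and the two helper
functions are my assumptions for the example:

```python
import xml.etree.ElementTree as ET

# Example whitelist document; the root element name is an assumption.
WHITELIST_XML = """
<whitelist>
  <exchangeWhitelist>
    <exchange>qmf.default.direct</exchange>
    <exchange></exchange>
  </exchangeWhitelist>
  <queueWhitelist>
    <queue>orders</queue>
  </queueWhitelist>
</whitelist>
"""


def load_whitelists(xml_text):
    """Return (exchange names, queue names) as two sets."""
    root = ET.fromstring(xml_text.strip())
    # An empty <exchange> element stands for the default ('') exchange.
    exchanges = set((e.text or "") for e in root.findall("exchangeWhitelist/exchange"))
    queues = set(q.text for q in root.findall("queueWhitelist/queue"))
    return exchanges, queues


def needs_alert(queue_name, exchange_name, exchange_whitelist, queue_whitelist):
    """Alert only when neither the exchange nor the queue is whitelisted."""
    return (exchange_name not in exchange_whitelist
            and queue_name not in queue_whitelist)


exchange_whitelist, queue_whitelist = load_whitelists(WHITELIST_XML)
print(needs_alert("orders", "amq.topic", exchange_whitelist, queue_whitelist))
print(needs_alert("rogue", "amq.topic", exchange_whitelist, queue_whitelist))
```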
Keep an eye out for my QMF2/UI update over the next few days but
hopefully this response has given you a decent leg-up.
Best regards,
Frase
On 11/01/13 02:24, Lance D. wrote:
Hello all.
I’m in the process of building QPID inspection tools for a project I’m
working on. Now, I’m looking at the results of the qpid-tool. I see that
the connection schema has a bunch of useful info (like remote pid, process,
etc). I’ve also found ways to link that info up with the queues to figure
out who’s subscribed to what. However, I’m having a hard time making that
same mapping between the connection stuff and a sender to an exchange.
My question is, is there a way to do that mapping (either with the existing
tools, or by tapping into the qmf API)?
Thanks,
-Lance
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# connection-logger is used to provide information about connections made to a broker.
#
# In default mode connection-logger lists the connections made to a broker along with information about sessions
# such as whether any subscriptions are associated with the session (if a session has no subscriptions then it's
# quite likely to be a "producer only" session, so this knowledge is quite useful).
#
# In "log queue and binding" mode the information provided is very similar to qpid-config -b queues but with
# additional connection related information provided as with default mode.
from optparse import OptionParser, OptionGroup, IndentedHelpFormatter
import sys
import socket
import os
import locale
from qmf.console import Session
from time import time, strftime, gmtime, localtime, sleep

usage = "Usage: connection-logger [OPTIONS]"

def Usage():
    print usage

class Config:
    def __init__(self):
        self._logQueues = False
        self._address = "localhost"
        self._client_sasl_mechanism = None

config = Config()

class JHelpFormatter(IndentedHelpFormatter):
    """Format usage and description without stripping newlines from usage strings"""

    def format_usage(self, usage):
        return usage

    def format_description(self, description):
        return ""
def OptionsAndArguments(argv):
    parser = OptionParser(usage=usage, description="", formatter=JHelpFormatter())
    parser.add_option("-q", "--log-queues", action="store_true",
                      help="log queue and binding information for consumer connections")
    parser.add_option("-a", "--broker-address", action="store", type="string", metavar="<address>",
                      help="broker-addr is in the form: [username/password@] hostname | ip-address [:<port>] "
                           "ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost")
    parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="<mech>",
                      help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, "
                           "DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker "
                           "(not for authentication between the source and destination brokers - that is "
                           "specified using the [mechanisms] argument to 'add route'). SASL automatically "
                           "picks the most secure available mechanism - use this option to override.")
    opts, encArgs = parser.parse_args(args=argv)
    try:
        encoding = locale.getpreferredencoding()
        args = [a.decode(encoding) for a in encArgs]
    except:
        args = encArgs

    if opts.log_queues:
        config._logQueues = True
    if opts.broker_address: # Note underscore not hyphen used here
        config._address = opts.broker_address
    if opts.client_sasl_mechanism: # Note underscore not hyphen used here
        config._client_sasl_mechanism = opts.client_sasl_mechanism
    return args
class BrokerManager: # The first bit of this class is mostly lifted straight from qpid-config
    def __init__(self):
        self.brokerName = None
        self.qmf = None
        self.broker = None
        self.connections = None
        self.connectionToSessionAssociations = {}
        self.queueMap = {}
        self.exchangeMap = {}

    def setBroker(self): # Tweaked addBroker below to use values from Config class
        self.qmf = Session()
        self.broker = self.qmf.addBroker(config._address, 10, config._client_sasl_mechanism)
        agents = self.qmf.getAgents()
        for a in agents:
            if a.getAgentBank() == '0':
                self.brokerAgent = a

    def disconnect(self):
        if self.broker:
            self.qmf.delBroker(self.broker)
    # Retrieve QmfConsoleData Objects from the broker. N.B. these are retrieved up-front as each call involves
    # a potentially expensive query to the broker so we don't want to do any of this in an inner loop.
    #
    # QMF uses lots of associations between various objects so we also populate Maps of the QMF Objects keyed by
    # their ObjectIds and dereference 0..* associations here so we can avoid linear searches later, this is pretty
    # important because if we have lots of QMF Objects (Connections/Queues etc.) a naive approach using linear
    # searches to dereference these things (a la qpid-config) will completely trash the performance.
    def getQmfObjects(self):
        sessionToSubscriptionAssociations = {}
        self.connectionToSessionAssociations.clear()

        # Create the 0..* association between session and subscription
        subscriptions = self.qmf.getObjects(_class="subscription", _agent=self.brokerAgent)
        for subscription in subscriptions:
            sessionRef = subscription.sessionRef
            # Create the Subscriptions association array and store it keyed by the sessionRef, when we go to store
            # the actual Session we can retrieve the association and store it as a property of the Session.
            if (sessionRef not in sessionToSubscriptionAssociations):
                sessionToSubscriptionAssociations[sessionRef] = [subscription]
            else:
                sessionToSubscriptionAssociations[sessionRef].append(subscription)

        # Create the 0..* association between connection and session
        sessions = self.qmf.getObjects(_class="session", _agent=self.brokerAgent)
        for session in sessions:
            sessionId = session.getObjectId()
            connectionRef = session.connectionRef
            subs = []
            if (sessionId in sessionToSubscriptionAssociations):
                subs = sessionToSubscriptionAssociations[sessionId]
            session._subscriptions = subs # Store the association as a property of session for easy retrieval
            # Create the Sessions association array and store it keyed by the connectionRef, when we go to store
            # the actual Connection we can retrieve the association and store it as a property of the Connection.
            if (connectionRef not in self.connectionToSessionAssociations):
                self.connectionToSessionAssociations[connectionRef] = [session]
            else:
                self.connectionToSessionAssociations[connectionRef].append(session)

        self.connections = self.qmf.getObjects(_class="connection", _agent=self.brokerAgent)

        # Only do the following if the -q option has been selected
        if config._logQueues:
            queueToBindingAssociations = {}
            self.queueMap.clear()
            self.exchangeMap.clear()

            # Create the 0..* association between queue and bindings
            bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent)
            for binding in bindings:
                queueRef = binding.queueRef
                # Create the Bindings association array and store it keyed by the queueRef, when we go to store
                # the actual Queue we can retrieve the association and store it as a property of the Queue.
                if (queueRef not in queueToBindingAssociations):
                    queueToBindingAssociations[queueRef] = [binding]
                else:
                    queueToBindingAssociations[queueRef].append(binding)

            queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
            for queue in queues:
                queueId = queue.getObjectId()
                bindings = []
                if (queueId in queueToBindingAssociations):
                    bindings = queueToBindingAssociations[queueId]
                queue._bindings = bindings # Store the association as a property of queue for easy retrieval
                self.queueMap[queueId] = queue

            exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
            for exchange in exchanges:
                self.exchangeMap[exchange.getObjectId()] = exchange
    # For every queue list the bindings
    def logQueueInformation(self, ref):
        queue = self.queueMap[ref] # Look up queue by its ObjectId
        print " Queue '%s'" % (queue.name)
        print " arguments %s" % (queue.arguments)
        for bind in queue._bindings: # Use the 0..* association with bindings we populated in getQmfObjects()
            ename = "<unknown>"
            # Look up exchange by its ObjectId; get() returns None for an unknown
            # exchange so the "<unknown>" fallback below can actually be reached
            ex = self.exchangeMap.get(bind.exchangeRef)
            if ex != None:
                ename = ex.name
                if ename == "":
                    ename = "''"
            if bind.arguments: # If there are binding arguments then it's a headers exchange
                print " bind [%s] => %s %s" % (bind.bindingKey, ename, bind.arguments)
            else:
                print " bind [%s] => %s" % (bind.bindingKey, ename)
    # Logs audit information about each connection made to the broker
    #
    # Obtains connection, session and subscription objects and iterates in turn through these comparing
    # references to find the subscriptions association with sessions and sessions associated with
    # connections. Ultimately it then uses logQueueInformation to display the queues associated with
    # each subscription.
    def logConnectionInformation(self):
        print "\n\n**** connection-logger: Logging current connection information ****"
        for connection in self.connections:
            print "\nConnection '%s'" % (connection.address)
            print "authIdentity: %s" % (connection.authIdentity)
            print "remoteProcessName: %s" % (connection.remoteProcessName)
            print "federationLink: %s" % (connection.federationLink)
            ctime = connection.getTimestamps()[1]/1000000000
            print "createTimestamp: %s" % strftime("%c", localtime(ctime))
            connectionId = connection.getObjectId()
            sessions = []
            if (connectionId in self.connectionToSessionAssociations):
                sessions = self.connectionToSessionAssociations[connectionId] # Retrieve the association array
            for session in sessions: # Iterate through all session objects
                print "Session: '%s'" % (session.name)
                subscriptions = session._subscriptions
                for subscription in subscriptions: # Iterate through all subscription objects
                    queueRef = subscription.queueRef
                    if config._logQueues: # Display detailed info about queues and bindings if needed
                        self.logQueueInformation(queueRef)
                if (len(subscriptions) == 0):
                    print " ** No Subscriptions for this Session - probably a producer only Session **"
def main(argv=None):
    args = OptionsAndArguments(argv)
    bm = BrokerManager()
    try:
        bm.setBroker()
        bm.getQmfObjects()
        bm.logConnectionInformation()
    except Exception,e:
        print "Failed: %s - %s" % (e.__class__.__name__, e)
        return 1
    bm.disconnect()
    return 0

if __name__ == "__main__":
    sys.exit(main())
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# connection-logger is used to provide information about connections made to a broker.
#
# In default mode connection-logger lists the connections made to a broker along with information about sessions
# such as whether any subscriptions are associated with the session (if a session has no subscriptions then it's
# quite likely to be a "producer only" session, so this knowledge is quite useful).
#
# In "log queue and binding" mode the information provided is very similar to qpid-config -b queues but with
# additional connection related information provided as with default mode.
from optparse import OptionParser, OptionGroup, IndentedHelpFormatter
import sys
import socket
import os
import locale
from qmf.console import Session
from time import time, strftime, gmtime, localtime, sleep

usage = "Usage: connection-logger [OPTIONS]"

def Usage():
    print usage

class Config:
    def __init__(self):
        self._logQueues = False
        self._address = "localhost"
        self._client_sasl_mechanism = None

config = Config()

class JHelpFormatter(IndentedHelpFormatter):
    """Format usage and description without stripping newlines from usage strings"""

    def format_usage(self, usage):
        return usage

    def format_description(self, description):
        return ""
def OptionsAndArguments(argv):
    parser = OptionParser(usage=usage, description="", formatter=JHelpFormatter())
    parser.add_option("-q", "--log-queues", action="store_true",
                      help="log queue and binding information for consumer connections")
    parser.add_option("-a", "--broker-address", action="store", type="string", metavar="<address>",
                      help="broker-addr is in the form: [username/password@] hostname | ip-address [:<port>] "
                           "ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost")
    parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="<mech>",
                      help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, "
                           "DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker "
                           "(not for authentication between the source and destination brokers - that is "
                           "specified using the [mechanisms] argument to 'add route'). SASL automatically "
                           "picks the most secure available mechanism - use this option to override.")
    opts, encArgs = parser.parse_args(args=argv)
    try:
        encoding = locale.getpreferredencoding()
        args = [a.decode(encoding) for a in encArgs]
    except:
        args = encArgs

    if opts.log_queues:
        config._logQueues = True
    if opts.broker_address: # Note underscore not hyphen used here
        config._address = opts.broker_address
    if opts.client_sasl_mechanism: # Note underscore not hyphen used here
        config._client_sasl_mechanism = opts.client_sasl_mechanism
    return args
class BrokerManager: # The first bit of this class is mostly lifted straight from qpid-config
    def __init__(self):
        self.brokerName = None
        self.qmf = None
        self.broker = None

    def setBroker(self): # Tweaked addBroker below to use values from Config class
        self.qmf = Session()
        self.broker = self.qmf.addBroker(config._address, 10, config._client_sasl_mechanism)
        agents = self.qmf.getAgents()
        for a in agents:
            if a.getAgentBank() == '0':
                self.brokerAgent = a

    def disconnect(self):
        if self.broker:
            self.qmf.delBroker(self.broker)

    def findById(self, items, id):
        for item in items:
            if item.getObjectId() == id:
                return item
        return None
    # For every queue list the bindings (pretty much taken from qpid-config QueueListRecurse with minor tweaks)
    def logQueueInformation(self, ref):
        exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
        bindings = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent)
        queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
        for queue in queues:
            queueId = queue.getObjectId()
            if (ref == None or ref == queueId):
                print " Queue '%s'" % (queue.name)
                print " arguments %s" % (queue.arguments)
                for bind in bindings:
                    if bind.queueRef == queue.getObjectId():
                        ename = "<unknown>"
                        ex = self.findById (exchanges, bind.exchangeRef)
                        if ex != None:
                            ename = ex.name
                            if ename == "":
                                ename = "''"
                        if bind.arguments: # If there are binding arguments then it's a headers exchange
                            print " bind [%s] => %s %s" % (bind.bindingKey, ename, bind.arguments)
                        else:
                            print " bind [%s] => %s" % (bind.bindingKey, ename)
    # Logs audit information about each connection made to the broker
    #
    # Obtains connection, session and subscription objects and iterates in turn through these comparing
    # references to find the subscriptions association with sessions and sessions associated with
    # connections. Ultimately it then uses logQueueInformation to display the queues associated with
    # each subscription.
    def logConnectionInformation(self):
        print "\n\n**** connection-logger: Logging current connection information ****"
        connections = self.qmf.getObjects(_class="connection", _agent=self.brokerAgent)
        sessions = self.qmf.getObjects(_class="session", _agent=self.brokerAgent)
        subscriptions = self.qmf.getObjects(_class="subscription", _agent=self.brokerAgent)
        for connection in connections:
            print "\nConnection '%s'" % (connection.address)
            print "authIdentity: %s" % (connection.authIdentity)
            print "remoteProcessName: %s" % (connection.remoteProcessName)
            print "federationLink: %s" % (connection.federationLink)
            ctime = connection.getTimestamps()[1]/1000000000
            print "createTimestamp: %s" % strftime("%c", localtime(ctime))
            connectionId = connection.getObjectId()
            for session in sessions: # Iterate through all session objects
                connectionRef = session.connectionRef
                if connectionRef == connectionId:
                    # Only select sessions that are associated with the connection under consideration.
                    print "Session: '%s'" % (session.name)
                    subscriptionCount = 0
                    sessionId = session.getObjectId()
                    for subscription in subscriptions: # Iterate through all subscription objects
                        sessionRef = subscription.sessionRef
                        if sessionRef == sessionId:
                            # Only select subscriptions that are associated with the session under consideration.
                            subscriptionCount = subscriptionCount + 1
                            queueRef = subscription.queueRef
                            if config._logQueues: # Display detailed info about queues and bindings if needed
                                self.logQueueInformation(queueRef)
                    if subscriptionCount == 0:
                        print " ** No Subscriptions for this Session - probably a producer only Session **"
def main(argv=None):
    args = OptionsAndArguments(argv)
    bm = BrokerManager()
    try:
        bm.setBroker()
        bm.logConnectionInformation()
    except Exception,e:
        print "Failed: %s - %s" % (e.__class__.__name__, e)
        return 1
    bm.disconnect()
    return 0

if __name__ == "__main__":
    sys.exit(main())
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# This application audits connections to one or more Qpid message brokers.
# Exchange and Queue names are checked against a whitelist and if no match is found an alert is generated
import os
import os.path
import optparse
from optparse import IndentedHelpFormatter
import sys
import socket
from time import time, strftime, gmtime, localtime, sleep
from qmf.console import Console, Session
from xml.etree.ElementTree import ElementTree
class AuditConsole(Console):
    # AuditConsole Constructor
    def __init__(self, whitelistFile):
        self.whitelistFile = whitelistFile
        self.whitelistLastModified = None
        # Declare dictionary to map queue name to queue objects
        self.queueNameMap = {}
        # Declare dictionary to map queue name to subscription objects
        self.queueNameSubscriptionMap = {}
        # Declare dictionaries to map object-ids to objects
        self.queueMap = {}
        self.exchangeMap = {}
        self.bindingMap = {}
        self.connectionMap = {}
        self.sessionMap = {}
        # Declare the sets to be used as the whitelists
        self.exchangeWhitelist = set()
        self.queueWhitelist = set()
    # Overridden method from Console
    # Handle property updates. This method registers or deletes the relevant objects in the relevant Map
    # These objects are used by the validateSubscription() in order to find out information about the
    # bindings and connections associated with a given queue in order to work out if it needs to be tested
    # against the whitelist. For example we don't validate queues bound to the management exchanges as
    # this would generate spurious results as most of these queues are temporary queues.
    def objectProps(self, broker, record):
        classKey = record.getClassKey()
        oid = record.getObjectId()
        deleted = record.getTimestamps()[2] > 0

        # Note that queue objects are keyed by both queue name and Object ID
        if classKey.getClassName() == "queue":
            queueName = record.name
            if oid not in self.queueMap and not deleted:
                self.queueMap[oid] = record
            if queueName not in self.queueNameMap and not deleted:
                self.queueNameMap[queueName] = record
            # If the queue object has been deleted remove it from the map
            if deleted:
                if queueName in self.queueNameMap:
                    self.queueNameMap.pop(queueName)
                if oid in self.queueMap:
                    self.queueMap.pop(oid)

        if classKey.getClassName() == "exchange":
            if oid not in self.exchangeMap and not deleted:
                self.exchangeMap[oid] = record
            # If the exchange object has been deleted remove it from the map
            if deleted and oid in self.exchangeMap:
                self.exchangeMap.pop(oid)

        if classKey.getClassName() == "binding":
            if oid not in self.bindingMap and not deleted:
                self.bindingMap[oid] = record
            # If the binding object has been deleted remove it from the map
            if deleted and oid in self.bindingMap:
                self.bindingMap.pop(oid)

        if classKey.getClassName() == "connection":
            if oid not in self.connectionMap and not deleted:
                # Re-read the whitelist when a new connection occurs
                self.readWhitelist()
                self.connectionMap[oid] = record
            # If the connection object has been deleted remove it from the map
            if deleted and oid in self.connectionMap:
                self.connectionMap.pop(oid)

        if classKey.getClassName() == "session":
            if oid not in self.sessionMap and not deleted:
                self.sessionMap[oid] = record
            # If the session object has been deleted remove it from the map
            if deleted and oid in self.sessionMap:
                self.sessionMap.pop(oid)

        # Note that subscription objects are keyed by queue name NOT by Object ID
        if classKey.getClassName() == "subscription":
            # Find the queue associated with the subscription by looking its reference up in queueMap
            queue = self.queueMap.get(record.queueRef)
            if queue != None:
                queueName = queue.name
                if queueName not in self.queueNameSubscriptionMap and not deleted:
                    self.queueNameSubscriptionMap[queueName] = record
                    self.validateSubscription(queueName)
                # If the subscription object has been deleted remove it from the map
                if deleted and queueName in self.queueNameSubscriptionMap:
                    self.queueNameSubscriptionMap.pop(queueName)
    # This method first checks if the whitelist file exists, if not it clears the sets used as whitelists
    # so that no whitelisting is applied. If the whitelist file does exist it is parsed by an ElementTree
    # we look for all exchange and queue elements and populate the respective whitelist sets with their
    # contents. Note that we check the whitelist file update time to avoid reading it if it hasn't been changed
    def readWhitelist(self):
        if os.path.exists(self.whitelistFile):
            mtime = os.path.getmtime(self.whitelistFile)
            if mtime != self.whitelistLastModified:
                self.whitelistLastModified = mtime
                self.exchangeWhitelist.clear()
                self.queueWhitelist.clear()
                tree = ElementTree()
                try:
                    tree.parse(self.whitelistFile)
                    exchanges = tree.findall("exchangeWhitelist/exchange")
                    for i in exchanges:
                        exchange = i.text
                        if exchange == None:
                            self.exchangeWhitelist.add("")
                        else:
                            self.exchangeWhitelist.add(exchange)
                    queues = tree.findall("queueWhitelist/queue")
                    for i in queues:
                        queue = i.text
                        self.queueWhitelist.add(queue)
                except Exception, e: # Failed to parse correctly
                    print strftime("%c", localtime(time())), \
                        "WARN connection-audit:readWhitelist failed: %s - %s" % (e.__class__.__name__, e)
        else: # If whitelist file doesn't exist log a warning and clear the whitelists
            print strftime("%c", localtime(time())), \
                "WARN connection-audit:readWhitelist %s doesn't exist" % self.whitelistFile
            sys.stdout.flush()
            self.exchangeWhitelist.clear()
            self.queueWhitelist.clear()
    # Given a queue name this method looks up the associated subscription, session and connection
    # objects and returns the connection
    def findConnection(self, queueName):
        subscription = self.queueNameSubscriptionMap.get(queueName)
        if subscription != None:
            session = self.sessionMap.get(subscription.sessionRef)
            if session != None:
                connection = self.connectionMap.get(session.connectionRef)
                if connection != None:
                    return connection

    # This method finds bindings that match a given queue name, if one is found the associated exchange is
    # recovered and then a call to validateQueue() is made
    def validateSubscription(self, queueName):
        for binding in self.bindingMap.values():
            queue = self.queueMap.get(binding.queueRef)
            if queue != None and queue.name == queueName:
                exchange = self.exchangeMap.get(binding.exchangeRef)
                if exchange != None:
                    self.validateQueue(queueName, exchange, binding)

    # This method validates the specified queue by comparing the queue name and associated exchange name
    # against the whitelist sets. If the exchange name or the queue name is in the whitelist then we do
    # nothing, but if not we generate an alert message.
    def validateQueue(self, queueName, exchange, binding):
        exchangeName = exchange.name
        if exchangeName in self.exchangeWhitelist:
            return
        if queueName in self.queueWhitelist:
            return
        connection = self.findConnection(queueName)
        address = connection.address
        ctime = connection.getTimestamps()[1]/1000000000
        if exchangeName == "":
            exchangeName = "''"
        if binding.arguments:
            print strftime("%c", localtime(time())), \
                "ALERT connection-audit:validateQueue validation failed for queue: %s with binding[%s] => %s %s from address: %s with connection timestamp" % \
                (queueName, binding.bindingKey, exchangeName, binding.arguments, address), \
                strftime("%c", localtime(ctime))
        else:
            print strftime("%c", localtime(time())), \
                "ALERT connection-audit:validateQueue validation failed for queue: %s with binding[%s] => %s from address: %s with connection timestamp" % \
                (queueName, binding.bindingKey, exchangeName, address), \
                strftime("%c", localtime(ctime))
        sys.stdout.flush()
    # Overridden method from Console
    # def event(self, broker, event):
    #     if event.classKey.getClassName() == "subscribe":

    # Overridden method from Console
    def brokerConnected(self, broker):
        # Log brokerConnected event
        print strftime("%c", localtime(time())), \
            "NOTIC connection-audit:brokerConnected broker=%s" % broker.getUrl()
        sys.stdout.flush()

    # Overridden method from Console
    def brokerConnectionFailed(self, broker):
        # Log brokerConnectionFailed event
        print strftime("%c", localtime(time())), \
            "NOTIC connection-audit:brokerConnectionFailed broker=%s %s" % (broker.getUrl(), str(broker.conn_exc))
        sys.stdout.flush()

    # Overridden method from Console
    def brokerDisconnected(self, broker):
        # Clear state as it'll get resent when the broker reconnects
        self.queueNameMap.clear()
        self.queueNameSubscriptionMap.clear()
        self.queueMap.clear()
        self.exchangeMap.clear()
        self.bindingMap.clear()
        self.connectionMap.clear()
        self.sessionMap.clear()
        # Log brokerDisconnected event
        print strftime("%c", localtime(time())), \
            "NOTIC connection-audit:brokerDisconnected broker=%s" % broker.getUrl()
        sys.stdout.flush()
class JHelpFormatter(IndentedHelpFormatter):
    """Format usage and description without stripping newlines from usage strings
    """

    def format_usage(self, usage):
        return usage

    def format_description(self, description):
        if description:
            return description + "\n"
        else:
            return ""

_usage = "%prog [options] [broker-addr]..."

_description = \
"""
Audits connections to one or more Qpid message brokers.
Exchange and Queue names are checked against a whitelist and if no match is found an alert is generated
If no broker-addr is supplied, %prog connects to 'localhost:5672'.
[broker-addr] syntax:
    [username/password@] hostname
    ip-address [:<port>]
Examples:
    $ %prog localhost:5672
    $ %prog 10.1.1.7:10000
    $ %prog guest/guest@broker-host:10000
"""
def main(argv=None):
p = optparse.OptionParser(usage=_usage, description=_description,
formatter=JHelpFormatter())
p.add_option("--sasl-mechanism", action="store", type="string",
metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL,
ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the
most secure available mechanism - use this option to override.")
p.add_option("--whitelist", action="store", type="string",
default="./whitelist.xml", metavar="<whitelist XML document>", help="The fully
qualified name of the whitelist XML file, default is ./whitelist.xml")
options, arguments = p.parse_args(args=argv)
if len(arguments) == 0:
arguments.append("localhost")
console = AuditConsole(options.whitelist)
session = Session(console, rcvObjects=True, rcvEvents=True,
userBindings=True, manageConnections=True)
# Register to receive updates for broker:queue objects.
session.bindClass("org.apache.qpid.broker", "queue")
# Register to receive updates for broker:exchange objects.
session.bindClass("org.apache.qpid.broker", "exchange")
# Register to receive updates for broker:binding objects.
session.bindClass("org.apache.qpid.broker", "binding")
# Register to receive updates for broker:connection objects.
session.bindClass("org.apache.qpid.broker", "connection")
# Register to receive updates for broker:session objects.
session.bindClass("org.apache.qpid.broker", "session")
# Register to receive updates for broker:subscription objects.
session.bindClass("org.apache.qpid.broker", "subscription")
# Register to receive updates for broker:subscribe event.
# session.bindEvent("org.apache.qpid.broker", "subscribe")
brokers = []
try:
try:
for host in arguments:
brokers.append(session.addBroker(host, None,
options.sasl_mechanism))
while (True):
sleep(10)
except KeyboardInterrupt:
print
return 0
except Exception, e:
print "Failed: %s - %s" % (e.__class__.__name__, e)
return 1
finally:
while len(brokers):
b = brokers.pop()
session.delBroker(b)
if __name__ == '__main__':
sys.exit(main())
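
To make the mapping from a queue back to its connection a bit more concrete, here is a rough standalone sketch of the lookup that findConnection() performs: queue name -> subscription -> session -> connection. It does not talk to a broker at all. The namedtuples and the function names (find_connection, describe_source) are hypothetical stand-ins that I've made up for illustration, the real objects are delivered by qmf.console, and only the sessionRef, connectionRef and address attribute names are taken from the script.

```python
from collections import namedtuple

# Hypothetical stand-ins for the QMF management objects. In the real script
# these records arrive via objectProps(); only the attribute names used here
# (sessionRef, connectionRef, address) come from the script.
Subscription = namedtuple("Subscription", "sessionRef")
Session = namedtuple("Session", "connectionRef")
Connection = namedtuple("Connection", "address")


def find_connection(queue_name, subscriptions_by_queue, sessions, connections):
    """Follow subscription -> session -> connection for a queue name.

    Returns None if any link in the chain has not been seen yet.
    """
    subscription = subscriptions_by_queue.get(queue_name)
    if subscription is None:
        return None
    session = sessions.get(subscription.sessionRef)
    if session is None:
        return None
    return connections.get(session.connectionRef)


def describe_source(queue_name, subscriptions_by_queue, sessions, connections):
    """Return the client address for a queue, or 'unknown' if the chain is incomplete."""
    connection = find_connection(queue_name, subscriptions_by_queue,
                                 sessions, connections)
    if connection is None:
        return "unknown"
    return connection.address


# Made-up example data: object ids are plain strings here.
subscriptions_by_queue = {"testqueue": Subscription(sessionRef="session-1")}
sessions = {"session-1": Session(connectionRef="connection-1")}
connections = {"connection-1": Connection(address="127.0.0.1:51234")}

print(describe_source("testqueue", subscriptions_by_queue, sessions, connections))
print(describe_source("otherqueue", subscriptions_by_queue, sessions, connections))
```

The point of the "unknown" fallback is that the property updates can arrive in any order, so a subscription may well be seen before its session or connection has been recorded.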
<whitelist>
    <exchangeWhitelist>
        <exchange>qmf.default.topic</exchange>
        <exchange>qmf.default.direct</exchange>
        <exchange>qpid.management</exchange>
        <exchange>amq.direct</exchange>
        <exchange></exchange>
    </exchangeWhitelist>
    <queueWhitelist>
        <queue>testqueue</queue>
    </queueWhitelist>
</whitelist>
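
If you want to check a whitelist file like the one above without running the whole tool, something along these lines should do. It is only a sketch: parse_whitelist is a name I've made up here, and it uses getElementsByTagName, which matches elements at any depth, so it is a little more lenient than the nested childNodes walk in readWhitelist(). As in the script, an empty exchange element stands for the default exchange "".

```python
from xml.dom import minidom

# The sample whitelist from this mail, embedded so the sketch is self-contained.
WHITELIST_XML = """<whitelist>
    <exchangeWhitelist>
        <exchange>qmf.default.topic</exchange>
        <exchange>qmf.default.direct</exchange>
        <exchange>qpid.management</exchange>
        <exchange>amq.direct</exchange>
        <exchange></exchange>
    </exchangeWhitelist>
    <queueWhitelist>
        <queue>testqueue</queue>
    </queueWhitelist>
</whitelist>"""


def parse_whitelist(xml_text):
    """Return (exchange_names, queue_names) as two sets of strings."""
    exchanges = set()
    queues = set()
    root = minidom.parseString(xml_text).documentElement
    if root.nodeName != "whitelist":
        # Not a whitelist document: apply no whitelisting at all.
        return exchanges, queues
    for element in root.getElementsByTagName("exchange"):
        if element.hasChildNodes():
            exchanges.add(element.firstChild.nodeValue)
        else:
            # An empty <exchange></exchange> means the default exchange.
            exchanges.add("")
    for element in root.getElementsByTagName("queue"):
        if element.hasChildNodes():
            queues.add(element.firstChild.nodeValue)
    return exchanges, queues


exchanges, queues = parse_whitelist(WHITELIST_XML)
print(sorted(exchanges))
print(sorted(queues))
```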