Howdy Lance,

You mention qpid-tool which uses QMF so I'm assuming that you are talking about the C++ broker. The Java broker uses JMX and I'm not so familiar with how that does instrumentation so the following is only really applicable to the C++ broker.


Unfortunately QMF can be a bit non-trivial, it's not helped by the fact that there are two variants - QMF1 which is a binary protocol and accompanying API and QMF2 which is built on top of Map messages.

The python tools bundled with qpid used to use QMF1, though I *think* that as of qpid 0.18 they've started to use QMF2 (trying 0.18 qpid-config on a 0.8 broker gets a method failed error which suggests it's using QMF2 create method - though I've not actually looked at the 0.18 source).

If you are writing your tools in python there's a whole lot more stuff available for QMF.

If you are writing in Java a year back I wrote a complete Java implementation of the QMF2 API:
https://cwiki.apache.org/qpid/qmfv2-api-proposal.html

based on the protocol:
https://cwiki.apache.org/qpid/qmf-map-message-protocol.html

The link to my code is here:
https://issues.apache.org/jira/browse/QPID-3675


Slightly frustratingly there hasn't been much feedback on this - I'd really like to see it make it into the main code base - I put a huge amount of effort into it. The code in that Jira link also contains tests/tools that illustrate how to use QMF2 in Java in some non-trivial scenarios.


As it happens in a few days time I'm about to release a major uplift to that, which will include a QMF2 REST API and most significantly a complete HTML5 based web UI that will enable inspection of all QMF properties and also queue/exchange/binding adding/deletion so that may be just the sort of thing you're interested in - it's pretty close, I'm just doing some last minute de-snagging trying to get it looking nice on small real-estate mobile browsers.


To more directly answer your question "However, I’m having a hard time making that same mapping between the connection stuff and a sender to an exchange. " yeah there's some nuance there :-) the main thing is to think in terms of UML associations, so for example a binding object has queueRef and exchangeRef properties which hold ObjectIds that enable the associated queue and exchange to be indexed (binding behaves like a UML association class really) what this means though if you want to work out how a queue links to an exchange you have to do it from the perspective of the binding.

Similarly it's quite involved to find the subscriptions associated with a queue because the navigation goes "the wrong way" that is to say subscription has a queueRef property.

What I've found is that in general the best (and by far in a way most efficient) way to do non-trivial QMF stuff is to to the getObject() stuff once for each of the objects that you care about then dereference those objects into Maps indexed by objectId - if you do that then you can index everything you want easily and cheaply.

If you look at the code for things like qpid-config it's pretty easy to get into the trap of doing getObjects() calls on inner loops, which is heinously inefficient :-) because each getObjects() call does a messaging request/response under the hood.

To get you up and running and give a bit of insight (I hope!!) I've attached a couple of python programs I wrote last year (these use QMF1)

connection-audit checks when connections are made and looks up the queues being subscribed to against a whitelist, basically if the queue being subscribed to is not a queue that's in the whitelist it generates a log message.

connection-audit-dom is the same but uses DOM based XML parsing (I needed to do that when I had to use it against an oldish python version)

connection-logger does a lot of what I think you care about, basically it logs all connections to a broker and provides useful info on them a la qpid-config -b queues.


connection-logger-orig was my original version of this which was somewhat erm "rushed out" :-) at face value it looks OK for a few queues etc. but if you ramp up a load of connections - well try and add a couple of hundred queues and you'll see what I meant by heinously inefficient above :-D.

connection-logger and connection-logger-orig are functionally equivalent so it's worth looking through both to give a bit of insight into the right way and wrong way to go about things.

I'm not a python programmer by any stretch of the imagination, but I think that they are at least fair examples.

Keep an eye out for my QMF2/UI update over the next few days but hopefully this response has given you a decent leg-up.

Best regards,
Frase



On 11/01/13 02:24, Lance D. wrote:
Hello all.


I’m in the process of building QPID inspection tools for a project I’m
working on.  Now, I’m looking at the results of the qpid-tool.  I see that
the connection schema has a bunch of useful info (like remote pid, process,
etc).  I’ve also found ways to link that info up with the queues to figure
out who’s subscribed to what.  However, I’m having a hard time making that
same mapping between the connection stuff and a sender to an exchange.



My question is, is there a way to do that mapping (either with the existing
tools, or by tapping into the qmf API?



Thanks,
-Lance


#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#


# connection-logger is used to provide information about connections made to a 
broker.
#
# In default mode connection-logger lists the connections made to a broker 
along with information about sessions
# such as whether any subscriptions are associated with the session (if a 
session has no subscriptions then it's
# quite likely to be a "producer only" session, so this knowledge is quite 
useful).
#
# In "log queue and binding" mode the information provided is very similar to 
qpid-config -b queues but with
# additional connection related information provided as with default mode.

from optparse import OptionParser, OptionGroup, IndentedHelpFormatter
import sys
import socket
import os
import locale
from qmf.console import Session
from time import time, strftime, gmtime, localtime, sleep

usage = "Usage: connection-logger [OPTIONS]"

def Usage():
    print usage

class Config:
    def __init__(self):
        self._logQueues   = False
        self._address     = "localhost"
        self._client_sasl_mechanism = None

config = Config()

class JHelpFormatter(IndentedHelpFormatter):
    """Format usage and description without stripping newlines from usage 
strings"""

    def format_usage(self, usage):
        return usage

    def format_description(self, description):
        return ""

def OptionsAndArguments(argv):
    parser = OptionParser(usage=usage, description="", 
formatter=JHelpFormatter())
    parser.add_option("-q", "--log-queues", action="store_true", help="log 
queue and binding information for consumer connections")
    parser.add_option("-a", "--broker-address", action="store", type="string", 
metavar="<address>", help="broker-addr is in the form: [username/password@] 
hostname | ip-address [:<port>]   ex:  localhost, 10.1.1.7:10000, 
broker-host:10000, guest/guest@localhost")
    parser.add_option("--client-sasl-mechanism", action="store", type="string", 
metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, 
ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects 
to the destination broker (not for authentication between the source and 
destination brokers - that is specified using the [mechanisms] argument to 'add 
route'). SASL automatically picks the most secure available mechanism - use 
this option to override.")

    opts, encArgs = parser.parse_args(args=argv)

    try:
        encoding = locale.getpreferredencoding()
        args = [a.decode(encoding) for a in encArgs]
    except:
        args = encArgs

    if opts.log_queues:
        config._logQueues = True

    if opts.broker_address: # Note underscore not hyphen used here
        config._address = opts.broker_address

    if opts.client_sasl_mechanism: # Note underscore not hyphen used here
        config._client_sasl_mechanism = opts.client_sasl_mechanism

    return args

class BrokerManager: # The first bit of this class is mostly lifted straight 
from qpid-config
    def __init__(self):
        self.brokerName = None
        self.qmf        = None
        self.broker     = None

        self.connections = None
        self.connectionToSessionAssociations = {}
        self.queueMap = {}
        self.exchangeMap = {}

    def setBroker(self): # Tweaked addBroker below to use values from Config 
class
        self.qmf = Session()
        self.broker = self.qmf.addBroker(config._address, 10, 
config._client_sasl_mechanism)
        agents = self.qmf.getAgents()
        for a in agents:
            if a.getAgentBank() == '0':
                self.brokerAgent = a

    def disconnect(self):
        if self.broker:
            self.qmf.delBroker(self.broker)

    # Retrieve QmfConsoleData Objects from the broker. N.B. these are retrieved 
up-front as each call involves
    # a potentially expensive query to the broker so we don't want to do any of 
this in an inner loop.
    #
    # QMF uses lots of associations between various objects so we also populate 
Maps of the QMF Objects keyed by
    # their ObjectIds and dereference 0..* associations here so we can avoid 
linear searches later, this is pretty
    # important because if we have lots of QMF Objects (Connections/Queues 
etc.) a naive approach using linear
    # searches to dereference these things (a la qpid-config) will completely 
trash the performance.
    def getQmfObjects(self):
        sessionToSubscriptionAssociations = {}
        self.connectionToSessionAssociations.clear()

        # Create the 0..* association between session and subscription
        subscriptions = self.qmf.getObjects(_class="subscription", 
_agent=self.brokerAgent)
        for subscription in subscriptions:
            sessionRef = subscription.sessionRef

            # Create the Subscriptions association array and store it keyed by 
the sessionRef, when we go to store
            # the actual Session we can retrieve the association and store it 
as a property of the Session.
            if (sessionRef not in sessionToSubscriptionAssociations):
                sessionToSubscriptionAssociations[sessionRef] = [subscription]
            else:
                
sessionToSubscriptionAssociations[sessionRef].append(subscription);

        # Create the 0..* association between connection and session
        sessions = self.qmf.getObjects(_class="session", 
_agent=self.brokerAgent)
        for session in sessions:
            sessionId = session.getObjectId()
            connectionRef = session.connectionRef

            subs = []
            if (sessionId in sessionToSubscriptionAssociations):
                subs = sessionToSubscriptionAssociations[sessionId]
            session._subscriptions = subs # Store the association as a property 
of session for easy retrieval

            # Create the Sessions association array and store it keyed by the 
connectionRef, when we go to store
            # the actual Connection we can retrieve the association and store 
it as a property of the Connection.
            if (connectionRef not in self.connectionToSessionAssociations):
                self.connectionToSessionAssociations[connectionRef] = [session]
            else:
                
self.connectionToSessionAssociations[connectionRef].append(session);

        self.connections = self.qmf.getObjects(_class="connection", 
_agent=self.brokerAgent)

        # Only do the following if the -q option has been selected
        if config._logQueues:
            queueToBindingAssociations = {}
            self.queueMap.clear()
            self.exchangeMap.clear()

            # Create the 0..* association between queue and bindings
            bindings = self.qmf.getObjects(_class="binding", 
_agent=self.brokerAgent)
            for binding in bindings:
                queueRef = binding.queueRef

                # Create the Bindings association array and store it keyed by 
the queueRef, when we go to store
                # the actual Queue we can retrieve the association and store it 
as a property of the Queue.
                if (queueRef not in queueToBindingAssociations):
                    queueToBindingAssociations[queueRef] = [binding]
                else:
                    queueToBindingAssociations[queueRef].append(binding);

            queues = self.qmf.getObjects(_class="queue", 
_agent=self.brokerAgent)
            for queue in queues:
                queueId = queue.getObjectId()
                bindings = []
                if (queueId in queueToBindingAssociations):
                    bindings = queueToBindingAssociations[queueId]
                queue._bindings = bindings # Store the association as a 
property of queue for easy retrieval
                self.queueMap[queueId] = queue

            exchanges = self.qmf.getObjects(_class="exchange", 
_agent=self.brokerAgent)
            for exchange in exchanges:
                self.exchangeMap[exchange.getObjectId()] = exchange


    # For every queue list the bindings
    def logQueueInformation(self, ref):
        queue = self.queueMap[ref] # Look up queue by its ObjectId
        print "    Queue '%s'" % (queue.name)
        print "        arguments %s" % (queue.arguments)
        for bind in queue._bindings: # Use the 0..* association with bindings 
we populated in getQmfObjects()
            ename = "<unknown>"
            ex = self.exchangeMap[bind.exchangeRef] # Look up exchange by its 
ObjectId
            if ex != None:
                ename = ex.name
                if ename == "":
                    ename = "''"
            if bind.arguments: # If there are binding arguments then it's a 
headers exchange
                print "        bind [%s] => %s %s" % (bind.bindingKey, ename, 
bind.arguments)
            else:
                print "        bind [%s] => %s" % (bind.bindingKey, ename)


    # Logs audit information about each connection made to the broker
    #
    # Obtains connection, session and subscription objects and iterates in turn 
through these comparing
    # references to find the subscriptions association with sessions and 
sessions associated with
    # connections. Ultimately it then uses logQueueInformation to display the 
queues associated with
    # each subscription.
    def logConnectionInformation(self):
        print "\n\n**** connection-logger: Logging current connection 
information ****"

        for connection in self.connections:
            print "\nConnection '%s'" % (connection.address)
            print "authIdentity: %s" % (connection.authIdentity)
            print "remoteProcessName: %s" % (connection.remoteProcessName)
            print "federationLink: %s" % (connection.federationLink)
            ctime = connection.getTimestamps()[1]/1000000000
            print "createTimestamp: %s" % strftime("%c", localtime(ctime))

            connectionId = connection.getObjectId()
            sessions = []
            if (connectionId in self.connectionToSessionAssociations):
                sessions = self.connectionToSessionAssociations[connectionId] # 
Retrieve the association array

            for session in sessions: # Iterate through all session objects 
                print "Session: '%s'" % (session.name)
                subscriptions = session._subscriptions
                for subscription in subscriptions: # Iterate through all 
subscription objects
                    queueRef = subscription.queueRef
                    if config._logQueues: # Display detailed info about queues 
and bindings if needed
                        self.logQueueInformation(queueRef)

                if (len(subscriptions) == 0):
                    print "    ** No Subscriptions for this Session - probably 
a producer only Session **"


def main(argv=None):
    args = OptionsAndArguments(argv)

    bm = BrokerManager()
    try:
        bm.setBroker()
        bm.getQmfObjects()
        bm.logConnectionInformation()

    except Exception,e:
        print "Failed: %s - %s" % (e.__class__.__name__, e)
        return 1

    bm.disconnect()
    return 0

if __name__ == "__main__":
        sys.exit(main())
#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#


# connection-logger is used to provide information about connections made to a 
broker.
#
# In default mode connection-logger lists the connections made to a broker 
along with information about sessions
# such as whether any subscriptions are associated with the session (if a 
session has no subscriptions then it's
# quite likely to be a "producer only" session, so this knowledge is quite 
useful).
#
# In "log queue and binding" mode the information provided is very similar to 
qpid-config -b queues but with
# additional connection related information provided as with default mode.

from optparse import OptionParser, OptionGroup, IndentedHelpFormatter
import sys
import socket
import os
import locale
from qmf.console import Session
from time import time, strftime, gmtime, localtime, sleep

usage = "Usage: connection-logger [OPTIONS]"

def Usage():
    print usage

class Config:
    def __init__(self):
        self._logQueues   = False
        self._address     = "localhost"
        self._client_sasl_mechanism = None

config = Config()

class JHelpFormatter(IndentedHelpFormatter):
    """Format usage and description without stripping newlines from usage 
strings"""

    def format_usage(self, usage):
        return usage

    def format_description(self, description):
        return ""

def OptionsAndArguments(argv):
    parser = OptionParser(usage=usage, description="", 
formatter=JHelpFormatter())
    parser.add_option("-q", "--log-queues", action="store_true", help="log 
queue and binding information for consumer connections")
    parser.add_option("-a", "--broker-address", action="store", type="string", 
metavar="<address>", help="broker-addr is in the form: [username/password@] 
hostname | ip-address [:<port>]   ex:  localhost, 10.1.1.7:10000, 
broker-host:10000, guest/guest@localhost")
    parser.add_option("--client-sasl-mechanism", action="store", type="string", 
metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, 
ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects 
to the destination broker (not for authentication between the source and 
destination brokers - that is specified using the [mechanisms] argument to 'add 
route'). SASL automatically picks the most secure available mechanism - use 
this option to override.")

    opts, encArgs = parser.parse_args(args=argv)

    try:
        encoding = locale.getpreferredencoding()
        args = [a.decode(encoding) for a in encArgs]
    except:
        args = encArgs

    if opts.log_queues:
        config._logQueues = True

    if opts.broker_address: # Note underscore not hyphen used here
        config._address = opts.broker_address

    if opts.client_sasl_mechanism: # Note underscore not hyphen used here
        config._client_sasl_mechanism = opts.client_sasl_mechanism

    return args

class BrokerManager: # The first bit of this class is mostly lifted straight 
from qpid-config
    def __init__(self):
        self.brokerName = None
        self.qmf        = None
        self.broker     = None

    def setBroker(self): # Tweaked addBroker below to use values from Config 
class
        self.qmf = Session()
        self.broker = self.qmf.addBroker(config._address, 10, 
config._client_sasl_mechanism)
        agents = self.qmf.getAgents()
        for a in agents:
            if a.getAgentBank() == '0':
                self.brokerAgent = a

    def disconnect(self):
        if self.broker:
            self.qmf.delBroker(self.broker)

    def findById(self, items, id):
        for item in items:
            if item.getObjectId() == id:
                return item
        return None

    # For every queue list the bindings (pretty much taken from qpid-config 
QueueListRecurse with minor tweaks)
    def logQueueInformation(self, ref):
        exchanges = self.qmf.getObjects(_class="exchange", 
_agent=self.brokerAgent)
        bindings  = self.qmf.getObjects(_class="binding", 
_agent=self.brokerAgent)
        queues    = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
        for queue in queues:
            queueId = queue.getObjectId()
            if (ref == None or ref == queueId):
                print "    Queue '%s'" % (queue.name)
                print "        arguments %s" % (queue.arguments)

                for bind in bindings:
                    if bind.queueRef == queue.getObjectId():
                        ename = "<unknown>"
                        ex    = self.findById (exchanges, bind.exchangeRef)
                        if ex != None:
                            ename = ex.name
                            if ename == "":
                                ename = "''"
                        if bind.arguments: # If there are binding arguments 
then it's a headers exchange
                            print "        bind [%s] => %s %s" % 
(bind.bindingKey, ename, bind.arguments)
                        else:
                            print "        bind [%s] => %s" % (bind.bindingKey, 
ename)

    # Logs audit information about each connection made to the broker
    #
    # Obtains connection, session and subscription objects and iterates in turn 
through these comparing
    # references to find the subscriptions association with sessions and 
sessions associated with
    # connections. Ultimately it then uses logQueueInformation to display the 
queues associated with
    # each subscription.
    def logConnectionInformation(self):
        print "\n\n**** connection-logger: Logging current connection 
information ****"

        connections   = self.qmf.getObjects(_class="connection", 
_agent=self.brokerAgent)
        sessions      = self.qmf.getObjects(_class="session", 
_agent=self.brokerAgent)
        subscriptions = self.qmf.getObjects(_class="subscription", 
_agent=self.brokerAgent)

        for connection in connections:
            print "\nConnection '%s'" % (connection.address)
            print "authIdentity: %s" % (connection.authIdentity)
            print "remoteProcessName: %s" % (connection.remoteProcessName)
            print "federationLink: %s" % (connection.federationLink)
            ctime = connection.getTimestamps()[1]/1000000000
            print "createTimestamp: %s" % strftime("%c", localtime(ctime))

            connectionId = connection.getObjectId()
            for session in sessions: # Iterate through all session objects
                connectionRef = session.connectionRef
                if connectionRef == connectionId:
                    # Only select sessions that are associated with the 
connection under consideration.
                    print "Session: '%s'" % (session.name)
                    subscriptionCount = 0
                    sessionId = session.getObjectId()
                    for subscription in subscriptions: # Iterate through all 
subscription objects
                        sessionRef = subscription.sessionRef
                        if sessionRef == sessionId:
                            # Only select subscriptions that are associated 
with the session under consideration.
                            subscriptionCount = subscriptionCount + 1
                            queueRef = subscription.queueRef
                            if config._logQueues: # Display detailed info about 
queues and bindings if needed
                                self.logQueueInformation(queueRef)

                    if subscriptionCount == 0:
                        print "    ** No Subscriptions for this Session - 
probably a producer only Session **"

def main(argv=None):
    args = OptionsAndArguments(argv)

    bm = BrokerManager()
    try:
        bm.setBroker()
        bm.logConnectionInformation()

    except Exception,e:
        print "Failed: %s - %s" % (e.__class__.__name__, e)
        return 1

    bm.disconnect()
    return 0

if __name__ == "__main__":
        sys.exit(main())
#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# This application audits connections to one or more Qpid message brokers.
# Exchange and Queue names are checked against a whitelist and if no match is 
found an alert is generated

import os
import os.path
import optparse
from optparse import IndentedHelpFormatter
import sys
import socket
from time import time, strftime, gmtime, localtime, sleep
from qmf.console import Console, Session
from xml.etree.ElementTree import ElementTree

class AuditConsole(Console):
    # AuditConsole Constructor
    def __init__(self, whitelistFile):
        self.whitelistFile = whitelistFile
        self.whitelistLastModified = None
        # Declare dictionary to map queue name to queue objects
        self.queueNameMap = {}
        # Declare dictionary to map queue name to subscription objects
        self.queueNameSubscriptionMap = {}
        # Declare dictionaries to map object-ids to objects
        self.queueMap = {}
        self.exchangeMap = {}
        self.bindingMap = {}
        self.connectionMap = {}
        self.sessionMap = {}
        # Declare the sets to be used as the whitelists
        self.exchangeWhitelist = set()
        self.queueWhitelist = set()

    # Overridden method from Console
    # Handle property updates. This method registers or deletes the relevant 
objects in the relevant Map
    # These objects are used by the validateSubscription() in order to find out 
information about the
    # bindings and connections associated with a given queue in order to work 
out if it needs to be tested
    # against the whitelist. For example we don't validate queues bound to the 
management exchanges as
    # this would generate spurious results as most of these queues are 
temporary queues.
    def objectProps(self, broker, record):
        classKey = record.getClassKey()
        oid = record.getObjectId()
        deleted = record.getTimestamps()[2] > 0

        # Note that queue objects are keyed by both queue name and Object ID
        if classKey.getClassName() == "queue":
            queueName = record.name
            if oid not in self.queueMap and not deleted:
                self.queueMap[oid] = record

            if queueName not in self.queueNameMap and not deleted:
                self.queueNameMap[queueName] = record

            # If the queue object has been deleted remove it from the map
            if deleted:
                if queueName in self.queueNameMap:
                    self.queueNameMap.pop(queueName)
                if oid in self.queueMap:
                    self.queueMap.pop(oid)

        if classKey.getClassName() == "exchange":
            if oid not in self.exchangeMap and not deleted:
                self.exchangeMap[oid] = record

            # If the exchange object has been deleted remove it from the map
            if deleted and oid in self.exchangeMap:
                self.exchangeMap.pop(oid)

        if classKey.getClassName() == "binding":
            if oid not in self.bindingMap and not deleted:
                self.bindingMap[oid] = record

            # If the binding object has been deleted remove it from the map
            if deleted and oid in self.bindingMap:
                self.bindingMap.pop(oid)

        if classKey.getClassName() == "connection":
            if oid not in self.connectionMap and not deleted:
                # Re-read the whitelist when a new connection occurs
                self.readWhitelist()
                self.connectionMap[oid] = record

            # If the connection object has been deleted remove it from the map
            if deleted and oid in self.connectionMap:
                self.connectionMap.pop(oid)

        if classKey.getClassName() == "session":
            if oid not in self.sessionMap and not deleted:
                self.sessionMap[oid] = record

            # If the session object has been deleted remove it from the map
            if deleted and oid in self.sessionMap:
                self.sessionMap.pop(oid)

        # Note that subscription objects are keyed by queue name NOT by Object 
ID
        if classKey.getClassName() == "subscription":
            # Find the queue associated with the subscription by looking its 
reference up in queueMap
            queue = self.queueMap.get(record.queueRef)
            if queue != None:
                queueName = queue.name
                if queueName not in self.queueNameSubscriptionMap and not 
deleted:
                    self.queueNameSubscriptionMap[queueName] = record
                    self.validateSubscription(queueName)

                # If the subscription object has been deleted remove it from 
the map
                if deleted and queueName in self.queueNameSubscriptionMap:
                    self.queueNameSubscriptionMap.pop(queueName)

    # This method first checks if the whitelist file exists, if not it clears 
the sets used as whitelists
    # so that no whitelisting is applied. If the whitelist file does exist it 
is parsed by an ElementTree
    # we look for all exchange and queue elements and populate the respective 
whitelist sets with their
    # contents. Note that we check the whitelist file update time to avoid 
reading it if it hasn't been changed
    def readWhitelist(self):
        if os.path.exists(self.whitelistFile):
            mtime = os.path.getmtime(self.whitelistFile)
            if mtime != self.whitelistLastModified:
                self.whitelistLastModified = mtime
                self.exchangeWhitelist.clear()
                self.queueWhitelist.clear()
                tree = ElementTree()
                try:
                    tree.parse(self.whitelistFile)
                    exchanges = tree.findall("exchangeWhitelist/exchange")
                    for i in exchanges:
                        exchange = i.text
                        if exchange == None:
                            self.exchangeWhitelist.add("")
                        else:
                            self.exchangeWhitelist.add(exchange)
                    queues = tree.findall("queueWhitelist/queue")
                    for i in queues:
                        queue = i.text
                        self.queueWhitelist.add(queue)
                except Exception, e: # Failed to parse correctly
                    print strftime("%c", localtime(time())), "WARN 
connection-audit:readWhitelist failed: %s - %s" % (e.__class__.__name__, e)
        else: # If whitelist file doesn't exist log a warning and clear the 
whitelists
            print strftime("%c", localtime(time())), "WARN 
connection-audit:readWhitelist %s doesn't exist" % self.whitelistFile
            sys.stdout.flush()
            self.exchangeWhitelist.clear()
            self.queueWhitelist.clear()

    # Given a queue name this method looks up the associated subscription, 
session and connection
    # objects and returns the connection
    def findConnection(self, queueName):
        subscription = self.queueNameSubscriptionMap.get(queueName)
        if subscription != None:
            session = self.sessionMap.get(subscription.sessionRef)
            if session != None:
                connection = self.connectionMap.get(session.connectionRef)
                if connection != None:
                    return connection

    # This method finds bindings that match a given queue name, if one is found 
the associated exchange is
    # recovered and then a call to validateQueue() is made
    def validateSubscription(self, queueName):
        for binding in self.bindingMap.values():
            queue = self.queueMap.get(binding.queueRef)
            if queue != None and queue.name == queueName:
                exchange = self.exchangeMap.get(binding.exchangeRef)
                if exchange != None:
                    self.validateQueue(queueName, exchange, binding)

    # This method validates the specified queue by comparing the queue name and 
associated exchange name
    # against the whitelist sets. If the exchange name or the queue name is in 
the whitelist then we do
    # nothing, but if not we generate an alert message.
    def validateQueue(self, queueName, exchange, binding):
        exchangeName = exchange.name
        if exchangeName in self.exchangeWhitelist:
            return

        if queueName in self.queueWhitelist:
            return

        connection = self.findConnection(queueName)
        address = connection.address
        ctime = connection.getTimestamps()[1]/1000000000

        if exchangeName == "":
            exchangeName = "''"
        if binding.arguments:
            print strftime("%c", localtime(time())), "ALERT 
connection-audit:validateQueue validation failed for queue: %s with binding[%s] 
=> %s %s from address: %s with connection timestamp" % (queueName, 
binding.bindingKey, exchangeName, binding.arguments, address), strftime("%c", 
localtime(ctime))
        else:
            print strftime("%c", localtime(time())), "ALERT 
connection-audit:validateQueue validation failed for queue: %s with binding[%s] 
=> %s from address: %s with connection timestamp" % (queueName, 
binding.bindingKey, exchangeName, address), strftime("%c", localtime(ctime))
        sys.stdout.flush()

    # Overridden method from Console
#    def event(self, broker, event):
#        if event.classKey.getClassName() == "subscribe":

    # Overridden method from Console
    def brokerConnected(self, broker):
        # Log brokerConnected event
        print strftime("%c", localtime(time())), "NOTIC 
connection-audit:brokerConnected broker=%s" % broker.getUrl()
        sys.stdout.flush()

    # Overridden method from Console
    def brokerConnectionFailed(self, broker):
        # Log brokerConnectionFailed event
        print strftime("%c", localtime(time())), "NOTIC 
connection-audit:brokerConnectionFailed broker=%s %s" % (broker.getUrl(), 
str(broker.conn_exc))
        sys.stdout.flush()

    # Overridden method from Console
    def brokerDisconnected(self, broker):
        # Clear state as it'll get resent when the broker reconnects
        self.queueNameMap.clear()
        self.queueNameSubscriptionMap.clear()
        self.queueMap.clear()
        self.exchangeMap.clear()
        self.bindingMap.clear()
        self.connectionMap.clear()
        self.sessionMap.clear()

        # Log brokerDisconnected event
        print strftime("%c", localtime(time())), "NOTIC 
connection-audit:brokerDisconnected broker=%s" % broker.getUrl()
        sys.stdout.flush()


class JHelpFormatter(IndentedHelpFormatter):
    """Format usage and description without stripping newlines from usage 
strings
    """

    def format_usage(self, usage):
        return usage

    def format_description(self, description):
        if description:
            return description + "\n"
        else:
            return ""

_usage = "%prog [options] [broker-addr]..."

_description = \
"""
Audits connections to one or more Qpid message brokers.
Exchange and Queue names are checked against a whitelist and if no match is 
found an alert is generated

If no broker-addr is supplied, %prog connects to 'localhost:5672'.

[broker-addr] syntax:

          [username/password@] hostname
          ip-address [:<port>]

Examples:

$ %prog localhost:5672
$ %prog 10.1.1.7:10000
$ %prog guest/guest@broker-host:10000
"""

def main(argv=None):
    p = optparse.OptionParser(usage=_usage, description=_description, 
formatter=JHelpFormatter())
    p.add_option("--sasl-mechanism", action="store", type="string", 
metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, 
ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the 
most secure available mechanism - use this option to override.")
    p.add_option("--whitelist", action="store", type="string", 
default="./whitelist.xml", metavar="<whitelist XML document>", help="The fully 
qualified name of the whitelist XML file, default is ./whitelist.xml")

    options, arguments = p.parse_args(args=argv)
    if len(arguments) == 0:
        arguments.append("localhost")

    console = AuditConsole(options.whitelist)
    session = Session(console, rcvObjects=True, rcvEvents=True, 
userBindings=True, manageConnections=True)

    # Register to receive updates for broker:queue objects.
    session.bindClass("org.apache.qpid.broker", "queue")
    # Register to receive updates for broker:exchange objects.
    session.bindClass("org.apache.qpid.broker", "exchange")
    # Register to receive updates for broker:binding objects.
    session.bindClass("org.apache.qpid.broker", "binding")
    # Register to receive updates for broker:connection objects.
    session.bindClass("org.apache.qpid.broker", "connection")
    # Register to receive updates for broker:session objects.
    session.bindClass("org.apache.qpid.broker", "session")
    # Register to receive updates for broker:subscription objects.
    session.bindClass("org.apache.qpid.broker", "subscription")

    # Register to receive updates for broker:subscribe event.
#    session.bindEvent("org.apache.qpid.broker", "subscribe")

    brokers = []
    try:
        try:
            for host in arguments:
                brokers.append(session.addBroker(host, None, 
options.sasl_mechanism))

            while (True):
                sleep(10)

        except KeyboardInterrupt:
            print
            return 0

        except Exception, e:
            print "Failed: %s - %s" % (e.__class__.__name__, e)
            return 1
    finally:
        while len(brokers):
            b = brokers.pop()
            session.delBroker(b)

if __name__ == '__main__':
  sys.exit(main())
#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# This application audits connections to one or more Qpid message brokers.
# Exchange and Queue names are checked against a whitelist and if no match is 
found an alert is generated

import os
import os.path
import optparse
from optparse import IndentedHelpFormatter
import sys
import socket
from time import time, strftime, gmtime, localtime, sleep
from qmf.console import Console, Session
#from xml.etree.ElementTree import ElementTree
from xml.dom import minidom, Node


class AuditConsole(Console):
    # AuditConsole Constructor
    def __init__(self, whitelistFile):
        self.whitelistFile = whitelistFile
        self.whitelistLastModified = None
        # Declare dictionary to map queue name to queue objects
        self.queueNameMap = {}
        # Declare dictionary to map queue name to subscription objects
        self.queueNameSubscriptionMap = {}
        # Declare dictionaries to map object-ids to objects
        self.queueMap = {}
        self.exchangeMap = {}
        self.bindingMap = {}
        self.connectionMap = {}
        self.sessionMap = {}
        # Declare the sets to be used as the whitelists
        self.exchangeWhitelist = set()
        self.queueWhitelist = set()

    # Overridden method from Console
    # Handle property updates. This method registers or deletes the relevant 
objects in the relevant Map
    # These objects are used by the validateSubscription() in order to find out 
information about the
    # bindings and connections associated with a given queue in order to work 
out if it needs to be tested
    # against the whitelist. For example we don't validate queues bound to the 
management exchanges as
    # this would generate spurious results as most of these queues are 
temporary queues.
    def objectProps(self, broker, record):
        classKey = record.getClassKey()
        oid = record.getObjectId()
        deleted = record.getTimestamps()[2] > 0

        # Note that queue objects are keyed by both queue name and Object ID
        if classKey.getClassName() == "queue":
            queueName = record.name
            if oid not in self.queueMap and not deleted:
                self.queueMap[oid] = record

            if queueName not in self.queueNameMap and not deleted:
                self.queueNameMap[queueName] = record

            # If the queue object has been deleted remove it from the map
            if deleted:
                if queueName in self.queueNameMap:
                    self.queueNameMap.pop(queueName)
                if oid in self.queueMap:
                    self.queueMap.pop(oid)

        if classKey.getClassName() == "exchange":
            if oid not in self.exchangeMap and not deleted:
                self.exchangeMap[oid] = record

            # If the exchange object has been deleted remove it from the map
            if deleted and oid in self.exchangeMap:
                self.exchangeMap.pop(oid)

        if classKey.getClassName() == "binding":
            if oid not in self.bindingMap and not deleted:
                self.bindingMap[oid] = record

            # If the binding object has been deleted remove it from the map
            if deleted and oid in self.bindingMap:
                self.bindingMap.pop(oid)

        if classKey.getClassName() == "connection":
            if oid not in self.connectionMap and not deleted:
                # Re-read the whitelist when a new connection occurs
                self.readWhitelist()
                self.connectionMap[oid] = record

            # If the connection object has been deleted remove it from the map
            if deleted and oid in self.connectionMap:
                self.connectionMap.pop(oid)

        if classKey.getClassName() == "session":
            if oid not in self.sessionMap and not deleted:
                self.sessionMap[oid] = record

            # If the session object has been deleted remove it from the map
            if deleted and oid in self.sessionMap:
                self.sessionMap.pop(oid)

        # Note that subscription objects are keyed by queue name NOT by Object 
ID
        if classKey.getClassName() == "subscription":
            # Find the queue associated with the subscription by looking its 
reference up in queueMap
            queue = self.queueMap.get(record.queueRef)
            if queue != None:
                queueName = queue.name
                if queueName not in self.queueNameSubscriptionMap and not 
deleted:
                    self.queueNameSubscriptionMap[queueName] = record
                    self.validateSubscription(queueName)

                # If the subscription object has been deleted remove it from 
the map
                if deleted and queueName in self.queueNameSubscriptionMap:
                    self.queueNameSubscriptionMap.pop(queueName)

    # This method first checks if the whitelist file exists, if not it clears 
the sets used as whitelists
    # so that no whitelisting is applied. If the whitelist file does exist it 
is parsed by a DOM parser
    # we look for all exchange and queue elements and populate the respective 
whitelist sets with their
    # contents. Note that we check the whitelist file update time to avoid 
reading it if it hasn't been changed
    def readWhitelist(self):
        if os.path.exists(self.whitelistFile):
            mtime = os.path.getmtime(self.whitelistFile)
            if mtime != self.whitelistLastModified:
                self.whitelistLastModified = mtime
                self.exchangeWhitelist.clear()
                self.queueWhitelist.clear()
#                tree = ElementTree()
#                try:
#                    tree.parse(self.whitelistFile)
#                    exchanges = tree.findall("exchangeWhitelist/exchange")
#                    for i in exchanges:
#                        exchange = i.text
#                        if exchange == None:
#                            self.exchangeWhitelist.add("")
#                        else:
#                            self.exchangeWhitelist.add(exchange)
#                    queues = tree.findall("queueWhitelist/queue")
#                    for i in queues:
#                        queue = i.text
#                        self.queueWhitelist.add(queue)
                try:
                    doc = minidom.parse(self.whitelistFile)
                    whitelist = doc.documentElement
                    if whitelist.nodeName == "whitelist":
                        children = whitelist.childNodes
                        for child in children:
                            if child.nodeName == "exchangeWhitelist":
                                exchanges = child.childNodes
                                for i in exchanges:
                                    if i.nodeName == "exchange":
                                        if not i.hasChildNodes():
                                            self.exchangeWhitelist.add("")
                                        else:
                                            exchange = i.firstChild.nodeValue
                                            self.exchangeWhitelist.add(exchange)
                            if child.nodeName == "queueWhitelist":
                                queues = child.childNodes
                                for i in queues:
                                    if i.nodeName == "queue":
                                        if i.hasChildNodes():
                                            queues = i.firstChild.nodeValue
                                            self.queueWhitelist.add(queues)

                except Exception, e: # Failed to parse correctly
                    print strftime("%c", localtime(time())), "WARN 
connection-audit:readWhitelist failed: %s - %s" % (e.__class__.__name__, e)
        else: # If whitelist file doesn't exist log a warning and clear the 
whitelists
            print strftime("%c", localtime(time())), "WARN 
connection-audit:readWhitelist %s doesn't exist" % self.whitelistFile
            sys.stdout.flush()
            self.exchangeWhitelist.clear()
            self.queueWhitelist.clear()

    # Given a queue name this method looks up the associated subscription, 
session and connection
    # objects and returns the connection
    def findConnection(self, queueName):
        subscription = self.queueNameSubscriptionMap.get(queueName)
        if subscription != None:
            session = self.sessionMap.get(subscription.sessionRef)
            if session != None:
                connection = self.connectionMap.get(session.connectionRef)
                if connection != None:
                    return connection

    # This method finds bindings that match a given queue name, if one is found 
the associated exchange is
    # recovered and then a call to validateQueue() is made
    def validateSubscription(self, queueName):
        for binding in self.bindingMap.values():
            queue = self.queueMap.get(binding.queueRef)
            if queue != None and queue.name == queueName:
                exchange = self.exchangeMap.get(binding.exchangeRef)
                if exchange != None:
                    self.validateQueue(queueName, exchange, binding)

    # This method validates the specified queue by comparing the queue name and 
associated exchange name
    # against the whitelist sets. If the exchange name or the queue name is in 
the whitelist then we do
    # nothing, but if not we generate an alert message.
    def validateQueue(self, queueName, exchange, binding):
        exchangeName = exchange.name
        if exchangeName in self.exchangeWhitelist:
            return

        if queueName in self.queueWhitelist:
            return

        connection = self.findConnection(queueName)
        address = connection.address
        ctime = connection.getTimestamps()[1]/1000000000

        if exchangeName == "":
            exchangeName = "''"
        if binding.arguments:
            print strftime("%c", localtime(time())), "ALERT 
connection-audit:validateQueue validation failed for queue: %s with binding[%s] 
=> %s %s from address: %s with connection timestamp" % (queueName, 
binding.bindingKey, exchangeName, binding.arguments, address), strftime("%c", 
localtime(ctime))
        else:
            print strftime("%c", localtime(time())), "ALERT 
connection-audit:validateQueue validation failed for queue: %s with binding[%s] 
=> %s from address: %s with connection timestamp" % (queueName, 
binding.bindingKey, exchangeName, address), strftime("%c", localtime(ctime))
        sys.stdout.flush()

    # Overridden method from Console
#    def event(self, broker, event):
#        if event.classKey.getClassName() == "subscribe":

    # Overridden method from Console
    def brokerConnected(self, broker):
        # Log brokerConnected event
        print strftime("%c", localtime(time())), "NOTIC 
connection-audit:brokerConnected broker=%s" % broker.getUrl()
        sys.stdout.flush()

    # Overridden method from Console
    def brokerConnectionFailed(self, broker):
        # Log brokerConnectionFailed event
        print strftime("%c", localtime(time())), "NOTIC 
connection-audit:brokerConnectionFailed broker=%s %s" % (broker.getUrl(), 
str(broker.conn_exc))
        sys.stdout.flush()

    # Overridden method from Console
    def brokerDisconnected(self, broker):
        # Clear state as it'll get resent when the broker reconnects
        self.queueNameMap.clear()
        self.queueNameSubscriptionMap.clear()
        self.queueMap.clear()
        self.exchangeMap.clear()
        self.bindingMap.clear()
        self.connectionMap.clear()
        self.sessionMap.clear()

        # Log brokerDisconnected event
        print strftime("%c", localtime(time())), "NOTIC 
connection-audit:brokerDisconnected broker=%s" % broker.getUrl()
        sys.stdout.flush()


class JHelpFormatter(IndentedHelpFormatter):
    """Format usage and description without stripping newlines from usage 
strings
    """

    def format_usage(self, usage):
        return usage

    def format_description(self, description):
        if description:
            return description + "\n"
        else:
            return ""

_usage = "%prog [options] [broker-addr]..."

_description = \
"""
Audits connections to one or more Qpid message brokers.
Exchange and Queue names are checked against a whitelist and if no match is 
found an alert is generated

If no broker-addr is supplied, %prog connects to 'localhost:5672'.

[broker-addr] syntax:

          [username/password@] hostname
          ip-address [:<port>]

Examples:

$ %prog localhost:5672
$ %prog 10.1.1.7:10000
$ %prog guest/guest@broker-host:10000
"""

def main(argv=None):
    p = optparse.OptionParser(usage=_usage, description=_description, 
formatter=JHelpFormatter())
    p.add_option("--sasl-mechanism", action="store", type="string", 
metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, 
ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the 
most secure available mechanism - use this option to override.")
    p.add_option("--whitelist", action="store", type="string", 
default="./whitelist.xml", metavar="<whitelist XML document>", help="The fully 
qualified name of the whitelist XML file, default is ./whitelist.xml")

    options, arguments = p.parse_args(args=argv)
    if len(arguments) == 0:
        arguments.append("localhost")

    console = AuditConsole(options.whitelist)
    session = Session(console, rcvObjects=True, rcvEvents=True, 
userBindings=True, manageConnections=True)

    # Register to receive updates for broker:queue objects.
    session.bindClass("org.apache.qpid.broker", "queue")
    # Register to receive updates for broker:exchange objects.
    session.bindClass("org.apache.qpid.broker", "exchange")
    # Register to receive updates for broker:binding objects.
    session.bindClass("org.apache.qpid.broker", "binding")
    # Register to receive updates for broker:connection objects.
    session.bindClass("org.apache.qpid.broker", "connection")
    # Register to receive updates for broker:session objects.
    session.bindClass("org.apache.qpid.broker", "session")
    # Register to receive updates for broker:subscription objects.
    session.bindClass("org.apache.qpid.broker", "subscription")

    # Register to receive updates for broker:subscribe event.
#    session.bindEvent("org.apache.qpid.broker", "subscribe")

    brokers = []
    try:
        try:
            for host in arguments:
                brokers.append(session.addBroker(host, None, 
options.sasl_mechanism))

            while (True):
                sleep(10)

        except KeyboardInterrupt:
            print
            return 0

        except Exception, e:
            print "Failed: %s - %s" % (e.__class__.__name__, e)
            return 1
    finally:
        while len(brokers):
            b = brokers.pop()
            session.delBroker(b)

if __name__ == '__main__':
  sys.exit(main())
<whitelist>
    <exchangeWhitelist>
        <exchange>qmf.default.topic</exchange>
        <exchange>qmf.default.direct</exchange>
        <exchange>qpid.management</exchange>
        <exchange>amq.direct</exchange>
        <exchange></exchange>
    </exchangeWhitelist>
    <queueWhitelist>
        <queue>testqueue</queue>
    </queueWhitelist>
</whitelist>

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to