Hi all,
I've had cause to write a tool to log connection information so I thought I'd share it.

It's nothing special, it recovers all of the QMF connection objects and logs some of the more useful properties.

It also figures out the sessions associated with each connection and subscriptions associated with each session.

I wanted to work out which connections related to producers in my set up and the only way I could figure out to do that was to find subscriptions related to the connection, If a connection has no subscriptions then assuming it's not in an intermediate state it's quite likely to be a connection from a message producer.

Because you can't work out connections associated with exchanges (I think that's something to do with how AMQP itself works) I believe that the only way one can identify connections from producers is in this somewhat empirical way (unless anyone has a better idea).

Hope this is useful.

Sergey, this little program will also give you information about headers exchange bindings.
Cheers,
Frase
#!/usr/bin/env python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#


# connection-logger is used to provide information about connections made to a 
broker.
#
# In default mode connection-logger lists the connections made to a broker 
along with information about sessions
# such as whether any subscriptions are associated with the session (if a 
session has no subscriptions then it's
# quite likely to be a "producer only" session, so this knowledge is quite 
useful).
#
# In "log queue and binding" mode the information provided is very similar to 
qpid-config -b queues but with
# additional connection related information provided as with default mode.

from optparse import OptionParser, OptionGroup, IndentedHelpFormatter
import sys
import socket
import os
import locale
from qmf.console import Session
from time import time, strftime, gmtime, localtime, sleep

usage = "Usage: connection-logger [OPTIONS]"

def Usage():
    print usage

class Config:
    def __init__(self):
        self._logQueues   = False
        self._address     = "localhost"
        self._client_sasl_mechanism = None

config = Config()

class JHelpFormatter(IndentedHelpFormatter):
    """Format usage and description without stripping newlines from usage 
strings"""

    def format_usage(self, usage):
        return usage

    def format_description(self, description):
        return ""

def OptionsAndArguments(argv):
    parser = OptionParser(usage=usage, description="", 
formatter=JHelpFormatter())
    parser.add_option("-q", "--log-queues", action="store_true", help="log 
queue and binding information for consumer connections")
    parser.add_option("-a", "--broker-address", action="store", type="string", 
metavar="<address>", help="broker-addr is in the form: [username/password@] 
hostname | ip-address [:<port>]   ex:  localhost, 10.1.1.7:10000, 
broker-host:10000, guest/guest@localhost")
    parser.add_option("--client-sasl-mechanism", action="store", type="string", 
metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, 
ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects 
to the destination broker (not for authentication between the source and 
destination brokers - that is specified using the [mechanisms] argument to 'add 
route'). SASL automatically picks the most secure available mechanism - use 
this option to override.")

    opts, encArgs = parser.parse_args(args=argv)

    try:
        encoding = locale.getpreferredencoding()
        args = [a.decode(encoding) for a in encArgs]
    except:
        args = encArgs

    if opts.log_queues:
        config._logQueues = True

    if opts.broker_address: # Note underscore not hyphen used here
        config._address = opts.broker_address

    if opts.client_sasl_mechanism: # Note underscore not hyphen used here
        config._client_sasl_mechanism = opts.client_sasl_mechanism

    return args

class BrokerManager: # The first bit of this class is mostly lifted straight 
from qpid-config
    def __init__(self):
        self.brokerName = None
        self.qmf        = None
        self.broker     = None

    def setBroker(self): # Tweaked addBroker below to use values from Config 
class
        self.qmf = Session()
        self.broker = self.qmf.addBroker(config._address, 10, 
config._client_sasl_mechanism)
        agents = self.qmf.getAgents()
        for a in agents:
            if a.getAgentBank() == '0':
                self.brokerAgent = a

    def disconnect(self):
        if self.broker:
            self.qmf.delBroker(self.broker)

    def findById(self, items, id):
        for item in items:
            if item.getObjectId() == id:
                return item
        return None

    # For every queue list the bindings (pretty much taken from qpid-config 
QueueListRecurse with minor tweaks)
    def logQueueInformation(self, ref):
        exchanges = self.qmf.getObjects(_class="exchange", 
_agent=self.brokerAgent)
        bindings  = self.qmf.getObjects(_class="binding", 
_agent=self.brokerAgent)
        queues    = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
        for queue in queues:
            queueId = queue.getObjectId()
            if (ref == None or ref == queueId):
                print "    Queue '%s'" % (queue.name)
                print "        arguments %s" % (queue.arguments)

                for bind in bindings:
                    if bind.queueRef == queue.getObjectId():
                        ename = "<unknown>"
                        ex    = self.findById (exchanges, bind.exchangeRef)
                        if ex != None:
                            ename = ex.name
                            if ename == "":
                                ename = "''"
                        if bind.arguments: # If there are binding arguments 
then it's a headers exchange
                            print "        bind [%s] => %s %s" % 
(bind.bindingKey, ename, bind.arguments)
                        else:
                            print "        bind [%s] => %s" % (bind.bindingKey, 
ename)

    # Logs audit information about each connection made to the broker
    #
    # Obtains connection, session and subscription objects and iterates in turn 
through these comparing
    # references to find the subscriptions association with sessions and 
sessions associated with
    # connections. Ultimately it then uses logQueueInformation to display the 
queues associated with
    # each subscription.
    def logConnectionInformation(self):
        print "\n\n**** connection-logger: Logging current connection 
information ****"

        connections   = self.qmf.getObjects(_class="connection", 
_agent=self.brokerAgent)
        sessions      = self.qmf.getObjects(_class="session", 
_agent=self.brokerAgent)
        subscriptions = self.qmf.getObjects(_class="subscription", 
_agent=self.brokerAgent)

        for connection in connections:
            print "\nConnection '%s'" % (connection.address)
            print "authIdentity: %s" % (connection.authIdentity)
            print "remoteProcessName: %s" % (connection.remoteProcessName)
            print "federationLink: %s" % (connection.federationLink)
            ctime = connection.getTimestamps()[1]/1000000000
            print "createTimestamp: %s" % strftime("%c", localtime(ctime))

            connectionId = connection.getObjectId()
            for session in sessions: # Iterate through all session objects
                connectionRef = session.connectionRef
                if connectionRef == connectionId:
                    # Only select sessions that are associated with the 
connection under consideration.
                    print "Session: '%s'" % (session.name)
                    subscriptionCount = 0
                    sessionId = session.getObjectId()
                    for subscription in subscriptions: # Iterate through all 
subscription objects
                        sessionRef = subscription.sessionRef
                        if sessionRef == sessionId:
                            # Only select subscriptions that are associated 
with the session under consideration.
                            subscriptionCount = subscriptionCount + 1
                            queueRef = subscription.queueRef
                            if config._logQueues: # Display detailed info about 
queues and bindings if needed
                                self.logQueueInformation(queueRef)

                    if subscriptionCount == 0:
                        print "    ** No Subscriptions for this Session - 
probably a producer only Session **"

def main(argv=None):
    args = OptionsAndArguments(argv)

    bm = BrokerManager()
    try:
        bm.setBroker()
        bm.logConnectionInformation()

    except Exception,e:
        print "Failed: %s - %s" % (e.__class__.__name__, e)
        return 1

    bm.disconnect()
    return 0

if __name__ == "__main__":
        sys.exit(main())

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to