Hi all,
I've had cause to write a tool to log connection information so I
thought I'd share it.
It's nothing special: it retrieves all of the QMF connection objects and
logs some of the more useful properties.
It also figures out the sessions associated with each connection and
subscriptions associated with each session.
I wanted to work out which connections related to producers in my setup,
and the only way I could figure out to do that was to find subscriptions
related to the connection. If a connection has no subscriptions then,
assuming it's not in an intermediate state, it's quite likely to be a
connection from a message producer.
Because you can't work out connections associated with exchanges (I
think that's something to do with how AMQP itself works) I believe that
the only way one can identify connections from producers is in this
somewhat empirical way (unless anyone has a better idea).
Hope this is useful.
Sergey, this little program will also give you information about headers
exchange bindings.
Cheers,
Frase
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# connection-logger is used to provide information about connections made to a
# broker.
#
# In default mode connection-logger lists the connections made to a broker
# along with information about sessions, such as whether any subscriptions are
# associated with the session (if a session has no subscriptions then it's
# quite likely to be a "producer only" session, so this knowledge is quite
# useful).
#
# In "log queue and binding" mode the information provided is very similar to
# "qpid-config -b queues" but with additional connection-related information
# provided as with default mode.
from optparse import OptionParser, OptionGroup, IndentedHelpFormatter
import sys
import socket
import os
import locale
from qmf.console import Session
from time import time, strftime, gmtime, localtime, sleep
# One-line usage banner; also handed to OptionParser as its usage string.
usage = "Usage: connection-logger [OPTIONS]"


def Usage():
    """Print the one-line usage banner to standard output."""
    # Call form with a single argument prints identically on Python 2 and
    # also parses on Python 3.
    print(usage)
class Config:
    """Run-time settings for connection-logger.

    The defaults set here are overwritten by OptionsAndArguments() when the
    corresponding command-line options are supplied.
    """

    def __init__(self):
        # True when queue and binding details should be logged (-q).
        self._logQueues = False
        # Broker address passed to Session.addBroker() (-a).
        self._address = "localhost"
        # SASL mechanism override, or None to leave the choice to the library.
        self._client_sasl_mechanism = None


# Single shared settings object read by the rest of the script.
config = Config()
class JHelpFormatter(IndentedHelpFormatter):
    """Help formatter that emits the usage string verbatim.

    The usage text is returned exactly as supplied (so any newlines in it are
    kept) and the description section is suppressed entirely.
    """

    def format_usage(self, usage):
        """Return ``usage`` unchanged."""
        return usage

    def format_description(self, description):
        """Return an empty string regardless of ``description``."""
        return ""
def OptionsAndArguments(argv):
    """Parse the command line and record the chosen options in ``config``.

    Args:
        argv: list of argument strings to parse; it is passed straight to
            ``OptionParser.parse_args`` (which falls back to the process
            arguments when given None).

    Returns:
        The positional arguments left after option parsing, decoded with the
        locale's preferred encoding when that is possible, otherwise as
        returned by the parser.
    """
    parser = OptionParser(usage=usage, description="",
                          formatter=JHelpFormatter())
    parser.add_option(
        "-q", "--log-queues", action="store_true",
        help="log queue and binding information for consumer connections")
    parser.add_option(
        "-a", "--broker-address", action="store", type="string",
        metavar="<address>",
        help="broker-addr is in the form: [username/password@] "
             "hostname | ip-address [:<port>] ex: localhost, 10.1.1.7:10000, "
             "broker-host:10000, guest/guest@localhost")
    parser.add_option(
        "--client-sasl-mechanism", action="store", type="string",
        metavar="<mech>",
        help="SASL mechanism for authentication (e.g. EXTERNAL, "
             "ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). Used when the "
             "client connects to the destination broker (not for "
             "authentication between the source and destination brokers - "
             "that is specified using the [mechanisms] argument to 'add "
             "route'). SASL automatically picks the most secure available "
             "mechanism - use this option to override.")

    opts, encArgs = parser.parse_args(args=argv)

    # Best-effort decode of the positional arguments: byte strings are turned
    # into unicode using the locale encoding. If the arguments have no
    # decode() method, the encoding is unknown/unusable, or the bytes do not
    # decode, the arguments are used as they are.
    try:
        encoding = locale.getpreferredencoding()
        args = [a.decode(encoding) for a in encArgs]
    except (AttributeError, LookupError, TypeError, UnicodeError):
        args = encArgs

    # optparse exposes "--log-queues" as opts.log_queues, etc. (underscores,
    # not hyphens).
    if opts.log_queues:
        config._logQueues = True
    if opts.broker_address:
        config._address = opts.broker_address
    if opts.client_sasl_mechanism:
        config._client_sasl_mechanism = opts.client_sasl_mechanism

    return args
class BrokerManager:
    """Connects to a broker over QMF and logs its connection information.

    The first part of this class is mostly lifted straight from qpid-config.
    """

    def __init__(self):
        self.brokerName = None
        self.qmf = None
        self.broker = None
        # Filled in by setBroker(); defined here so the attribute always
        # exists, even if setBroker() has not run or found no agent.
        self.brokerAgent = None

    def setBroker(self):
        """Open a QMF session to the broker named in ``config``.

        Tweaked version of qpid-config's addBroker that takes its values from
        the Config class. Remembers the agent whose agent bank is '0' as
        ``self.brokerAgent``.

        Raises:
            RuntimeError: if no agent with bank '0' is reported, since every
                later query needs ``self.brokerAgent``.
        """
        self.qmf = Session()
        # NOTE(review): the literal 10 is presumably a connect timeout in
        # seconds - confirm against qmf.console.Session.addBroker.
        self.broker = self.qmf.addBroker(config._address, 10,
                                         config._client_sasl_mechanism)
        agents = self.qmf.getAgents()
        for a in agents:
            if a.getAgentBank() == '0':
                self.brokerAgent = a
        if self.brokerAgent is None:
            raise RuntimeError("no QMF agent with bank '0' found on broker")

    def disconnect(self):
        """Remove the broker from the QMF session, if one was added."""
        if self.broker:
            self.qmf.delBroker(self.broker)
            # Forget the broker so a second call is a harmless no-op.
            self.broker = None

    def findById(self, items, id):
        """Return the first item whose getObjectId() equals ``id``, else None."""
        for item in items:
            if item.getObjectId() == id:
                return item
        return None

    # For every queue list the bindings (pretty much taken from qpid-config
    # QueueListRecurse with minor tweaks).
    def logQueueInformation(self, ref):
        """Print queues and their bindings.

        Args:
            ref: object id of the single queue to report, or None to report
                every queue.
        """
        exchanges = self.qmf.getObjects(_class="exchange",
                                        _agent=self.brokerAgent)
        bindings = self.qmf.getObjects(_class="binding",
                                       _agent=self.brokerAgent)
        queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
        for queue in queues:
            queueId = queue.getObjectId()
            if ref is None or ref == queueId:
                # 1-tuples keep the formatting correct whatever the type of
                # the property being printed.
                print(" Queue '%s'" % (queue.name,))
                print(" arguments %s" % (queue.arguments,))
                for bind in bindings:
                    if bind.queueRef == queueId:
                        ename = "<unknown>"
                        ex = self.findById(exchanges, bind.exchangeRef)
                        if ex is not None:
                            ename = ex.name
                            if ename == "":
                                # Show an empty exchange name visibly as ''.
                                ename = "''"
                        if bind.arguments:
                            # If there are binding arguments then it's a
                            # headers exchange.
                            print(" bind [%s] => %s %s" %
                                  (bind.bindingKey, ename, bind.arguments))
                        else:
                            print(" bind [%s] => %s" %
                                  (bind.bindingKey, ename))

    # Logs audit information about each connection made to the broker.
    #
    # Obtains connection, session and subscription objects and iterates in
    # turn through these comparing references to find the subscriptions
    # associated with sessions and sessions associated with connections.
    # Ultimately it then uses logQueueInformation to display the queues
    # associated with each subscription.
    def logConnectionInformation(self):
        """Print every connection with its sessions and subscription status.

        A session with no subscriptions is flagged as a probable
        producer-only session. When ``config._logQueues`` is set, the queue
        (and its bindings) behind each subscription is printed as well.
        """
        print("\n\n**** connection-logger: Logging current connection "
              "information ****")
        connections = self.qmf.getObjects(_class="connection",
                                          _agent=self.brokerAgent)
        sessions = self.qmf.getObjects(_class="session",
                                       _agent=self.brokerAgent)
        subscriptions = self.qmf.getObjects(_class="subscription",
                                            _agent=self.brokerAgent)
        for connection in connections:
            print("\nConnection '%s'" % (connection.address,))
            print("authIdentity: %s" % (connection.authIdentity,))
            print("remoteProcessName: %s" % (connection.remoteProcessName,))
            print("federationLink: %s" % (connection.federationLink,))
            # Entry 1 of the timestamps is reported as the create time; it is
            # scaled down by 10**9 (i.e. treated as nanoseconds) to get the
            # seconds value localtime() expects. Floor division keeps the
            # original integer result.
            ctime = connection.getTimestamps()[1] // 1000000000
            print("createTimestamp: %s" % strftime("%c", localtime(ctime)))
            connectionId = connection.getObjectId()
            for session in sessions:  # Iterate through all session objects
                connectionRef = session.connectionRef
                if connectionRef == connectionId:
                    # Only select sessions that are associated with the
                    # connection under consideration.
                    print("Session: '%s'" % (session.name,))
                    subscriptionCount = 0
                    sessionId = session.getObjectId()
                    # Iterate through all subscription objects
                    for subscription in subscriptions:
                        sessionRef = subscription.sessionRef
                        if sessionRef == sessionId:
                            # Only select subscriptions that are associated
                            # with the session under consideration.
                            subscriptionCount = subscriptionCount + 1
                            queueRef = subscription.queueRef
                            if config._logQueues:
                                # Display detailed info about queues and
                                # bindings if needed
                                self.logQueueInformation(queueRef)
                    if subscriptionCount == 0:
                        print(" ** No Subscriptions for this Session - "
                              "probably a producer only Session **")
def main(argv=None):
    """Run connection-logger once and return a process exit status.

    Args:
        argv: argument list handed to OptionsAndArguments(); None lets the
            option parser use the process arguments.

    Returns:
        0 on success, 1 if connecting to or querying the broker raised.
    """
    # Called for its side effect of filling in ``config``; the positional
    # arguments it returns are not used by this tool.
    OptionsAndArguments(argv)
    bm = BrokerManager()
    try:
        bm.setBroker()
        bm.logConnectionInformation()
    except Exception as e:
        print("Failed: %s - %s" % (e.__class__.__name__, e))
        return 1
    finally:
        # Always release the broker connection, including on the failure
        # path; disconnect() does nothing if no broker was added.
        bm.disconnect()
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    sys.exit(main())
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]