Author: tross
Date: Fri Oct 31 11:56:24 2008
New Revision: 709532
URL: http://svn.apache.org/viewvc?rev=709532&view=rev
Log:
Federation enhancements and bug fixes:
qmfconsole.py - minor fixes, make sure object-dereference only queries one broker
Bridge.cpp - Added channel-id to queue name to avoid collisions
qpid-route - Added link-map feature for viewing the entire federated topology
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
incubator/qpid/trunk/qpid/python/commands/qpid-route
incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=709532&r1=709531&r2=709532&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Fri Oct 31 11:56:24 2008
@@ -27,6 +27,7 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/framing/Uuid.h"
#include "qpid/log/Statement.h"
+#include <iostream>
using qpid::framing::FieldTable;
using qpid::framing::Uuid;
@@ -59,7 +60,9 @@
link(_link), id(_id), args(_args), mgmtObject(0),
listener(l), name(Uuid(true).str()), queueName("bridge_queue_"),
persistenceId(0)
{
- queueName += link->getBroker()->getFederationTag();
+ std::stringstream title;
+ title << id << "_" << link->getBroker()->getFederationTag();
+ queueName += title.str();
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0) {
mgmtObject = new _qmf::Bridge
Modified: incubator/qpid/trunk/qpid/python/commands/qpid-route
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-route?rev=709532&r1=709531&r2=709532&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-route (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-route Fri Oct 31 11:56:24 2008
@@ -29,6 +29,7 @@
print "Usage: qpid-route [OPTIONS] link add <dest-broker> <src-broker>"
print " qpid-route [OPTIONS] link del <dest-broker> <src-broker>"
print " qpid-route [OPTIONS] link list [<dest-broker>]"
+ print " qpid-route [OPTIONS] link map [<broker>]"
print
print " qpid-route [OPTIONS] route add <dest-broker> <src-broker>
<exchange> <routing-key> [tag] [exclude-list]"
print " qpid-route [OPTIONS] route del <dest-broker> <src-broker>
<exchange> <routing-key>"
@@ -120,6 +121,80 @@
print "%-16s%-8d %c %-18s%s" % \
(link.host, link.port, YN(link.durable), link.state,
link.lastError)
+ def MapLinks(self):
+ qmf = self.qmf
+ print
+ print "Finding Linked Brokers:"
+
+ brokerList = {}
+ brokerList[self.dest.name()] = self.broker
+ print " %s... Ok" % self.dest
+
+ added = True
+ while added:
+ added = False
+ links = qmf.getObjects(_class="link")
+ for link in links:
+ url = qmfconsole.BrokerURL("%s:%d" % (link.host, link.port))
+ if url.name() not in brokerList:
+ print " %s..." % url.name(),
+ try:
+ b = qmf.addBroker("%s:%d" % (link.host, link.port))
+ brokerList[url.name()] = b
+ added = True
+ print "Ok"
+ except Exception, e:
+ print e
+
+ print
+ print "Dynamic Routes:"
+ bridges = qmf.getObjects(_class="bridge", dynamic=True)
+ fedExchanges = []
+ for bridge in bridges:
+ if bridge.src not in fedExchanges:
+ fedExchanges.append(bridge.src)
+ if len(fedExchanges) == 0:
+ print " none found"
+ else:
+ print
+
+ for ex in fedExchanges:
+ print " Exchange %s:" % ex
+ pairs = []
+ for bridge in bridges:
+ if bridge.src == ex:
+ link = bridge._linkRef_
+ fromUrl = "%s:%s" % (link.host, link.port)
+ toUrl = bridge.getBroker().getUrl()
+ found = False
+ for pair in pairs:
+ if pair.matches(fromUrl, toUrl):
+ found = True
+ if not found:
+ pairs.append(RoutePair(fromUrl, toUrl))
+ for pair in pairs:
+ print " %s" % pair
+ print
+
+ print "Static Routes:"
+ bridges = qmf.getObjects(_class="bridge", dynamic=False)
+ if len(bridges) == 0:
+ print " none found"
+ else:
+ print
+
+ for bridge in bridges:
+ link = bridge._linkRef_
+ fromUrl = "%s:%s" % (link.host, link.port)
+ toUrl = bridge.getBroker().getUrl()
+ print " %s(%s) <= %s(%s) key=%s" % (toUrl, bridge.dest, fromUrl,
bridge.src, bridge.key)
+ print
+
+ for broker in brokerList:
+ if broker != self.dest.name():
+ qmf.delBroker(brokerList[broker])
+
+
def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes,
dynamic=False):
self.src = qmfconsole.BrokerURL(srcBroker)
if self.dest.match(self.src.host, self.src.port):
@@ -240,6 +315,28 @@
elif _verbose:
print "Ok"
+class RoutePair:
+ def __init__(self, fromUrl, toUrl):
+ self.fromUrl = fromUrl
+ self.toUrl = toUrl
+ self.bidir = False
+
+ def __repr__(self):
+ if self.bidir:
+ delimit = "<=>"
+ else:
+ delimit = " =>"
+ return "%s %s %s" % (self.fromUrl, delimit, self.toUrl)
+
+ def matches(self, fromUrl, toUrl):
+ if fromUrl == self.fromUrl and toUrl == self.toUrl:
+ return True
+ if toUrl == self.fromUrl and fromUrl == self.toUrl:
+ self.bidir = True
+ return True
+ return False
+
+
def YN(val):
if val == 1:
return 'Y'
@@ -290,7 +387,9 @@
Usage()
rm.DelLink (cargs[3])
elif cmd == "list":
- rm.ListLinks ()
+ rm.ListLinks()
+ elif cmd == "map":
+ rm.MapLinks()
elif group == "dynamic":
if cmd == "add":
@@ -330,6 +429,7 @@
rm.ClearAllRoutes ()
else:
Usage ()
+
except Exception,e:
print "Failed:", e.args[0]
sys.exit(1)
Modified: incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py?rev=709532&r1=709531&r2=709532&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py Fri Oct 31 11:56:24 2008
@@ -511,6 +511,8 @@
self.packages[pname][(cname, hash)] = _class
finally:
self.cv.release()
+
+ self.seqMgr._release(seq)
broker._decOutstanding()
if self.console != None:
self.console.newClass(kind, classKey)
@@ -759,7 +761,7 @@
self.unit = None
self.min = None
self.max = None
- self.maxlan = None
+ self.maxlen = None
self.desc = None
for key, value in map.items():
@@ -916,6 +918,10 @@
for statistic in schema.getStatistics():
self._statistics.append((statistic, self._session._decodeValue(codec,
statistic.type)))
+ def getBroker(self):
+ """ Return the broker from which this object was sent """
+ return self._broker
+
def getObjectId(self):
""" Return the object identifier for this object """
return self._objectId
@@ -972,7 +978,7 @@
if name == property.name:
return value
if name == "_" + property.name + "_" and property.type == 10: #
Dereference references
- deref = self._session.getObjects(_objectId=value)
+ deref = self._session.getObjects(_objectId=value, _broker=self._broker)
if len(deref) != 1:
return None
else:
@@ -1090,6 +1096,7 @@
self.error = None
self.brokerId = None
self.isConnected = False
+ self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid())
self._tryToConnect()
def isConnected(self):
@@ -1126,7 +1133,6 @@
def _tryToConnect(self):
try:
- self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid())
sock = connect(self.host, self.port)
if self.ssl:
sock = ssl(sock)