Author: tross
Date: Fri Oct 31 11:56:24 2008
New Revision: 709532

URL: http://svn.apache.org/viewvc?rev=709532&view=rev
Log:
Federation enhancements and bug fixes:

  qmfconsole.py - minor fixes, make sure object-dereference only queries one broker
  Bridge.cpp    - Added channel-id to queue name to avoid collisions
  qpid-route    - Added link-map feature for viewing the entire federated topology


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    incubator/qpid/trunk/qpid/python/commands/qpid-route
    incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=709532&r1=709531&r2=709532&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Fri Oct 31 11:56:24 2008
@@ -27,6 +27,7 @@
 #include "qpid/framing/FieldTable.h"
 #include "qpid/framing/Uuid.h"
 #include "qpid/log/Statement.h"
+#include <iostream>
 
 using qpid::framing::FieldTable;
 using qpid::framing::Uuid;
@@ -59,7 +60,9 @@
     link(_link), id(_id), args(_args), mgmtObject(0),
     listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), 
persistenceId(0)
 {
-    queueName += link->getBroker()->getFederationTag();
+    std::stringstream title;
+    title << id << "_" << link->getBroker()->getFederationTag();
+    queueName += title.str();
     ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
     if (agent != 0) {
         mgmtObject = new _qmf::Bridge

Modified: incubator/qpid/trunk/qpid/python/commands/qpid-route
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-route?rev=709532&r1=709531&r2=709532&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-route (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-route Fri Oct 31 11:56:24 2008
@@ -29,6 +29,7 @@
     print "Usage:  qpid-route [OPTIONS] link add  <dest-broker> <src-broker>"
     print "        qpid-route [OPTIONS] link del  <dest-broker> <src-broker>"
     print "        qpid-route [OPTIONS] link list [<dest-broker>]"
+    print "        qpid-route [OPTIONS] link map  [<broker>]"
     print
     print "        qpid-route [OPTIONS] route add   <dest-broker> <src-broker> 
<exchange> <routing-key> [tag] [exclude-list]"
     print "        qpid-route [OPTIONS] route del   <dest-broker> <src-broker> 
<exchange> <routing-key>"
@@ -120,6 +121,80 @@
                 print "%-16s%-8d   %c     %-18s%s" % \
                 (link.host, link.port, YN(link.durable), link.state, 
link.lastError)
 
+    def MapLinks(self):
+        qmf = self.qmf
+        print
+        print "Finding Linked Brokers:"
+
+        brokerList = {}
+        brokerList[self.dest.name()] = self.broker
+        print "    %s... Ok" % self.dest
+
+        added = True
+        while added:
+            added = False
+            links = qmf.getObjects(_class="link")
+            for link in links:
+                url = qmfconsole.BrokerURL("%s:%d" % (link.host, link.port))
+                if url.name() not in brokerList:
+                    print "    %s..." % url.name(),
+                    try:
+                        b = qmf.addBroker("%s:%d" % (link.host, link.port))
+                        brokerList[url.name()] = b
+                        added = True
+                        print "Ok"
+                    except Exception, e:
+                        print e
+
+        print
+        print "Dynamic Routes:"
+        bridges = qmf.getObjects(_class="bridge", dynamic=True)
+        fedExchanges = []
+        for bridge in bridges:
+            if bridge.src not in fedExchanges:
+                fedExchanges.append(bridge.src)
+        if len(fedExchanges) == 0:
+            print "  none found"
+        else:
+            print
+
+        for ex in fedExchanges:
+            print "  Exchange %s:" % ex
+            pairs = []
+            for bridge in bridges:
+                if bridge.src == ex:
+                    link = bridge._linkRef_
+                    fromUrl = "%s:%s" % (link.host, link.port)
+                    toUrl = bridge.getBroker().getUrl()
+                    found = False
+                    for pair in pairs:
+                        if pair.matches(fromUrl, toUrl):
+                            found = True
+                    if not found:
+                        pairs.append(RoutePair(fromUrl, toUrl))
+            for pair in pairs:
+                print "    %s" % pair
+            print
+
+        print "Static Routes:"
+        bridges = qmf.getObjects(_class="bridge", dynamic=False)
+        if len(bridges) == 0:
+            print "  none found"
+        else:
+            print
+
+        for bridge in bridges:
+            link = bridge._linkRef_
+            fromUrl = "%s:%s" % (link.host, link.port)
+            toUrl = bridge.getBroker().getUrl()
+            print "  %s(%s) <= %s(%s) key=%s" % (toUrl, bridge.dest, fromUrl, 
bridge.src, bridge.key)
+        print
+
+        for broker in brokerList:
+            if broker != self.dest.name():
+                qmf.delBroker(brokerList[broker])
+
+
     def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes, 
dynamic=False):
         self.src = qmfconsole.BrokerURL(srcBroker)
         if self.dest.match(self.src.host, self.src.port):
@@ -240,6 +315,28 @@
                 elif _verbose:
                     print "Ok"
 
+class RoutePair:
+    def __init__(self, fromUrl, toUrl):
+        self.fromUrl = fromUrl
+        self.toUrl = toUrl
+        self.bidir = False
+
+    def __repr__(self):
+        if self.bidir:
+            delimit = "<=>"
+        else:
+            delimit = " =>"
+        return "%s %s %s" % (self.fromUrl, delimit, self.toUrl)
+
+    def matches(self, fromUrl, toUrl):
+        if fromUrl == self.fromUrl and toUrl == self.toUrl:
+            return True
+        if toUrl == self.fromUrl and fromUrl == self.toUrl:
+            self.bidir = True
+            return True
+        return False
+        
+
 def YN(val):
     if val == 1:
         return 'Y'
@@ -290,7 +387,9 @@
                 Usage()
             rm.DelLink (cargs[3])
         elif cmd == "list":
-            rm.ListLinks ()
+            rm.ListLinks()
+        elif cmd == "map":
+            rm.MapLinks()
 
     elif group == "dynamic":
         if cmd == "add":
@@ -330,6 +429,7 @@
                 rm.ClearAllRoutes ()
             else:
                 Usage ()
+
 except Exception,e:
     print "Failed:", e.args[0]
     sys.exit(1)

Modified: incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py?rev=709532&r1=709531&r2=709532&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py Fri Oct 31 11:56:24 2008
@@ -511,6 +511,8 @@
       self.packages[pname][(cname, hash)] = _class
     finally:
       self.cv.release()
+      
+    self.seqMgr._release(seq)
     broker._decOutstanding()
     if self.console != None:
       self.console.newClass(kind, classKey)
@@ -759,7 +761,7 @@
     self.unit     = None
     self.min      = None
     self.max      = None
-    self.maxlan   = None
+    self.maxlen   = None
     self.desc     = None
 
     for key, value in map.items():
@@ -916,6 +918,10 @@
       for statistic in schema.getStatistics():
         self._statistics.append((statistic, self._session._decodeValue(codec, 
statistic.type)))
 
+  def getBroker(self):
+    """ Return the broker from which this object was sent """
+    return self._broker
+
   def getObjectId(self):
     """ Return the object identifier for this object """
     return self._objectId
@@ -972,7 +978,7 @@
       if name == property.name:
         return value
       if name == "_" + property.name + "_" and property.type == 10:  # 
Dereference references
-        deref = self._session.getObjects(_objectId=value)
+        deref = self._session.getObjects(_objectId=value, _broker=self._broker)
         if len(deref) != 1:
           return None
         else:
@@ -1090,6 +1096,7 @@
     self.error     = None
     self.brokerId  = None
     self.isConnected = False
+    self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid())
     self._tryToConnect()
 
   def isConnected(self):
@@ -1126,7 +1133,6 @@
 
   def _tryToConnect(self):
     try:
-      self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid())
       sock = connect(self.host, self.port)
       if self.ssl:
         sock = ssl(sock)


Reply via email to