Author: tross
Date: Thu Oct 30 21:15:52 2008
New Revision: 709342

URL: http://svn.apache.org/viewvc?rev=709342&view=rev
Log:
Federation bug-fixes:

  1) Locking was added to protect the exchange's vector of bridges.
  2) Bridges are now properly torn down when a link is lost.
  3) Auto-tracing was improperly assigning tags to federation queues.

Also, the federation queue name now uses the broker-id for the destination
broker.  This makes it easier to determine which queues go to which
brokers.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=709342&r1=709341&r2=709342&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Thu Oct 30 21:15:52 2008
@@ -59,7 +59,7 @@
     link(_link), id(_id), args(_args), mgmtObject(0),
     listener(l), name(Uuid(true).str()), queueName("bridge_queue_"), persistenceId(0)
 {
-    queueName += name;
+    queueName += link->getBroker()->getFederationTag();
     ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
     if (agent != 0) {
         mgmtObject = new _qmf::Bridge
@@ -109,17 +109,17 @@
         if (args.i_tag.size()) {
             queueSettings.setString("qpid.trace.id", args.i_tag);
         } else {
-            const string& localTag = link->getBroker()->getFederationTag();
-            if (localTag.size())
-                queueSettings.setString("qpid.trace.id", localTag);
+            const string& peerTag = connState->getFederationPeerTag();
+            if (peerTag.size())
+                queueSettings.setString("qpid.trace.id", peerTag);
         }
 
         if (args.i_excludes.size()) {
             queueSettings.setString("qpid.trace.exclude", args.i_excludes);
         } else {
-            const string& peerTag = connState->getFederationPeerTag();
-            if (peerTag.size())
-                queueSettings.setString("qpid.trace.exclude", peerTag);
+            const string& localTag = link->getBroker()->getFederationTag();
+            if (localTag.size())
+                queueSettings.setString("qpid.trace.exclude", localTag);
         }
 
         bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues?

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=709342&r1=709341&r2=709342&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Thu Oct 30 21:15:52 2008
@@ -30,6 +30,7 @@
 using namespace qpid::framing;
 using qpid::framing::Buffer;
 using qpid::framing::FieldTable;
+using qpid::sys::Mutex;
 using qpid::management::ManagementAgent;
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
@@ -180,11 +181,15 @@
     if (!supportsDynamicBinding())
         throw Exception("Exchange type does not support dynamic binding");
 
-    for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
-         iter != bridgeVector.end(); iter++)
-        (*iter)->sendReorigin();
+    {
+        Mutex::ScopedLock l(bridgeLock);
+        for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
+             iter != bridgeVector.end(); iter++)
+            (*iter)->sendReorigin();
+
+        bridgeVector.push_back(db);
+    }
 
-    bridgeVector.push_back(db);
     FieldTable args;
     args.setString(qpidFedOp, fedOpReorigin);
     bind(Queue::shared_ptr(), string(), &args);
@@ -192,6 +197,7 @@
 
 void Exchange::removeDynamicBridge(DynamicBridge* db)
 {
+    Mutex::ScopedLock l(bridgeLock);
     for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
          iter != bridgeVector.end(); iter++)
         if (*iter == db) {
@@ -206,6 +212,7 @@
 
 void Exchange::propagateFedOp(const string& routingKey, const string& tags, const string& op, const string& origin)
 {
+    Mutex::ScopedLock l(bridgeLock);
     string myOp(op.empty() ? fedOpBind : op);
 
     for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=709342&r1=709341&r2=709342&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Thu Oct 30 21:15:52 2008
@@ -165,6 +165,7 @@
     virtual bool supportsDynamicBinding() { return false; }
 
 protected:
+    qpid::sys::Mutex bridgeLock;
     std::vector<DynamicBridge*> bridgeVector;
 
     virtual void handleHelloRequest();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=709342&r1=709341&r2=709342&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Thu Oct 30 21:15:52 2008
@@ -139,8 +139,10 @@
     if (state == STATE_OPERATIONAL)
         QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port);
 
-    for (Bridges::iterator i = active.begin(); i != active.end(); i++)
+    for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+        (*i)->cancel();
         created.push_back(*i);
+    }
     active.clear();
 
     if (state != STATE_FAILED)


Reply via email to