Author: tross
Date: Thu Oct 30 21:15:52 2008
New Revision: 709342
URL: http://svn.apache.org/viewvc?rev=709342&view=rev
Log:
Federation bug-fixes:
1) Locking was added to protect the exchange's vector of bridges.
2) Bridges are now properly torn down when a link is lost.
3) Auto-tracing was improperly assigning tags to federation queues.
Also, the federation queue name now uses the broker-id for the destination
broker. This makes it easier to determine which queues go to which
brokers.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=709342&r1=709341&r2=709342&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Thu Oct 30 21:15:52 2008
@@ -59,7 +59,7 @@
link(_link), id(_id), args(_args), mgmtObject(0),
listener(l), name(Uuid(true).str()), queueName("bridge_queue_"),
persistenceId(0)
{
- queueName += name;
+ queueName += link->getBroker()->getFederationTag();
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0) {
mgmtObject = new _qmf::Bridge
@@ -109,17 +109,17 @@
if (args.i_tag.size()) {
queueSettings.setString("qpid.trace.id", args.i_tag);
} else {
- const string& localTag = link->getBroker()->getFederationTag();
- if (localTag.size())
- queueSettings.setString("qpid.trace.id", localTag);
+ const string& peerTag = connState->getFederationPeerTag();
+ if (peerTag.size())
+ queueSettings.setString("qpid.trace.id", peerTag);
}
if (args.i_excludes.size()) {
queueSettings.setString("qpid.trace.exclude", args.i_excludes);
} else {
- const string& peerTag = connState->getFederationPeerTag();
- if (peerTag.size())
- queueSettings.setString("qpid.trace.exclude", peerTag);
+ const string& localTag = link->getBroker()->getFederationTag();
+ if (localTag.size())
+ queueSettings.setString("qpid.trace.exclude", localTag);
}
bool durable = false;//should this be an arg, or would be use srcIsQueue for durable queues?
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=709342&r1=709341&r2=709342&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Thu Oct 30 21:15:52 2008
@@ -30,6 +30,7 @@
using namespace qpid::framing;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
+using qpid::sys::Mutex;
using qpid::management::ManagementAgent;
using qpid::management::ManagementObject;
using qpid::management::Manageable;
@@ -180,11 +181,15 @@
if (!supportsDynamicBinding())
throw Exception("Exchange type does not support dynamic binding");
- for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
- iter != bridgeVector.end(); iter++)
- (*iter)->sendReorigin();
+ {
+ Mutex::ScopedLock l(bridgeLock);
+ for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
+ iter != bridgeVector.end(); iter++)
+ (*iter)->sendReorigin();
+
+ bridgeVector.push_back(db);
+ }
- bridgeVector.push_back(db);
FieldTable args;
args.setString(qpidFedOp, fedOpReorigin);
bind(Queue::shared_ptr(), string(), &args);
@@ -192,6 +197,7 @@
void Exchange::removeDynamicBridge(DynamicBridge* db)
{
+ Mutex::ScopedLock l(bridgeLock);
for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
iter != bridgeVector.end(); iter++)
if (*iter == db) {
@@ -206,6 +212,7 @@
void Exchange::propagateFedOp(const string& routingKey, const string& tags,
const string& op, const string& origin)
{
+ Mutex::ScopedLock l(bridgeLock);
string myOp(op.empty() ? fedOpBind : op);
for (std::vector<DynamicBridge*>::iterator iter = bridgeVector.begin();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=709342&r1=709341&r2=709342&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Thu Oct 30 21:15:52 2008
@@ -165,6 +165,7 @@
virtual bool supportsDynamicBinding() { return false; }
protected:
+ qpid::sys::Mutex bridgeLock;
std::vector<DynamicBridge*> bridgeVector;
virtual void handleHelloRequest();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=709342&r1=709341&r2=709342&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Thu Oct 30 21:15:52 2008
@@ -139,8 +139,10 @@
if (state == STATE_OPERATIONAL)
QPID_LOG (warning, "Inter-broker link disconnected from " << host <<
":" << port);
- for (Bridges::iterator i = active.begin(); i != active.end(); i++)
+ for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
+ (*i)->cancel();
created.push_back(*i);
+ }
active.clear();
if (state != STATE_FAILED)