Author: tross
Date: Thu Oct 16 12:45:14 2008
New Revision: 705337
URL: http://svn.apache.org/viewvc?rev=705337&view=rev
Log:
QPID-1366 - implementation of automatic anti-looping for federation
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.h
incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
incubator/qpid/trunk/qpid/specs/management-schema.xml
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Thu Oct 16
12:45:14 2008
@@ -91,11 +91,21 @@
string queue = "bridge_queue_";
queue += Uuid(true).str();
FieldTable queueSettings;
+
if (args.i_tag.size()) {
queueSettings.setString("qpid.trace.id", args.i_tag);
+ } else {
+ const string& localTag = link->getBroker()->getFederationTag();
+ if (localTag.size())
+ queueSettings.setString("qpid.trace.id", localTag);
}
+
if (args.i_excludes.size()) {
queueSettings.setString("qpid.trace.exclude", args.i_excludes);
+ } else {
+ const string& peerTag = c.getFederationPeerTag();
+ if (peerTag.size())
+ queueSettings.setString("qpid.trace.exclude", peerTag);
}
bool durable = false;//should this be an arg, or would we use
srcIsQueue for durable queues?
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Oct 16
12:45:14 2008
@@ -37,6 +37,7 @@
#include "qpid/log/Statement.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/framing/Uuid.h"
#include "qpid/sys/ProtocolFactory.h"
#include "qpid/sys/Poller.h"
#include "qpid/sys/Dispatcher.h"
@@ -136,7 +137,7 @@
managementAgentSingleton(!config.enableMgmt),
store(0),
acl(0),
- dataDir(conf.noDataDir ? std::string () : conf.dataDir),
+ dataDir(conf.noDataDir ? std::string() : conf.dataDir),
links(this),
factory(new ConnectionFactory(*this)),
dtxManager(timer),
@@ -148,40 +149,43 @@
queueCleaner(queues, timer),
getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
{
- if(conf.enableMgmt){
+ if (conf.enableMgmt) {
QPID_LOG(info, "Management enabled");
managementAgent = managementAgentSingleton.getInstance();
((ManagementBroker*) managementAgent)->configure
- (dataDir.isEnabled () ? dataDir.getPath () : string (),
+ (dataDir.isEnabled() ? dataDir.getPath() : string(),
conf.mgmtPubInterval, this, conf.workerThreads + 3);
- _qmf::Package packageInitializer (managementAgent);
+ _qmf::Package packageInitializer(managementAgent);
- System* system = new System (dataDir.isEnabled () ? dataDir.getPath ()
: string ());
- systemObject = System::shared_ptr (system);
+ System* system = new System (dataDir.isEnabled() ? dataDir.getPath() :
string());
+ systemObject = System::shared_ptr(system);
- mgmtObject = new _qmf::Broker (managementAgent, this, system,
conf.port);
- mgmtObject->set_workerThreads (conf.workerThreads);
- mgmtObject->set_maxConns (conf.maxConnections);
- mgmtObject->set_connBacklog (conf.connectionBacklog);
- mgmtObject->set_stagingThreshold (conf.stagingThreshold);
- mgmtObject->set_mgmtPubInterval (conf.mgmtPubInterval);
- mgmtObject->set_version (qpid::version);
+ mgmtObject = new _qmf::Broker(managementAgent, this, system,
conf.port);
+ mgmtObject->set_workerThreads(conf.workerThreads);
+ mgmtObject->set_maxConns(conf.maxConnections);
+ mgmtObject->set_connBacklog(conf.connectionBacklog);
+ mgmtObject->set_stagingThreshold(conf.stagingThreshold);
+ mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval);
+ mgmtObject->set_version(qpid::version);
if (dataDir.isEnabled())
mgmtObject->set_dataDir(dataDir.getPath());
else
mgmtObject->clr_dataDir();
- managementAgent->addObject (mgmtObject, 0x1000000000000002LL);
+ managementAgent->addObject(mgmtObject, 0x1000000000000002LL);
// Since there is currently no support for virtual hosts, a
placeholder object
// representing the implied single virtual host is added here to keep
the
// management schema correct.
- Vhost* vhost = new Vhost (this);
- vhostObject = Vhost::shared_ptr (vhost);
-
- queues.setParent (vhost);
- exchanges.setParent (vhost);
- links.setParent (vhost);
+ Vhost* vhost = new Vhost(this);
+ vhostObject = Vhost::shared_ptr(vhost);
+ framing::Uuid uuid(((ManagementBroker*) managementAgent)->getUuid());
+ federationTag = uuid.str();
+ vhostObject->setFederationTag(federationTag);
+
+ queues.setParent(vhost);
+ exchanges.setParent(vhost);
+ links.setParent(vhost);
}
QueuePolicy::setDefaultMaxSize(conf.queueLimit);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Thu Oct 16 12:45:14
2008
@@ -128,6 +128,7 @@
std::vector<Url> knownBrokers;
std::vector<Url> getKnownBrokersImpl();
+ std::string federationTag;
public:
@@ -168,6 +169,7 @@
Options& getOptions() { return config; }
SessionManager& getSessionManager() { return sessionManager; }
+ const std::string& getFederationTag() const { return federationTag; }
management::ManagementObject* GetManagementObject (void) const;
management::Manageable* GetVhostObject (void) const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Thu Oct
16 12:45:14 2008
@@ -42,6 +42,7 @@
const std::string PLAIN = "PLAIN";
const std::string en_US = "en_US";
const std::string QPID_FED_LINK = "qpid.fed_link";
+const std::string QPID_FED_TAG = "qpid.federation_tag";
}
void ConnectionHandler::close(ReplyCode code, const string& text, ClassId,
MethodId)
@@ -83,6 +84,8 @@
FieldTable properties;
Array mechanisms(0x95);
+ properties.setString(QPID_FED_TAG,
connection.getBroker().getFederationTag());
+
authenticator = SaslAuthenticator::createAuthenticator(c);
authenticator->getMechanisms(mechanisms);
@@ -104,12 +107,13 @@
{
authenticator->start(mechanism, response);
connection.setFederationLink(clientProperties.get(QPID_FED_LINK));
- if (connection.isFederationLink()){
+
connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG));
+ if (connection.isFederationLink()) {
if (acl &&
!acl->authorise(connection.getUserId(),acl::CREATE,acl::LINK,"")){
-
client.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied
creating a federation link");
- return;
- }
- QPID_LOG(info, "Connection is a federation link");
+
client.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied
creating a federation link");
+ return;
+ }
+ QPID_LOG(info, "Connection is a federation link");
}
}
@@ -154,15 +158,18 @@
}
-void ConnectionHandler::Handler::start(const FieldTable& /*serverProperties*/,
+void ConnectionHandler::Handler::start(const FieldTable& serverProperties,
const framing::Array& /*mechanisms*/,
const framing::Array& /*locales*/)
{
string mechanism = connection.getAuthMechanism();
string response = connection.getAuthCredentials();
+
connection.setFederationPeerTag(serverProperties.getAsString(QPID_FED_TAG));
+
FieldTable ft;
ft.setInt(QPID_FED_LINK,1);
+ ft.setString(QPID_FED_TAG, connection.getBroker().getFederationTag());
server.startOk(ft, mechanism, response, en_US);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h Thu Oct 16
12:45:14 2008
@@ -68,6 +68,8 @@
void setFederationLink(bool b) { federationLink = b; }
bool isFederationLink() const { return federationLink; }
+ void setFederationPeerTag(const string& tag) { federationPeerTag =
string(tag); }
+ const string& getFederationPeerTag() const { return federationPeerTag; }
Broker& getBroker() { return broker; }
@@ -90,6 +92,7 @@
string userId;
string url;
bool federationLink;
+ string federationPeerTag;
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp Thu Oct 16 12:45:14
2008
@@ -38,3 +38,7 @@
}
}
+void Vhost::setFederationTag(const std::string& tag)
+{
+ mgmtObject->set_federationTag(tag);
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.h?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.h Thu Oct 16 12:45:14
2008
@@ -41,6 +41,7 @@
management::ManagementObject* GetManagementObject (void) const
{ return mgmtObject; }
+ void setFederationTag(const std::string& tag);
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h Thu
Oct 16 12:45:14 2008
@@ -67,6 +67,7 @@
bool dispatchCommand (qpid::broker::Deliverable& msg,
const std::string& routingKey,
const framing::FieldTable* args);
+ const framing::Uuid& getUuid() const { return uuid; }
// Stubs for remote management agent calls
void init (std::string, uint16_t, uint16_t, bool, std::string) {
assert(0); }
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/federation.py Thu Oct 16 12:45:14
2008
@@ -245,6 +245,65 @@
mgmt.shutdown ()
+ def test_tracing_automatic(self):
+ remoteUrl = "%s:%d" % (remote_host(), remote_port())
+ self.startQmf()
+ l_broker = self.qmf_broker
+ r_broker = self.qmf.addBroker(remoteUrl)
+
+ l_brokerObj = self.qmf.getObjects(_class="broker", _broker=l_broker)[0]
+ r_brokerObj = self.qmf.getObjects(_class="broker", _broker=r_broker)[0]
+
+ l_res = l_brokerObj.connect(remote_host(), remote_port(), False,
"PLAIN", "guest", "guest", "tcp")
+ r_res = r_brokerObj.connect(testrunner.host, testrunner.port, False,
"PLAIN", "guest", "guest", "tcp")
+
+ self.assertEqual(l_res.status, 0)
+ self.assertEqual(r_res.status, 0)
+
+ l_link = self.qmf.getObjects(_class="link", _broker=l_broker)[0]
+ r_link = self.qmf.getObjects(_class="link", _broker=r_broker)[0]
+
+ l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "",
"", False, False, False)
+ r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "",
"", False, False, False)
+
+ self.assertEqual(l_res.status, 0)
+ self.assertEqual(r_res.status, 0)
+
+ count = 0
+ while l_link.state != "Operational" or r_link.state != "Operational":
+ count += 1
+ if count > 10:
+ self.fail("Fed links didn't become operational after 10
seconds")
+ sleep(1)
+ l_link = self.qmf.getObjects(_class="link", _broker=l_broker)[0]
+ r_link = self.qmf.getObjects(_class="link", _broker=r_broker)[0]
+ sleep(3)
+
+ #setup queue to receive messages from local broker
+ session = self.session
+ session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="fed1", exchange="amq.direct",
binding_key="key")
+ self.subscribe(queue="fed1", destination="f1")
+ queue = session.incoming("f1")
+
+ #setup queue on remote broker and add some messages
+ r_conn = self.connect(host=remote_host(), port=remote_port())
+ r_session = r_conn.session("test_trace")
+ for i in range(1, 11):
+ dp = r_session.delivery_properties(routing_key="key")
+ r_session.message_transfer(destination="amq.direct",
message=Message(dp, "Message %d" % i))
+
+ for i in range(1, 11):
+ try:
+ msg = queue.get(timeout=5)
+ self.assertEqual("Message %d" % i, msg.body)
+ except Empty:
+ self.fail("Failed to find expected message containing 'Message
%d'" % i)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in queue: " + extra.body)
+ except Empty: None
+
def test_tracing(self):
session = self.session
Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Thu Oct 16 12:45:14
2008
@@ -114,8 +114,9 @@
===============================================================
-->
<class name="Vhost">
- <property name="brokerRef" type="objId" references="Broker" access="RC"
index="y" parentRef="y"/>
- <property name="name" type="sstr" access="RC" index="y"/>
+ <property name="brokerRef" type="objId" references="Broker"
access="RC" index="y" parentRef="y"/>
+ <property name="name" type="sstr" access="RC" index="y"/>
+ <property name="federationTag" type="sstr" access="RO"/>
</class>
<!--