Author: tross
Date: Thu Oct 16 12:45:14 2008
New Revision: 705337

URL: http://svn.apache.org/viewvc?rev=705337&view=rev
Log:
QPID-1366 - implementation of automatic anti-looping for federation

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
    incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
    incubator/qpid/trunk/qpid/specs/management-schema.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Thu Oct 16 
12:45:14 2008
@@ -91,11 +91,21 @@
         string queue = "bridge_queue_";
         queue += Uuid(true).str();
         FieldTable queueSettings;
+
         if (args.i_tag.size()) {
             queueSettings.setString("qpid.trace.id", args.i_tag);
+        } else {
+            const string& localTag = link->getBroker()->getFederationTag();
+            if (localTag.size())
+                queueSettings.setString("qpid.trace.id", localTag);
         }
+
         if (args.i_excludes.size()) {
             queueSettings.setString("qpid.trace.exclude", args.i_excludes);
+        } else {
+            const string& peerTag = c.getFederationPeerTag();
+            if (peerTag.size())
+                queueSettings.setString("qpid.trace.exclude", peerTag);
         }
 
         bool durable = false;//should this be an arg, or would we use 
srcIsQueue for durable queues?

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Oct 16 
12:45:14 2008
@@ -37,6 +37,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/framing/Uuid.h"
 #include "qpid/sys/ProtocolFactory.h"
 #include "qpid/sys/Poller.h"
 #include "qpid/sys/Dispatcher.h"
@@ -136,7 +137,7 @@
     managementAgentSingleton(!config.enableMgmt),
     store(0),
     acl(0),
-    dataDir(conf.noDataDir ? std::string () : conf.dataDir),
+    dataDir(conf.noDataDir ? std::string() : conf.dataDir),
     links(this),
     factory(new ConnectionFactory(*this)),
     dtxManager(timer),
@@ -148,40 +149,43 @@
     queueCleaner(queues, timer),
     getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
 {
-    if(conf.enableMgmt){
+    if (conf.enableMgmt) {
         QPID_LOG(info, "Management enabled");
         managementAgent = managementAgentSingleton.getInstance();
         ((ManagementBroker*) managementAgent)->configure
-            (dataDir.isEnabled () ? dataDir.getPath () : string (),
+            (dataDir.isEnabled() ? dataDir.getPath() : string(),
              conf.mgmtPubInterval, this, conf.workerThreads + 3);
-        _qmf::Package packageInitializer (managementAgent);
+        _qmf::Package packageInitializer(managementAgent);
 
-        System* system = new System (dataDir.isEnabled () ? dataDir.getPath () 
: string ());
-        systemObject = System::shared_ptr (system);
+        System* system = new System (dataDir.isEnabled() ? dataDir.getPath() : 
string());
+        systemObject = System::shared_ptr(system);
 
-        mgmtObject = new _qmf::Broker (managementAgent, this, system, 
conf.port);
-        mgmtObject->set_workerThreads    (conf.workerThreads);
-        mgmtObject->set_maxConns         (conf.maxConnections);
-        mgmtObject->set_connBacklog      (conf.connectionBacklog);
-        mgmtObject->set_stagingThreshold (conf.stagingThreshold);
-        mgmtObject->set_mgmtPubInterval  (conf.mgmtPubInterval);
-        mgmtObject->set_version          (qpid::version);
+        mgmtObject = new _qmf::Broker(managementAgent, this, system, 
conf.port);
+        mgmtObject->set_workerThreads(conf.workerThreads);
+        mgmtObject->set_maxConns(conf.maxConnections);
+        mgmtObject->set_connBacklog(conf.connectionBacklog);
+        mgmtObject->set_stagingThreshold(conf.stagingThreshold);
+        mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval);
+        mgmtObject->set_version(qpid::version);
         if (dataDir.isEnabled())
             mgmtObject->set_dataDir(dataDir.getPath());
         else
             mgmtObject->clr_dataDir();
         
-        managementAgent->addObject (mgmtObject, 0x1000000000000002LL);
+        managementAgent->addObject(mgmtObject, 0x1000000000000002LL);
 
         // Since there is currently no support for virtual hosts, a 
placeholder object
         // representing the implied single virtual host is added here to keep 
the
         // management schema correct.
-        Vhost* vhost = new Vhost (this);
-        vhostObject = Vhost::shared_ptr (vhost);
-
-        queues.setParent    (vhost);
-        exchanges.setParent (vhost);
-        links.setParent     (vhost);
+        Vhost* vhost = new Vhost(this);
+        vhostObject = Vhost::shared_ptr(vhost);
+        framing::Uuid uuid(((ManagementBroker*) managementAgent)->getUuid());
+        federationTag = uuid.str();
+        vhostObject->setFederationTag(federationTag);
+
+        queues.setParent(vhost);
+        exchanges.setParent(vhost);
+        links.setParent(vhost);
     }
 
     QueuePolicy::setDefaultMaxSize(conf.queueLimit);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Thu Oct 16 12:45:14 
2008
@@ -128,6 +128,7 @@
 
     std::vector<Url> knownBrokers;
     std::vector<Url> getKnownBrokersImpl();
+    std::string federationTag;
 
   public:
 
@@ -168,6 +169,7 @@
     Options& getOptions() { return config; }
 
     SessionManager& getSessionManager() { return sessionManager; }
+    const std::string& getFederationTag() const { return federationTag; }
 
     management::ManagementObject*     GetManagementObject (void) const;
     management::Manageable*           GetVhostObject      (void) const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp Thu Oct 
16 12:45:14 2008
@@ -42,6 +42,7 @@
 const std::string PLAIN     = "PLAIN";
 const std::string en_US     = "en_US";
 const std::string QPID_FED_LINK = "qpid.fed_link";
+const std::string QPID_FED_TAG  = "qpid.federation_tag";
 }
 
 void ConnectionHandler::close(ReplyCode code, const string& text, ClassId, 
MethodId)
@@ -83,6 +84,8 @@
         FieldTable properties;
         Array mechanisms(0x95);
 
+        properties.setString(QPID_FED_TAG, 
connection.getBroker().getFederationTag());
+
         authenticator = SaslAuthenticator::createAuthenticator(c);
         authenticator->getMechanisms(mechanisms);
 
@@ -104,12 +107,13 @@
 {
     authenticator->start(mechanism, response);
     connection.setFederationLink(clientProperties.get(QPID_FED_LINK));
-    if (connection.isFederationLink()){
+    
connection.setFederationPeerTag(clientProperties.getAsString(QPID_FED_TAG));
+    if (connection.isFederationLink()) {
        if (acl && 
!acl->authorise(connection.getUserId(),acl::CREATE,acl::LINK,"")){
-             
client.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied 
creating a federation link");
-             return;
-       }
-       QPID_LOG(info, "Connection is a federation link");
+            
client.close(framing::connection::CLOSE_CODE_CONNECTION_FORCED,"ACL denied 
creating a federation link");
+            return;
+        }
+        QPID_LOG(info, "Connection is a federation link");
     }
 }
 
@@ -154,15 +158,18 @@
 } 
 
 
-void ConnectionHandler::Handler::start(const FieldTable& /*serverProperties*/,
+void ConnectionHandler::Handler::start(const FieldTable& serverProperties,
                                        const framing::Array& /*mechanisms*/,
                                        const framing::Array& /*locales*/)
 {
     string mechanism = connection.getAuthMechanism();
     string response  = connection.getAuthCredentials();
 
+    
connection.setFederationPeerTag(serverProperties.getAsString(QPID_FED_TAG));
+
     FieldTable ft;
     ft.setInt(QPID_FED_LINK,1);
+    ft.setString(QPID_FED_TAG, connection.getBroker().getFederationTag());
     server.startOk(ft, mechanism, response, en_US);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h Thu Oct 16 
12:45:14 2008
@@ -68,6 +68,8 @@
 
     void setFederationLink(bool b) {  federationLink = b; }
     bool isFederationLink() const { return federationLink; }
+    void setFederationPeerTag(const string& tag) { federationPeerTag = 
string(tag); }
+    const string& getFederationPeerTag() const { return federationPeerTag; }
 
     Broker& getBroker() { return broker; }
 
@@ -90,6 +92,7 @@
     string userId;
     string url;
     bool federationLink;
+    string federationPeerTag;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.cpp Thu Oct 16 12:45:14 
2008
@@ -38,3 +38,7 @@
     }
 }
 
+void Vhost::setFederationTag(const std::string& tag)
+{
+    mgmtObject->set_federationTag(tag);
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.h?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Vhost.h Thu Oct 16 12:45:14 
2008
@@ -41,6 +41,7 @@
 
     management::ManagementObject* GetManagementObject (void) const
     { return mgmtObject; }
+    void setFederationTag(const std::string& tag);
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/management/ManagementBroker.h Thu 
Oct 16 12:45:14 2008
@@ -67,6 +67,7 @@
     bool dispatchCommand (qpid::broker::Deliverable&       msg,
                           const std::string&         routingKey,
                           const framing::FieldTable* args);
+    const framing::Uuid& getUuid() const { return uuid; }
 
     // Stubs for remote management agent calls
     void init (std::string, uint16_t, uint16_t, bool, std::string) { 
assert(0); }

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/federation.py Thu Oct 16 12:45:14 
2008
@@ -245,6 +245,65 @@
 
         mgmt.shutdown ()
 
+    def test_tracing_automatic(self):
+        remoteUrl = "%s:%d" % (remote_host(), remote_port())
+        self.startQmf()
+        l_broker = self.qmf_broker
+        r_broker = self.qmf.addBroker(remoteUrl)
+
+        l_brokerObj = self.qmf.getObjects(_class="broker", _broker=l_broker)[0]
+        r_brokerObj = self.qmf.getObjects(_class="broker", _broker=r_broker)[0]
+
+        l_res = l_brokerObj.connect(remote_host(), remote_port(),     False, 
"PLAIN", "guest", "guest", "tcp")
+        r_res = r_brokerObj.connect(testrunner.host, testrunner.port, False, 
"PLAIN", "guest", "guest", "tcp")
+
+        self.assertEqual(l_res.status, 0)
+        self.assertEqual(r_res.status, 0)
+
+        l_link = self.qmf.getObjects(_class="link", _broker=l_broker)[0]
+        r_link = self.qmf.getObjects(_class="link", _broker=r_broker)[0]
+
+        l_res = l_link.bridge(False, "amq.direct", "amq.direct", "key", "", 
"", False, False, False)
+        r_res = r_link.bridge(False, "amq.direct", "amq.direct", "key", "", 
"", False, False, False)
+
+        self.assertEqual(l_res.status, 0)
+        self.assertEqual(r_res.status, 0)
+
+        count = 0
+        while l_link.state != "Operational" or r_link.state != "Operational":
+            count += 1
+            if count > 10:
+                self.fail("Fed links didn't become operational after 10 
seconds")
+            sleep(1)
+            l_link = self.qmf.getObjects(_class="link", _broker=l_broker)[0]
+            r_link = self.qmf.getObjects(_class="link", _broker=r_broker)[0]
+        sleep(3)
+
+        #setup queue to receive messages from local broker
+        session = self.session
+        session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="fed1", exchange="amq.direct", 
binding_key="key")
+        self.subscribe(queue="fed1", destination="f1")
+        queue = session.incoming("f1")
+
+        #setup queue on remote broker and add some messages
+        r_conn = self.connect(host=remote_host(), port=remote_port())
+        r_session = r_conn.session("test_trace")
+        for i in range(1, 11):
+            dp = r_session.delivery_properties(routing_key="key")
+            r_session.message_transfer(destination="amq.direct", 
message=Message(dp, "Message %d" % i))
+
+        for i in range(1, 11):
+            try:
+                msg = queue.get(timeout=5)
+                self.assertEqual("Message %d" % i, msg.body)
+            except Empty:
+                self.fail("Failed to find expected message containing 'Message 
%d'" % i)
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message in queue: " + extra.body)
+        except Empty: None
+
     def test_tracing(self):
         session = self.session
         

Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=705337&r1=705336&r2=705337&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Thu Oct 16 12:45:14 
2008
@@ -114,8 +114,9 @@
   ===============================================================
   -->
   <class name="Vhost">
-    <property name="brokerRef" type="objId" references="Broker" access="RC" 
index="y" parentRef="y"/>
-    <property name="name"      type="sstr"  access="RC" index="y"/>
+    <property name="brokerRef"     type="objId" references="Broker" 
access="RC" index="y" parentRef="y"/>
+    <property name="name"          type="sstr"  access="RC" index="y"/>
+    <property name="federationTag" type="sstr"  access="RO"/>
   </class>
 
   <!--


Reply via email to