Author: gsim
Date: Tue May 13 14:38:28 2008
New Revision: 656023

URL: http://svn.apache.org/viewvc?rev=656023&view=rev
Log:
QPID-990: Patch from Ted Ross to enable persisting of inter-broker routing 
entities


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
    incubator/qpid/trunk/qpid/python/commands/qpid-route
    incubator/qpid/trunk/qpid/specs/management-schema.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp?rev=656023&r1=656022&r2=656023&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.cpp Tue May 13 14:38:28 2008
@@ -20,29 +20,35 @@
  */
 #include "Bridge.h"
 #include "ConnectionState.h"
+#include "LinkRegistry.h"
 
 #include "qpid/management/ManagementAgent.h"
 #include "qpid/framing/FieldTable.h"
 #include "qpid/framing/Uuid.h"
+#include "qpid/log/Statement.h"
 
 using qpid::framing::FieldTable;
 using qpid::framing::Uuid;
+using qpid::framing::Buffer;
+using qpid::management::ManagementAgent;
 
 namespace qpid {
 namespace broker {
 
-Bridge::Bridge(Link* link, framing::ChannelId _id, CancellationListener l,
+Bridge::Bridge(Link* _link, framing::ChannelId _id, CancellationListener l,
                const management::ArgsLinkBridge& _args) : 
-    id(_id), args(_args),
-    mgmtObject(new management::Bridge(this, link, id, args.i_src, args.i_dest,
-                                      args.i_key, args.i_src_is_queue, args.i_src_is_local)),
-    listener(l), name(Uuid(true).str())
+    link(_link), id(_id), args(_args),
+    mgmtObject(new management::Bridge(this, link, id, args.i_durable, args.i_src, args.i_dest,
+                                      args.i_key, args.i_src_is_queue, args.i_src_is_local,
+                                      args.i_tag, args.i_excludes)),
+    listener(l), name(Uuid(true).str()), persistenceId(0)
 {
-    management::ManagementAgent::getAgent()->addObject(mgmtObject);
+    if (!args.i_durable)
+        management::ManagementAgent::getAgent()->addObject(mgmtObject);
 }
 
 Bridge::~Bridge() 
-{ 
+{
     mgmtObject->resourceDestroy(); 
 }
 
@@ -65,8 +71,8 @@
             string queue = "bridge_queue_";
             queue += Uuid(true).str();
             FieldTable queueSettings;
-            if (args.i_id.size()) {
-                queueSettings.setString("qpid.trace.id", args.i_id);
+            if (args.i_tag.size()) {
+                queueSettings.setString("qpid.trace.id", args.i_tag);
             }
             if (args.i_excludes.size()) {
                 queueSettings.setString("qpid.trace.exclude", args.i_excludes);
@@ -89,6 +95,81 @@
     peer->getSession().detach(name);
 }
 
+void Bridge::destroy()
+{
+    listener(this);
+}
+
+void Bridge::setPersistenceId(uint64_t id) const
+{
+    if (mgmtObject != 0 && persistenceId == 0)
+    {
+        ManagementAgent::shared_ptr agent = ManagementAgent::getAgent ();
+        agent->addObject (mgmtObject, id);
+    }
+    persistenceId = id;
+}
+
+const string& Bridge::getName() const
+{
+    return name;
+}
+
+Bridge::shared_ptr Bridge::decode(LinkRegistry& links, Buffer& buffer)
+{
+    string   host;
+    uint16_t port;
+    string   src;
+    string   dest;
+    string   key;
+    string   id;
+    string   excludes;
+
+    buffer.getShortString(host);
+    port = buffer.getShort();
+    bool durable(buffer.getOctet());
+    buffer.getShortString(src);
+    buffer.getShortString(dest);
+    buffer.getShortString(key);
+    bool is_queue(buffer.getOctet());
+    bool is_local(buffer.getOctet());
+    buffer.getShortString(id);
+    buffer.getShortString(excludes);
+
+    return links.declare(host, port, durable, src, dest, key,
+                         is_queue, is_local, id, excludes).first;
+}
+
+void Bridge::encode(Buffer& buffer) const 
+{
+    buffer.putShortString(string("bridge"));
+    buffer.putShortString(link->getHost());
+    buffer.putShort(link->getPort());
+    buffer.putOctet(args.i_durable ? 1 : 0);
+    buffer.putShortString(args.i_src);
+    buffer.putShortString(args.i_dest);
+    buffer.putShortString(args.i_key);
+    buffer.putOctet(args.i_src_is_queue ? 1 : 0);
+    buffer.putOctet(args.i_src_is_local ? 1 : 0);
+    buffer.putShortString(args.i_tag);
+    buffer.putShortString(args.i_excludes);
+}
+
+uint32_t Bridge::encodedSize() const 
+{ 
+    return link->getHost().size() + 1 // short-string (host)
+        + 7                // short-string ("bridge")
+        + 2                // port
+        + 1                // durable
+        + args.i_src.size()  + 1
+        + args.i_dest.size() + 1
+        + args.i_key.size()  + 1
+        + 1                // src_is_queue
+        + 1                // src_is_local
+        + args.i_tag.size() + 1
+        + args.i_excludes.size() + 1;
+}
+
 management::ManagementObject::shared_ptr Bridge::GetManagementObject (void) const
 {
     return dynamic_pointer_cast<management::ManagementObject>(mgmtObject);
@@ -98,7 +179,7 @@
 {
     if (methodId == management::Bridge::METHOD_CLOSE) {  
         //notify that we are closed
-        listener(this);
+        destroy();
         return management::Manageable::STATUS_OK;
     } else {
         return management::Manageable::STATUS_UNKNOWN_METHOD;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h?rev=656023&r1=656022&r2=656023&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Bridge.h Tue May 13 14:38:28 2008
@@ -21,8 +21,10 @@
 #ifndef _Bridge_
 #define _Bridge_
 
+#include "PersistableConfig.h"
 #include "qpid/framing/AMQP_ServerProxy.h"
 #include "qpid/framing/ChannelHandler.h"
+#include "qpid/framing/Buffer.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/management/ArgsLinkBridge.h"
 #include "qpid/management/Bridge.h"
@@ -35,10 +37,12 @@
 
 class ConnectionState;
 class Link;
+class LinkRegistry;
 
-class Bridge : public management::Manageable
+class Bridge : public PersistableConfig, public management::Manageable
 {
 public:
+    typedef boost::shared_ptr<Bridge> shared_ptr;
     typedef boost::function<void(Bridge*)> CancellationListener;
 
     Bridge(Link* link, framing::ChannelId id, CancellationListener l, const management::ArgsLinkBridge& args);
@@ -46,20 +50,32 @@
 
     void create(ConnectionState& c);
     void cancel();
+    void destroy();
+    bool isDurable() { return args.i_durable; }
 
     management::ManagementObject::shared_ptr GetManagementObject() const;
     management::Manageable::status_t ManagementMethod(uint32_t methodId, management::Args& args);
 
+    // PersistableConfig:
+    void     setPersistenceId(uint64_t id) const;
+    uint64_t getPersistenceId() const { return persistenceId; }
+    uint32_t encodedSize() const;
+    void     encode(framing::Buffer& buffer) const; 
+    const std::string& getName() const;
+    static Bridge::shared_ptr decode(LinkRegistry& links, framing::Buffer& buffer);
+
 private:
     std::auto_ptr<framing::ChannelHandler>            channelHandler;
     std::auto_ptr<framing::AMQP_ServerProxy::Session> session;
     std::auto_ptr<framing::AMQP_ServerProxy>          peer;
 
+    Link* link;
     framing::ChannelId                  id;
     management::ArgsLinkBridge          args;
     management::Bridge::shared_ptr      mgmtObject;
     CancellationListener listener;
     std::string name;
+    mutable uint64_t  persistenceId;
 };
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=656023&r1=656022&r2=656023&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Tue May 13 14:38:28 2008
@@ -329,7 +329,8 @@
             return Manageable::STATUS_FEATURE_NOT_IMPLEMENTED;
 
         std::pair<Link::shared_ptr, bool> response =
-            links.declare (hp.i_host, hp.i_port, hp.i_useSsl, hp.i_durable);
+            links.declare (hp.i_host, hp.i_port, hp.i_useSsl, hp.i_durable,
+                           hp.i_authMechanism, hp.i_username, hp.i_password);
         if (hp.i_durable && response.second)
             store->create(*response.first);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=656023&r1=656022&r2=656023&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Tue May 13 14:38:28 2008
@@ -37,14 +37,19 @@
 using qpid::management::Args;
 using qpid::sys::Mutex;
 
-Link::Link(LinkRegistry*           _links,
-           string&                 _host,
-           uint16_t                _port,
-           bool                    _useSsl,
-           bool                    _durable,
-           Broker*                 _broker,
+Link::Link(LinkRegistry*  _links,
+           MessageStore*  _store,
+           string&        _host,
+           uint16_t       _port,
+           bool           _useSsl,
+           bool           _durable,
+           string&        _authMechanism,
+           string&        _username,
+           string&        _password,
+           Broker*        _broker,
            management::Manageable* parent)
-    : links(_links), host(_host), port(_port), useSsl(_useSsl), durable(_durable),
+    : links(_links), store(_store), host(_host), port(_port), useSsl(_useSsl), durable(_durable),
+      authMechanism(_authMechanism), username(_username), password(_password),
       persistenceId(0), broker(_broker), state(0),
       access(boost::bind(&Link::established, this),
              boost::bind(&Link::closed, this, _1, _2),
@@ -65,7 +70,7 @@
                 agent->addObject(mgmtObject);
         }
     }
-    setState(STATE_WAITING);
+    setStateLH(STATE_WAITING);
 }
 
 Link::~Link ()
@@ -76,7 +81,7 @@
         mgmtObject->resourceDestroy ();
 }
 
-void Link::setState (int newState)
+void Link::setStateLH (int newState)
 {
     if (newState == state)
         return;
@@ -93,13 +98,13 @@
     }
 }
 
-void Link::startConnection ()
+void Link::startConnectionLH ()
 {
     try {
         broker->connect (host, port, useSsl, 0, &access);
-        setState(STATE_CONNECTING);
+        setStateLH(STATE_CONNECTING);
     } catch(std::exception& e) {
-        setState(STATE_WAITING);
+        setStateLH(STATE_WAITING);
         mgmtObject->set_lastError (e.what());
     }
 }
@@ -109,7 +114,7 @@
     Mutex::ScopedLock mutex(lock);
 
     QPID_LOG (info, "Inter-broker link established to " << host << ":" << port);
-    setState(STATE_OPERATIONAL);
+    setStateLH(STATE_OPERATIONAL);
     currentInterval = 1;
     visitCount      = 0;
     if (closing)
@@ -124,8 +129,11 @@
         QPID_LOG (warning, "Inter-broker link disconnected from " << host << ":" << port);
 
     connection.reset();
-    created.transfer(created.end(), active.begin(), active.end(), active);
-    setState(STATE_WAITING);
+    for (Bridges::iterator i = active.begin(); i != active.end(); i++)
+        created.push_back(*i);
+    active.clear();
+
+    setStateLH(STATE_WAITING);
     mgmtObject->set_lastError (text);
     if (closing)
         destroy();
@@ -133,25 +141,56 @@
 
 void Link::destroy ()
 {
+    Mutex::ScopedLock mutex(lock);
+    Bridges toDelete;
+
     QPID_LOG (info, "Inter-broker link to " << host << ":" << port << " removed by management");
     connection.reset();
+
+    // Move the bridges to be deleted into a local vector so there is no
+    // corruption of the iterator caused by bridge deletion.
+    for (Bridges::iterator i = active.begin(); i != active.end(); i++)
+        toDelete.push_back(*i);
+    active.clear();
+
+    for (Bridges::iterator i = created.begin(); i != created.end(); i++)
+        toDelete.push_back(*i);
+    created.clear();
+
+    // Now delete all bridges on this link.
+    for (Bridges::iterator i = toDelete.begin(); i != toDelete.end(); i++)
+        (*i)->destroy();
+    toDelete.clear();
+
     links->destroy (host, port);
 }
 
-void Link::cancel(Bridge* bridge)
+void Link::add(Bridge::shared_ptr bridge)
+{
+    Mutex::ScopedLock mutex(lock);
+
+    created.push_back (bridge);
+    if (state == STATE_OPERATIONAL && connection.get() != 0)
+        connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
+}
+
+void Link::cancel(Bridge::shared_ptr bridge)
 {
     Mutex::ScopedLock mutex(lock);
 
-    //need to take this out of the active map and add it to the cancelled map
+    for (Bridges::iterator i = created.begin(); i != created.end(); i++) {
+        if ((*i).get() == bridge.get()) {
+            created.erase(i);
+            break;
+        }
+    }
     for (Bridges::iterator i = active.begin(); i != active.end(); i++) {
-        if (&(*i) == bridge) {
-            cancelled.transfer(cancelled.end(), i, active);
+        if ((*i).get() == bridge.get()) {
+            bridge->cancel();
+            active.erase(i);
             break;
         }
     }
-
-    if (connection.get() != 0)
-        connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
 }
 
 void Link::ioThreadProcessing()
@@ -161,21 +200,17 @@
     //process any pending creates
     if (!created.empty()) {
         for (Bridges::iterator i = created.begin(); i != created.end(); ++i) {
-            i->create(*connection);
-        }
-        active.transfer(active.end(), created.begin(), created.end(), created);
-    }
-    if (!cancelled.empty()) {
-        //process any pending cancellations
-        for (Bridges::iterator i = cancelled.begin(); i != cancelled.end(); ++i) {
-            i->cancel();
+            active.push_back(*i);
+            (*i)->create(*connection);
         }
-        cancelled.clear();
+        created.clear();
     }
 }
 
 void Link::setConnection(Connection::shared_ptr c)
 {
+    Mutex::ScopedLock mutex(lock);
+
     connection = c;
     connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
 }
@@ -193,11 +228,18 @@
             currentInterval *= 2;
             if (currentInterval > MAX_INTERVAL)
                 currentInterval = MAX_INTERVAL;
-            startConnection();
+            startConnectionLH();
         }
     }
 }
 
+uint Link::nextChannel()
+{
+    Mutex::ScopedLock mutex(lock);
+
+    return channelCounter++;
+}
+
 void Link::setPersistenceId(uint64_t id) const
 {
     if (mgmtObject != 0 && persistenceId == 0)
@@ -217,13 +259,19 @@
 {
     string   host;
     uint16_t port;
+    string   authMechanism;
+    string   username;
+    string   password;
     
     buffer.getShortString(host);
     port = buffer.getShort();
     bool useSsl(buffer.getOctet());
     bool durable(buffer.getOctet());
+    buffer.getShortString(authMechanism);
+    buffer.getShortString(username);
+    buffer.getShortString(password);
 
-    return links.declare(host, port, useSsl, durable).first;
+    return links.declare(host, port, useSsl, durable, authMechanism, username, password).first;
 }
 
 void Link::encode(Buffer& buffer) const 
@@ -233,6 +281,9 @@
     buffer.putShort(port);
     buffer.putOctet(useSsl  ? 1 : 0);
     buffer.putOctet(durable ? 1 : 0);
+    buffer.putShortString(authMechanism);
+    buffer.putShortString(username);
+    buffer.putShortString(password);
 }
 
 uint32_t Link::encodedSize() const 
@@ -241,7 +292,10 @@
         + 5                // short-string ("link")
         + 2                // port
         + 1                // useSsl
-        + 1;               // durable
+        + 1                // durable
+        + authMechanism.size() + 1
+        + username.size() + 1
+        + password.size() + 1;
 }
 
 ManagementObject::shared_ptr Link::GetManagementObject (void) const
@@ -251,8 +305,6 @@
 
 Manageable::status_t Link::ManagementMethod (uint32_t op, management::Args& args)
 {
-    Mutex::ScopedLock mutex(lock);
-
     switch (op)
     {
     case management::Link::METHOD_CLOSE :
@@ -269,11 +321,14 @@
         if (iargs.i_durable && !durable)
             return Manageable::STATUS_INVALID_PARAMETER;
 
-        created.push_back(new Bridge(this, channelCounter++,
-                                     boost::bind(&Link::cancel, this, _1), iargs));
+        std::pair<Bridge::shared_ptr, bool> result =
+            links->declare (host, port, iargs.i_durable, iargs.i_src,
+                            iargs.i_dest, iargs.i_key, iargs.i_src_is_queue,
+                            iargs.i_src_is_local, iargs.i_tag, iargs.i_excludes);
+
+        if (result.second && iargs.i_durable)
+            store->create(*result.first);
 
-        if (state == STATE_OPERATIONAL && connection.get() != 0)
-            connection->requestIOProcessing (boost::bind(&Link::ioThreadProcessing, this));
         return Manageable::STATUS_OK;
     }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h?rev=656023&r1=656022&r2=656023&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Link.h Tue May 13 14:38:28 2008
@@ -45,10 +45,14 @@
         private:
             sys::Mutex          lock;
             LinkRegistry*       links;
-            const string        host;
-            const uint16_t      port;
-            const bool          useSsl;
-            const bool          durable;
+            MessageStore*       store;
+            string        host;
+            uint16_t      port;
+            bool          useSsl;
+            bool          durable;
+            string        authMechanism;
+            string        username;
+            string        password;
             mutable uint64_t    persistenceId;
             management::Link::shared_ptr mgmtObject;
             Broker* broker;
@@ -58,10 +62,9 @@
             uint32_t currentInterval;
             bool     closing;
 
-            typedef boost::ptr_vector<Bridge> Bridges;
+            typedef std::vector<Bridge::shared_ptr> Bridges;
             Bridges created;   // Bridges pending creation
             Bridges active;    // Bridges active
-            Bridges cancelled; // Bridges pending deletion
             uint channelCounter;
             boost::shared_ptr<Connection> connection;
 
@@ -71,29 +74,37 @@
 
             static const uint32_t MAX_INTERVAL = 16;
 
-            void setState (int newState);
-            void startConnection();          // Start the IO Connection
+            void setStateLH (int newState);
+            void startConnectionLH();        // Start the IO Connection
             void established();              // Called when connection is created
             void closed(int, std::string);   // Called when connection goes away
             void destroy();                  // Called when mgmt deletes this link
-            void cancel(Bridge*);            // Called by self-cancelling bridge
             void ioThreadProcessing();       // Called on connection's IO thread by request
             void setConnection(boost::shared_ptr<Connection>); // Set pointer to the AMQP Connection
 
         public:
             typedef boost::shared_ptr<Link> shared_ptr;
 
-            Link(LinkRegistry*           links,
-                 string&                 host,
-                 uint16_t                port,
-                 bool                    useSsl,
-                 bool                    durable,
-                 Broker*                 broker,
+            Link(LinkRegistry* links,
+                 MessageStore* store,
+                 string&       host,
+                 uint16_t      port,
+                 bool          useSsl,
+                 bool          durable,
+                 string&       authMechanism,
+                 string&       username,
+                 string&       password,
+                 Broker*       broker,
                  management::Manageable* parent = 0);
             virtual ~Link();
 
+            std::string getHost() { return host; }
+            uint16_t    getPort() { return port; }
             bool isDurable() { return durable; }
             void maintenanceVisit ();
+            uint nextChannel();
+            void add(Bridge::shared_ptr);
+            void cancel(Bridge::shared_ptr);
 
             // PersistableConfig:
             void     setPersistenceId(uint64_t id) const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp?rev=656023&r1=656022&r2=656023&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.cpp Tue May 13 14:38:28 2008
@@ -46,15 +46,21 @@
 void LinkRegistry::periodicMaintenance ()
 {
     Mutex::ScopedLock locker(lock);
+
     linksToDestroy.clear();
+    bridgesToDestroy.clear();
     for (LinkMap::iterator i = links.begin(); i != links.end(); i++)
         i->second->maintenanceVisit();
 }
 
-pair<Link::shared_ptr, bool> LinkRegistry::declare(std::string& host,
-                                                   uint16_t     port,
-                                                   bool         useSsl,
-                                                   bool         durable)
+pair<Link::shared_ptr, bool> LinkRegistry::declare(string&  host,
+                                                   uint16_t port,
+                                                   bool     useSsl,
+                                                   bool     durable,
+                                                   string&  authMechanism,
+                                                   string&  username,
+                                                   string&  password)
+
 {
     Mutex::ScopedLock   locker(lock);
     stringstream        keystream;
@@ -66,13 +72,64 @@
     {
         Link::shared_ptr link;
 
-        link = Link::shared_ptr (new Link (this, host, port, useSsl, durable, broker, parent));
+        link = Link::shared_ptr (new Link (this, store, host, port, useSsl, durable,
+                                           authMechanism, username, password,
+                                           broker, parent));
         links[key] = link;
         return std::pair<Link::shared_ptr, bool>(link, true);
     }
     return std::pair<Link::shared_ptr, bool>(i->second, false);
 }
 
+pair<Bridge::shared_ptr, bool> LinkRegistry::declare(std::string& host,
+                                                     uint16_t     port,
+                                                     bool         durable,
+                                                     std::string& src,
+                                                     std::string& dest,
+                                                     std::string& key,
+                                                     bool         is_queue,
+                                                     bool         is_local,
+                                                     std::string& tag,
+                                                     std::string& excludes)
+{
+    Mutex::ScopedLock locker(lock);
+    stringstream      keystream;
+    keystream << host << ":" << port;
+    string linkKey = string(keystream.str());
+
+    keystream << "!" << src << "!" << dest << "!" << key;
+    string bridgeKey = string(keystream.str());
+
+    LinkMap::iterator l = links.find(linkKey);
+    if (l == links.end())
+        return pair<Bridge::shared_ptr, bool>(Bridge::shared_ptr(), false);
+
+    BridgeMap::iterator b = bridges.find(bridgeKey);
+    if (b == bridges.end())
+    {
+        management::ArgsLinkBridge args;
+        Bridge::shared_ptr bridge;
+
+        args.i_durable      = durable;
+        args.i_src          = src;
+        args.i_dest         = dest;
+        args.i_key          = key;
+        args.i_src_is_queue = is_queue;
+        args.i_src_is_local = is_local;
+        args.i_tag          = tag;
+        args.i_excludes     = excludes;
+
+        bridge = Bridge::shared_ptr
+            (new Bridge (l->second.get(), l->second->nextChannel(),
+                         boost::bind(&LinkRegistry::destroy, this,
+                                     host, port, src, dest, key), args));
+        bridges[bridgeKey] = bridge;
+        l->second->add(bridge);
+        return std::pair<Bridge::shared_ptr, bool>(bridge, true);
+    }
+    return std::pair<Bridge::shared_ptr, bool>(b->second, false);
+}
+
 void LinkRegistry::destroy(const string& host, const uint16_t port)
 {
     Mutex::ScopedLock   locker(lock);
@@ -90,6 +147,34 @@
     }
 }
 
+void LinkRegistry::destroy(const std::string& host,
+                           const uint16_t     port,
+                           const std::string& src,
+                           const std::string& dest,
+                           const std::string& key)
+{
+    Mutex::ScopedLock locker(lock);
+    stringstream      keystream;
+    keystream << host << ":" << port;
+    string linkKey = string(keystream.str());
+
+    LinkMap::iterator l = links.find(linkKey);
+    if (l == links.end())
+        return;
+
+    keystream << "!" << src << "!" << dest << "!" << key;
+    string bridgeKey = string(keystream.str());
+    BridgeMap::iterator b = bridges.find(bridgeKey);
+    if (b == bridges.end())
+        return;
+
+    l->second->cancel(b->second);
+    if (b->second->isDurable())
+        store->destroy(*(b->second));
+    bridgesToDestroy[bridgeKey] = b->second;
+    bridges.erase(b);
+}
+
 void LinkRegistry::setStore (MessageStore* _store)
 {
     assert (store == 0 && _store != 0);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h?rev=656023&r1=656022&r2=656023&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LinkRegistry.h Tue May 13 14:38:28 2008
@@ -24,6 +24,7 @@
 
 #include <map>
 #include "Link.h"
+#include "Bridge.h"
 #include "MessageStore.h"
 #include "Timer.h"
 #include "qpid/sys/Mutex.h"
@@ -47,8 +48,13 @@
         };
 
         typedef std::map<std::string, Link::shared_ptr> LinkMap;
-        LinkMap links;
-        LinkMap linksToDestroy;
+        typedef std::map<std::string, Bridge::shared_ptr> BridgeMap;
+
+        LinkMap   links;
+        LinkMap   linksToDestroy;
+        BridgeMap bridges;
+        BridgeMap bridgesToDestroy;
+
         qpid::sys::Mutex lock;
         Broker* broker;
         Timer   timer;
@@ -59,11 +65,32 @@
 
     public:
         LinkRegistry (Broker* _broker);
-        std::pair<Link::shared_ptr, bool> declare(std::string& host,
-                                                  uint16_t     port,
-                                                  bool         useSsl,
-                                                  bool         durable);
+        std::pair<Link::shared_ptr, bool>
+            declare(std::string& host,
+                    uint16_t     port,
+                    bool         useSsl,
+                    bool         durable,
+                    std::string& authMechanism,
+                    std::string& username,
+                    std::string& password);
+        std::pair<Bridge::shared_ptr, bool>
+            declare(std::string& host,
+                    uint16_t     port,
+                    bool         durable,
+                    std::string& src,
+                    std::string& dest,
+                    std::string& key,
+                    bool         is_queue,
+                    bool         is_local,
+                    std::string& id,
+                    std::string& excludes);
+
         void destroy(const std::string& host, const uint16_t port);
+        void destroy(const std::string& host,
+                     const uint16_t     port,
+                     const std::string& src,
+                     const std::string& dest,
+                     const std::string& key);
 
         /**
          * Register the manageable parent for declared queues

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp?rev=656023&r1=656022&r2=656023&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveryManagerImpl.cpp Tue May 13 14:38:28 2008
@@ -23,6 +23,7 @@
 #include "Message.h"
 #include "Queue.h"
 #include "Link.h"
+#include "Bridge.h"
 #include "RecoveredEnqueue.h"
 #include "RecoveredDequeue.h"
 #include "qpid/framing/reply_exceptions.h"
@@ -85,10 +86,11 @@
 
 class RecoverableConfigImpl : public RecoverableConfig
 {
-    // TODO: Add links for other config types, consider using super class (PersistableConfig?)
-    Link::shared_ptr link;
+    Link::shared_ptr   link;
+    Bridge::shared_ptr bridge;
 public:
-    RecoverableConfigImpl(Link::shared_ptr _link) : link(_link) {}
+    RecoverableConfigImpl(Link::shared_ptr _link)     : link(_link)     {}
+    RecoverableConfigImpl(Bridge::shared_ptr _bridge) : bridge(_bridge) {}
     void setPersistenceId(uint64_t id);
 };
 
@@ -140,10 +142,10 @@
     string kind;
 
     buffer.getShortString (kind);
-    if (kind == "link")
-    {
+    if      (kind == "link")
         return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Link::decode (links, buffer)));
-    }
+    else if (kind == "bridge")
+        return RecoverableConfig::shared_ptr(new RecoverableConfigImpl(Bridge::decode (links, buffer)));
 
     return RecoverableConfig::shared_ptr(); // TODO: raise an exception instead
 }
@@ -212,7 +214,8 @@
 {
     if (link.get())
         link->setPersistenceId(id);
-    // TODO: add calls to other types.  Consider using a parent class.
+    else if (bridge.get())
+        bridge->setPersistenceId(id);
 }
 
 void RecoverableExchangeImpl::bind(string& queueName, string& key, framing::FieldTable& args)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp?rev=656023&r1=656022&r2=656023&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionHandler.cpp Tue May 13 14:38:28 2008
@@ -65,6 +65,9 @@
         } else if (session.get()) {
             //we are attached and frame was not a session control so it is for upper layers
             session->handle(f);
+        } else if (m && m->isA<SessionDetachedBody>()) {
+            handleDetach();
+            connection.closeChannel(channel.get()); 
         } else {
             throw NotAttachedException(QPID_MSG("Channel " << channel.get() << 
" is not attached"));                
         }

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=656023&r1=656022&r2=656023&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/federation.py Tue May 13 14:38:28 2008
@@ -171,7 +171,8 @@
         mgmt.call_method(broker, "connect", {"host":remote_host(), "port":remote_port()})
         link = mgmt.get_object("link")
 
-        mgmt.call_method(link, "bridge", {"durable":0, "src":"my-bridge-queue", "dest":"amq.fanout", "key":"", "id":"", "excludes":"", "src_is_queue":1})
+        mgmt.call_method(link, "bridge", {"durable":0, "src":"my-bridge-queue", "dest":"amq.fanout",
+                                          "key":"", "tag":"", "excludes":"", "src_is_queue":1})
         sleep(6)
         bridge = mgmt.get_object("bridge")
 
@@ -210,7 +211,7 @@
         link = mgmt.get_object("link")
         
         mgmt.call_method(link, "bridge", {"durable":0, "src":"amq.direct", "dest":"amq.fanout", "key":"my-key",
-                                          "id":"my-bridge-id", "excludes":"exclude-me,also-exclude-me"})
+                                          "tag":"my-bridge-id", "excludes":"exclude-me,also-exclude-me"})
         sleep(6)
         bridge = mgmt.get_object("bridge")
 

Modified: incubator/qpid/trunk/qpid/python/commands/qpid-route
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-route?rev=656023&r1=656022&r2=656023&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-route (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-route Tue May 13 14:38:28 2008
@@ -31,30 +31,32 @@
 from qpid.util       import connect
 
 def Usage ():
-    print "Usage:  qpid-route [OPTIONS] add   <dest-broker> <src-broker> <exchange> <routing-key> [id] [exclude-list]"
-    print "        qpid-route [OPTIONS] del   <dest-broker> <src-broker> <exchange> <routing-key>"
-    print "        qpid-route [OPTIONS] list  <dest-broker>"
-    #print "        qpid-route [OPTIONS] load  <filename>"
-    print "        qpid-route [OPTIONS] flush <dest-broker>"
+    print "Usage:  qpid-route [OPTIONS] link add  <dest-broker> <src-broker>"
+    print "        qpid-route [OPTIONS] link del  <dest-broker> <src-broker>"
+    print "        qpid-route [OPTIONS] link list <dest-broker>"
+    print
+    print "        qpid-route [OPTIONS] route add   <dest-broker> <src-broker> <exchange> <routing-key> [id] [exclude-list]"
+    print "        qpid-route [OPTIONS] route del   <dest-broker> <src-broker> <exchange> <routing-key>"
+    print "        qpid-route [OPTIONS] route list  <dest-broker>"
+    print "        qpid-route [OPTIONS] route flush <dest-broker>"
     print
     print "Options:"
     print "    -s [ --spec-file ] PATH (/usr/share/amqp/amqp.0-10.xml)"
-    print "    -v [ --verbose ]              Verbose output"
-    print "    -q [ --quiet ]                Quiet output, don't print duplicate warnings"
+    print "    -v [ --verbose ]         Verbose output"
+    print "    -q [ --quiet ]           Quiet output, don't print duplicate warnings"
+    print "    -d [ --durable ]         Added configuration shall be durable"
+    print "    -e [ --del-empty-link ]  Delete link after deleting last route on the link"
     print
     print "  dest-broker and src-broker are in the form:  [username/password@] hostname | ip-address [:<port>]"
     print "  ex:  localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@broker-host:10000"
     print
-    #print "  If loading the route configuration from a file, the input file has one line per route"
-    #print "  in the form:"
-    #print
-    #print "  <dest-broker> <src-broker> <exchange> <routing-key>"
-    #print
     sys.exit (1)
 
 _specpath = "/usr/share/amqp/amqp.0-10.xml"
 _verbose  = False
 _quiet    = False
+_durable  = False
+_dellink  = False
 
 class RouteManager:
     def __init__ (self, destBroker):
@@ -87,6 +89,57 @@
                 return link
         return None
 
+    def AddLink (self, srcBroker):
+        self.src  = Broker (srcBroker)
+        mc = self.mclient
+
+        brokers = mc.syncGetObjects (self.mch, "broker")
+        broker = brokers[0]
+        link = self.getLink()
+        if link != None:
+            print "Link already exists"
+            sys.exit(1)
+
+        connectArgs = {}
+        connectArgs["host"]          = self.src.host
+        connectArgs["port"]          = self.src.port
+        connectArgs["useSsl"]        = False
+        connectArgs["durable"]       = _durable
+        connectArgs["authMechanism"] = "PLAIN"
+        connectArgs["username"]      = self.src.username
+        connectArgs["password"]      = self.src.password
+        res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs)
+        if _verbose:
+            print "Connect method returned:", res.status, res.statusText
+        link = self.getLink ()
+
+    def DelLink (self, srcBroker):
+        self.src  = Broker (srcBroker)
+        mc = self.mclient
+
+        brokers = mc.syncGetObjects (self.mch, "broker")
+        broker = brokers[0]
+        link = self.getLink()
+        if link == None:
+            print "Link not found"
+            sys.exit(1)
+
+        res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close")
+        if _verbose:
+            print "Close method returned:", res.status, res.statusText
+
+    def ListLinks (self):
+        mc = self.mclient
+        links = mc.syncGetObjects (self.mch, "link")
+        if len(links) == 0:
+            print "No Links Found"
+        else:
+            print
+            print "Host            Port    Durable  State             Last Error"
+            print "==================================================================="
+            for link in links:
+                print "%-16s%-8d   %c     %-18s%s" % (link.host, link.port, YN(link.durable), link.state, link.lastError)
+
     def AddRoute (self, srcBroker, exchange, routingKey, id, excludes):
         self.src  = Broker (srcBroker)
         mc = self.mclient
@@ -103,10 +156,10 @@
             connectArgs["host"]          = self.src.host
             connectArgs["port"]          = self.src.port
             connectArgs["useSsl"]        = False
-            connectArgs["durable"]       = False
-            connectArgs["authMechanism"] = "ANONYMOUS"
-            connectArgs["username"]      = ""
-            connectArgs["password"]      = ""
+            connectArgs["durable"]       = _durable
+            connectArgs["authMechanism"] = "PLAIN"
+            connectArgs["username"]      = self.src.username
+            connectArgs["password"]      = self.src.password
             res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs)
             if _verbose:
                 print "Connect method returned:", res.status, res.statusText
@@ -127,15 +180,18 @@
         if _verbose:
             print "Creating inter-broker binding..."
         bridgeArgs = {}
-        bridgeArgs["durable"]      = 0
+        bridgeArgs["durable"]      = _durable
         bridgeArgs["src"]          = exchange
         bridgeArgs["dest"]         = exchange
         bridgeArgs["key"]          = routingKey
-        bridgeArgs["id"]           = id
+        bridgeArgs["tag"]          = id
         bridgeArgs["excludes"]     = excludes
         bridgeArgs["src_is_queue"] = 0
         bridgeArgs["src_is_local"] = 0
         res = mc.syncCallMethod (self.mch, link.id, link.classKey, "bridge", bridgeArgs)
+        if res.status == 4:
+            print "Can't create a durable route on a non-durable link"
+            sys.exit(1)
         if _verbose:
             print "Bridge method returned:", res.status, res.statusText
 
@@ -159,7 +215,7 @@
                 if res.status != 0:
                     print "Error closing bridge: %d - %s" % (res.status, res.statusText)
                     sys.exit (1)
-                if len (bridges) == 1:
+                if len (bridges) == 1 and _dellink:
                     link = self.getLink ()
                     if link == None:
                         sys.exit (0)
@@ -188,9 +244,6 @@
             if myLink != None:
                 print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key)
 
-    def LoadRoutes (self, inFile):
-        pass
-
     def ClearAllRoutes (self):
         mc = self.mclient
         links   = mc.syncGetObjects (self.mch, "link")
@@ -211,23 +264,29 @@
             elif _verbose:
                 print "Ok"
 
-        links = mc.syncGetObjects (self.mch, "link")
-        for link in links:
-            if _verbose:
-                print "Deleting Link: %s:%d... " % (link.host, link.port),
-            res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close")
-            if res.status != 0:
-                print "Error: %d - %s" % (res.status, res.statusText)
-            elif _verbose:
-                print "Ok"
+        if _dellink:
+            links = mc.syncGetObjects (self.mch, "link")
+            for link in links:
+                if _verbose:
+                    print "Deleting Link: %s:%d... " % (link.host, link.port),
+                res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close")
+                if res.status != 0:
+                    print "Error: %d - %s" % (res.status, res.statusText)
+                elif _verbose:
+                    print "Ok"
+
+def YN(val):
+    if val == 1:
+        return 'Y'
+    return 'N'
 
 ##
 ## Main Program
 ##
 
 try:
-    longOpts = ("verbose", "quiet", "spec-file=")
-    (optlist, cargs) = getopt.gnu_getopt (sys.argv[1:], "s:vq", longOpts)
+    longOpts = ("verbose", "quiet", "spec-file=", "durable", "del-empty-link")
+    (optlist, cargs) = getopt.gnu_getopt (sys.argv[1:], "s:vqde", longOpts)
 except:
     Usage ()
 
@@ -238,40 +297,57 @@
         _verbose = True
     if opt[0] == "-q" or opt[0] == "--quiet":
         _quiet = True
+    if opt[0] == "-d" or opt[0] == "--durable":
+        _durable = True
+    if opt[0] == "-e" or opt[0] == "--del-empty-link":
+        _dellink = True
 
 nargs = len (cargs)
-if nargs < 2:
+if nargs < 3:
     Usage ()
 
-cmd = cargs[0]
-if cmd != "load":
-    rm  = RouteManager (cargs[1])
-    rm.ConnectToBroker ()
-
-if cmd == "add":
-    if nargs < 5 or nargs > 7:
-        Usage ()
-
-    id = ""
-    excludes = ""
-    if nargs > 5: id = cargs[5]     
-    if nargs > 6: excludes = cargs[6]     
-    rm.AddRoute (cargs[2], cargs[3], cargs[4], id, excludes)
-elif cmd == "del":
-    if nargs != 5:
-        Usage ()
+group = cargs[0]
+cmd   = cargs[1]
+rm    = RouteManager (cargs[2])
+rm.ConnectToBroker ()
+
+if group == "link":
+    if cmd == "add":
+        if nargs != 4:
+            Usage()
+        rm.AddLink (cargs[3])
+    elif cmd == "del":
+        if nargs != 4:
+            Usage()
+        rm.DelLink (cargs[3])
+    elif cmd == "list":
+        if nargs != 3:
+            Usage()
+        rm.ListLinks ()
+
+elif group == "route":
+    if cmd == "add":
+        if nargs < 6 or nargs > 8:
+            Usage ()
+
+        id = ""
+        excludes = ""
+        if nargs > 6: id = cargs[6]     
+        if nargs > 7: excludes = cargs[7]     
+        rm.AddRoute (cargs[3], cargs[4], cargs[5], id, excludes)
+    elif cmd == "del":
+        if nargs != 6:
+            Usage ()
+        else:
+            rm.DelRoute (cargs[3], cargs[4], cargs[5])
     else:
-        rm.DelRoute (cargs[2], cargs[3], cargs[4])
-else:
-    if nargs != 2:
-        Usage ()
-
-    if   cmd == "list":
-        rm.ListRoutes ()
-    #elif cmd == "load":
-    #    rm.LoadRoutes (cargs[1])
-    elif cmd == "flush":
-        rm.ClearAllRoutes ()
-    else:
-        Usage ()
+        if nargs != 3:
+            Usage ()
+
+        if   cmd == "list":
+            rm.ListRoutes ()
+        elif cmd == "flush":
+            rm.ClearAllRoutes ()
+        else:
+            Usage ()
 rm.Disconnect ()

Modified: incubator/qpid/trunk/qpid/specs/management-schema.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/specs/management-schema.xml?rev=656023&r1=656022&r2=656023&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/specs/management-schema.xml (original)
+++ incubator/qpid/trunk/qpid/specs/management-schema.xml Tue May 13 14:38:28 2008
@@ -243,7 +243,7 @@
       <arg name="src"           dir="I" type="sstr"/>
       <arg name="dest"          dir="I" type="sstr"/>
       <arg name="key"           dir="I" type="sstr" default=""/>
-      <arg name="id"            dir="I" type="sstr" default=""/>
+      <arg name="tag"           dir="I" type="sstr" default=""/>
       <arg name="excludes"      dir="I" type="sstr" default=""/>
       <arg name="src_is_queue"  dir="I" type="bool" default="0"/>
       <arg name="src_is_local"  dir="I" type="bool" default="0"/>
@@ -259,11 +259,14 @@
   <class name="bridge">
     <configElement name="linkRef"       type="objId"  access="RC" index="y" parentRef="y"/>
     <configElement name="channelId"     type="uint16" access="RC" index="y"/>
+    <configElement name="durable"       type="bool"   access="RC"/>
     <configElement name="src"           type="sstr"   access="RC"/>
     <configElement name="dest"          type="sstr"   access="RC"/>
     <configElement name="key"           type="sstr"   access="RC"/>
     <configElement name="src_is_queue"  type="bool"   access="RC"/>
     <configElement name="src_is_local"  type="bool"   access="RC"/>
+    <configElement name="tag"           type="sstr"   access="RC"/>
+    <configElement name="excludes"      type="sstr"   access="RC"/>
     <method name="close"/> 
   </class>
 


Reply via email to