Author: aconway
Date: Tue Oct 14 11:21:50 2008
New Revision: 704596

URL: http://svn.apache.org/viewvc?rev=704596&view=rev
Log:
Bug fixes for client-side failover.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=704596&r1=704595&r2=704596&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Tue Oct 14 11:21:50 2008
@@ -150,6 +150,7 @@
 static const std::string CONN_CLOSED("Connection closed by broker");
 
 void ConnectionImpl::shutdown() {
+
     Mutex::ScopedLock l(lock);
     if (handler.isClosed()) return;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp?rev=704596&r1=704595&r2=704596&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverConnection.cpp Tue Oct 14 11:21:50 2008
@@ -158,6 +158,10 @@
         fs->prepareForFailover ( newConnection );
     }
 
+    connection = newConnection;
+    connection.registerFailureCallback
+        ( boost::bind(&FailoverConnection::failover, this));
+
     /*
      * Tell all sessions to actually failover to the new connection.
      */
@@ -169,10 +173,6 @@
         FailoverSession * fs = * sessions_iterator;
         fs->failover ( );
     }
-
-    connection = newConnection;
-    connection.registerFailureCallback
-        ( boost::bind(&FailoverConnection::failover, this));
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp?rev=704596&r1=704595&r2=704596&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSession.cpp Tue Oct 14 11:21:50 2008
@@ -82,8 +82,7 @@
 uint32_t 
 FailoverSession::timeout(uint32_t /*seconds*/ )
 {
-
-    // MICK WTF?  return session.timeout ( seconds );
+    // FIXME mgoulish return session.timeout ( seconds );
     return 0;
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp?rev=704596&r1=704595&r2=704596&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.cpp Tue Oct 14 11:21:50 2008
@@ -34,7 +34,8 @@
 
 FailoverSubscriptionManager::FailoverSubscriptionManager ( FailoverSession * 
fs) :
     name("no_name"),
-    newSessionIsValid(false)
+    newSessionIsValid(false),
+    no_failover(false)
 {
     subscriptionManager = new SubscriptionManager(fs->session);
     fs->setFailoverSubscriptionManager(this);
@@ -45,9 +46,10 @@
 void
 FailoverSubscriptionManager::prepareForFailover ( Session _newSession )
 {
-    Lock l(lock);
+    sys::Monitor::ScopedLock l(lock);
     newSession = _newSession;
     newSessionIsValid = true;
+    // lock.notifyAll();
 }
 
 
@@ -55,28 +57,11 @@
 void
 FailoverSubscriptionManager::failover ( )
 {
-    // Stop the subscription manager thread so it can notice the failover in 
progress.
+    sys::Monitor::ScopedLock l(lock);
+    // Stop the subscription manager thread so it can notice 
+    // the failover in progress.
     subscriptionManager->stop();
-}
-
-
-
-
-FailoverSubscriptionManager::subscribeArgs::subscribeArgs 
-( int _interface,
-  MessageListener * _listener,
-  LocalQueue * _localQueue,
-  const std::string * _queue,
-  const FlowControl * _flow,
-  const std::string * _tag
-) :
-    interface(_interface),
-    listener(_listener),
-    localQueue(_localQueue),
-    queue(_queue),
-    flow(_flow),
-    tag(_tag)
-{
+    lock.notifyAll();
 }
 
 
@@ -86,15 +71,19 @@
 FailoverSubscriptionManager::subscribe ( MessageListener   & listener,
                                          const std::string & queue,
                                          const FlowControl & flow,
-                                         const std::string & tag 
+                                         const std::string & tag,
+                                         bool                record_this
 )
 {
+    sys::Monitor::ScopedLock l(lock);
+
     subscriptionManager->subscribe ( listener,
                                      queue,
                                      flow,
                                      tag
     );
-    subscribeFns.push_back ( boost::bind ( (void 
(FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const 
FlowControl&, const std::string&) )  &FailoverSubscriptionManager::subscribe, 
this, boost::ref(listener), queue, flow, tag ) );
+    if ( record_this )
+      subscribeFns.push_back ( boost::bind ( (void 
(FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const 
FlowControl&, const std::string&, bool) )  
&FailoverSubscriptionManager::subscribe, this, boost::ref(listener), queue, 
flow, tag, false ) );
 }
 
 
@@ -103,15 +92,20 @@
 FailoverSubscriptionManager::subscribe ( LocalQueue        & localQueue,
                                          const std::string & queue,
                                          const FlowControl & flow,
-                                         const std::string & tag
+                                         const std::string & tag,
+                                         bool                record_this
 )
 {
+    sys::Monitor::ScopedLock l(lock);
+
     subscriptionManager->subscribe ( localQueue,
                                      queue,
                                      flow,
                                      tag
     );
-    subscribeFns.push_back ( boost::bind ( (void 
(FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const 
FlowControl&, const std::string&) )  &FailoverSubscriptionManager::subscribe, 
this, localQueue, queue, flow, tag ) );
+
+    if ( record_this )
+      subscribeFns.push_back ( boost::bind ( (void 
(FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const 
FlowControl&, const std::string&, bool) )  
&FailoverSubscriptionManager::subscribe, this, localQueue, queue, flow, tag, 
false ) );
 }
 
 
@@ -119,15 +113,19 @@
 void
 FailoverSubscriptionManager::subscribe ( MessageListener   & listener,
                                          const std::string & queue,
-                                         const std::string & tag
+                                         const std::string & tag,
+                                         bool                record_this
 )
 {
+    sys::Monitor::ScopedLock l(lock);
+
     subscriptionManager->subscribe ( listener,
                                      queue,
                                      tag
     );
-    // TODO -- more than one subscription
-    subscribeFns.push_back ( boost::bind ( (void 
(FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const 
std::string&) )  &FailoverSubscriptionManager::subscribe, this, 
boost::ref(listener), queue, tag ) );
+    
+    if ( record_this )
+      subscribeFns.push_back ( boost::bind ( (void 
(FailoverSubscriptionManager::*)(MessageListener&, const std::string&, const 
std::string&, bool) )  &FailoverSubscriptionManager::subscribe, this, 
boost::ref(listener), queue, tag, false ) );
 }
 
 
@@ -136,14 +134,19 @@
 void 
 FailoverSubscriptionManager::subscribe ( LocalQueue        & localQueue,
                                          const std::string & queue,
-                                         const std::string & tag
+                                         const std::string & tag,
+                                         bool                record_this
 )
 {
+    sys::Monitor::ScopedLock l(lock);
+
     subscriptionManager->subscribe ( localQueue,
                                      queue,
                                      tag
     );
-    subscribeFns.push_back ( boost::bind ( (void 
(FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const 
std::string&) ) &FailoverSubscriptionManager::subscribe, this, localQueue, 
queue, tag ) );
+
+    if ( record_this )
+      subscribeFns.push_back ( boost::bind ( (void 
(FailoverSubscriptionManager::*)(LocalQueue&, const std::string&, const 
std::string&, bool) ) &FailoverSubscriptionManager::subscribe, this, 
localQueue, queue, tag, false ) );
 }
 
 
@@ -172,31 +175,46 @@
 void 
 FailoverSubscriptionManager::run ( ) // User Thread
 {
+    std::vector<subscribeFn> mySubscribeFns;
+
     while ( 1 )
     {
         subscriptionManager->run ( );
-        Lock l(lock);
+
         // When we drop out of run, if there is a new Session
         // waiting for us, this is a failover.  Otherwise, just
         // return control to usercode.
-        if ( newSessionIsValid ) 
+        
         {
-            delete subscriptionManager;
-            subscriptionManager = new SubscriptionManager(newSession);
-            for ( std::vector<subscribeFn>::iterator i = subscribeFns.begin(); 
-                  i < subscribeFns.end(); 
-                  ++ i 
-            )
-            {
-                (*i) ();
-            }
-            newSessionIsValid = false;
+          sys::Monitor::ScopedLock l(lock);
+
+
+          while ( !newSessionIsValid && !no_failover )
+            lock.wait();
+
+
+          if ( newSessionIsValid ) 
+          {
+             newSessionIsValid = false;
+             delete subscriptionManager;
+             subscriptionManager = new SubscriptionManager(newSession);
+             mySubscribeFns.swap ( subscribeFns );
+          }
+          else
+          {
+              // Not a failover, return to user code.
+              break;
+          }
         }
-        else
+
+        for ( std::vector<subscribeFn>::iterator i = mySubscribeFns.begin(); 
+              i != mySubscribeFns.end(); 
+              ++ i 
+        )
         {
-            // Not a failover, return to user code.
-            break;
+            (*i) ();
         }
+
     }
 }
 
@@ -222,8 +240,11 @@
 void 
 FailoverSubscriptionManager::stop ( )
 {
+    sys::Monitor::ScopedLock l(lock);
 
+    no_failover = true;
     subscriptionManager->stop ( );
+    lock.notifyAll();
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h?rev=704596&r1=704595&r2=704596&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverSubscriptionManager.h Tue Oct 14 11:21:50 2008
@@ -33,7 +33,7 @@
 #include <qpid/client/LocalQueue.h>
 #include <qpid/client/FlowControl.h>
 #include <qpid/sys/Runnable.h>
-#include <qpid/sys/Mutex.h>
+#include <qpid/sys/Monitor.h>
 
 
 
@@ -48,25 +48,27 @@
 
     FailoverSubscriptionManager ( FailoverSession * fs );
 
-    void foo ( int& arg_1 );
-
     void subscribe ( MessageListener   & listener,
                      const std::string & queue,
                      const FlowControl & flow,
-                     const std::string & tag = std::string() );
+                     const std::string & tag = std::string(),
+                     bool  record_this = true );
 
     void subscribe ( LocalQueue        & localQueue,
                      const std::string & queue,
                      const FlowControl & flow,
-                     const std::string & tag=std::string());
+                     const std::string & tag=std::string(),
+                     bool  record_this = true );
 
     void subscribe ( MessageListener   & listener,
                      const std::string & queue,
-                     const std::string & tag = std::string());
+                     const std::string & tag = std::string(),
+                     bool  record_this = true );
 
     void subscribe ( LocalQueue        & localQueue,
                      const std::string & queue,
-                     const std::string & tag=std::string());
+                     const std::string & tag=std::string(),
+                     bool  record_this = true );
 
     bool get ( Message & result, 
                const std::string & queue, 
@@ -115,9 +117,9 @@
     std::string name;
 
 
+
   private:
-    typedef sys::Mutex::ScopedLock Lock;
-    sys::Mutex lock;
+    sys::Monitor lock;
     
     SubscriptionManager * subscriptionManager;
 
@@ -130,32 +132,11 @@
 
     Session newSession;
     bool    newSessionIsValid;
+    bool    no_failover;
+
 
-    /*
-     * */
     typedef boost::function<void ()> subscribeFn;
     std::vector < subscribeFn > subscribeFns;
-
-    struct subscribeArgs
-    {
-      int interface;
-      MessageListener   * listener;
-      LocalQueue        * localQueue;
-      const std::string * queue;
-      const FlowControl * flow;
-      const std::string * tag;
-
-      subscribeArgs ( int _interface,
-                      MessageListener   *,
-                      LocalQueue  *,
-                      const std::string *,
-                      const FlowControl *,
-                      const std::string *
-                    );
-    };
-
-    std::vector < subscribeArgs * > subscriptionReplayVector;
-
 };
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=704596&r1=704595&r2=704596&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Oct 14 11:21:50 2008
@@ -163,7 +163,10 @@
 void Cluster::mcast(const Event& e) { Lock l(lock); mcast(e, l); }
 
 void Cluster::mcast(const Event& e, Lock&) {
-    if (state == LEFT) return;
+    if (state == LEFT) {
+        lock.notifyAll();        // threads waiting in getUrls()
+        return;
+    }
     if (state < READY && e.isConnection()) {
         // Stall outgoing connection events.
         QPID_LOG(trace, *this << " MCAST deferred: " << e );
@@ -351,6 +354,7 @@
             map = ClusterMap(memberId, myUrl, true);
             memberUpdate(l);
             unstall(l);
+            lock.notifyAll();   // threads waiting in getUrls()
         }
         else {                  // Joining established group.
             state = NEWBIE;
@@ -417,6 +421,8 @@
 
 void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
     map.ready(id, Url(url));
+    if (id == memberId)
+        lock.notifyAll(); // threads waiting in getUrls()
     memberUpdate(l);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=704596&r1=704595&r2=704596&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Tue Oct 14 11:21:50 2008
@@ -146,7 +146,7 @@
         newbies.erase(i);       // No longer a potential dumpee.
         return url;
     }
-    return boost::none;
+    return boost::optional<Url>();
 }
 
 }} // namespace qpid::cluster


Reply via email to