Author: aconway
Date: Mon Dec  8 09:16:37 2008
New Revision: 724413

URL: http://svn.apache.org/viewvc?rev=724413&view=rev
Log:
Cluster: reduced scope of mcast locks.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=724413&r1=724412&r2=724413&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Dec  8 09:16:37 2008
@@ -137,48 +137,34 @@
 }
 
 void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& id, uint32_t seq) {
-    Lock l(lock);
-    mcastControl(body, id, seq, l);
-}
-
-void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& id, uint32_t seq, Lock& l) {
     Event e(Event::control(body, id, seq));
     QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
-    mcast(e, l);
+    mcast(e);
 }
 
-void Cluster::mcastControl(const framing::AMQBody& body, Lock& l) {
+void Cluster::mcastControl(const framing::AMQBody& body) {
     Event e(Event::control(body, ConnectionId(myId,0), ++mcastId));
     QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
-    mcast(e, l);
+    mcast(e);
 }
 
 void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id) {
-    Lock l(lock);
-    mcastBuffer(data, size, connection, id, l);
-}
-
-void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& connection, uint32_t id, Lock&) {
-    Lock l(lock);
     Event e(DATA, connection, size, id);
     memcpy(e.getData(), data, size);
+    {
+        Lock l(lock);
+        if (state <= CATCHUP && e.isConnection()) {
+            // Stall outgoing connection events untill we are fully READY
+            QPID_LOG(trace, *this << " MCAST deferred: " << e );
+            mcastStallQueue.push_back(e);
+            return;
+        }
+    }
     QPID_LOG(trace, *this << " MCAST " << e);
-    mcast(e, l);
+    mcast(e);
 }
 
-void Cluster::mcast(const Event& e) { Lock l(lock); mcast(e, l); }
-
-void Cluster::mcast(const Event& e, Lock&) {
-    if (state == LEFT) 
-        return;
-    if (state <= CATCHUP && e.isConnection()) {
-        // Stall outgoing connection events untill we are fully READY
-        QPID_LOG(trace, *this << " MCAST deferred: " << e );
-        mcastStallQueue.push_back(e); 
-    }
-    else 
-        mcastQueue.push(e);
-}
+void Cluster::mcast(const Event& e) { mcastQueue.push(e); }
 
 bool Cluster::sendMcast(const Event& e) {
     try {
@@ -383,7 +369,7 @@
         else {                  // Joining established group.
             state = NEWBIE;
             QPID_LOG(info, *this << " joining cluster: " << map);
-            mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()), l);
+            mcastControl(ClusterDumpRequestBody(ProtocolVersion(), myUrl.str()));
         }
     }
     else if (state >= READY && memberChange)
@@ -393,11 +379,11 @@
 
 
 
-void Cluster::tryMakeOffer(const MemberId& id, Lock& l) {
+void Cluster::tryMakeOffer(const MemberId& id, Lock& ) {
     if (state == READY && map.isNewbie(id)) {
         state = OFFER;
         QPID_LOG(info, *this << " send dump-offer to " << id);
-        mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId), l);
+        mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id, clusterId));
     }
 }
 
@@ -429,7 +415,7 @@
         state = READY;
         QPID_LOG(notice, *this << " caught up, active cluster member");
         if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
-        for_each(mcastStallQueue.begin(), mcastStallQueue.end(), boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
+        for_each(mcastStallQueue.begin(), mcastStallQueue.end(), boost::bind(&Cluster::mcast, this, _1));
         mcastStallQueue.clear();
     }
 }
@@ -482,11 +468,11 @@
     checkDumpIn(l);
 }
 
-void Cluster::checkDumpIn(Lock& l) {
+void Cluster::checkDumpIn(Lock& ) {
     if (state == LEFT) return;
     if (state == DUMPEE && dumpedMap) {
         map = *dumpedMap;
-        mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), l);
+        mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()));
         // Don't flush the mcast queue till we are READY, on self-deliver.
         state = CATCHUP;
         QPID_LOG(info, *this << " received dump, starting catch-up");
@@ -536,9 +522,9 @@
     leave(l);
 }
 
-void Cluster::stopFullCluster(Lock& l) {
+void Cluster::stopFullCluster(Lock& ) {
     QPID_LOG(notice, *this << " shutting down cluster " << name);
-    mcastControl(ClusterShutdownBody(), l);
+    mcastControl(ClusterShutdownBody());
 }
 
 void Cluster::memberUpdate(Lock& l) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=724413&r1=724412&r2=724413&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Mon Dec  8 09:16:37 2008
@@ -79,7 +79,6 @@
     // Send to the cluster 
     void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&, uint32_t id);
     void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id);
-    void mcast(const Event& e);
 
     // URLs of current cluster members.
     std::vector<Url> getUrls() const;
@@ -110,11 +109,9 @@
     // The parameter makes it hard to forget since you have to have an instance of
     // a Lock to call the unlocked functions.
 
-    // Unlocked versions of public functions
-    void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&, uint32_t, Lock&);
-    void mcastControl(const framing::AMQBody& controlBody, Lock&);
-    void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id, Lock&);
-    void mcast(const Event& e, Lock&);
+    void mcastControl(const framing::AMQBody& controlBody);
+    void mcast(const Event& e);
+
     void leave(Lock&);
     std::vector<Url> getUrls(Lock&) const;
 


Reply via email to