Author: aconway
Date: Tue Sep 16 11:46:11 2008
New Revision: 696003

URL: http://svn.apache.org/viewvc?rev=696003&view=rev
Log:
Fix race in cluster join protocol.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    incubator/qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=696003&r1=696002&r2=696003&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Sep 16 11:46:11 2008
@@ -27,6 +27,7 @@
 #include "qpid/framing/AllInvoker.h"
 #include "qpid/framing/ClusterDumpRequestBody.h"
 #include "qpid/framing/ClusterUpdateBody.h"
+#include "qpid/framing/ClusterReadyBody.h"
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
 #include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
 #include "qpid/log/Statement.h"
@@ -53,8 +54,9 @@
     ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {}
     bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); }
 
-    void dumpRequest(const std::string& u) { cluster.dumpRequest(member, u); }
-    void update(const FieldTable& members,bool dumping) { cluster.update(members, dumping); }
+    void update(const FieldTable& members, uint64_t dumping) { cluster.update(members, dumping); }
+    void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url); }
+    void ready(const std::string& url) { cluster.ready(member, url); }
 };
 
 Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
@@ -76,7 +78,6 @@
     broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
     cpgDispatchHandle.startWatch(poller);
     cpg.join(name);
-    
 }
 
 Cluster::~Cluster() {}
@@ -218,6 +219,19 @@
     return o;
 }
 
+void Cluster::dispatch(sys::DispatchHandle& h) {
+    cpg.dispatchAll();
+    h.rewatch();
+}
+
+void Cluster::disconnect(sys::DispatchHandle& ) {
+    // FIXME aconway 2008-09-11: this should be logged as critical,
+    // when we provide admin option to shut down cluster and let
+    // members leave cleanly.
+    QPID_LOG(notice, self << " disconnected from cluster " << name.str());
+    broker.shutdown();
+}
+
 void Cluster::configChange(
     cpg_handle_t /*handle*/,
     cpg_name */*group*/,
@@ -225,79 +239,78 @@
     cpg_address *left, int nLeft,
     cpg_address */*joined*/, int nJoined)
 {
+    Mutex::ScopedLock l(lock);
     // FIXME aconway 2008-09-15: use group terminology not cluster. Member not node.
-    QPID_LOG(notice, "Current cluster: " << AddrList(current, nCurrent));
-    QPID_LOG_IF(notice, nLeft, "Left the cluster: " << AddrList(left, nLeft));
-    if (find(left, left+nLeft, self) != left+nLeft) {
-        // We have left the group, this is the final config change.
+    QPID_LOG(info, "Current cluster: " << AddrList(current, nCurrent));
+    QPID_LOG_IF(info, nLeft, "Left the cluster: " << AddrList(left, nLeft));
+    
+    map.left(left, nLeft);
+    if (find(left, left+nLeft, self) != left+nLeft) { 
+        // I have left the group, this is the final config change.
         QPID_LOG(notice, self << " left cluster " << name.str());
         broker.shutdown();
+        return;
     }
-    Mutex::ScopedLock l(lock);
-    map.configChange(current, nCurrent);
-    if (state == START && nCurrent == 1) {    // First in cluster
-        assert(*current == self);
-        assert(map.empty());
-        QPID_LOG(notice, self << " first in cluster.");
-        map.insert(self, url);
-        ready();
-    }
-    else if (nJoined && self == map.first()) { // Send an update to new members.
-        mcastControl(map.toControl(), 0);
+
+    if (state == START) {
+        if (nCurrent == 1 && *current == self) { // First in cluster.
+            // First in cluster
+            QPID_LOG(notice, self << " first in cluster.");
+            map.add(self, url);
+            ready();
+        }
+        return;                 
     }
-}
 
-void Cluster::dispatch(sys::DispatchHandle& h) {
-    cpg.dispatchAll();
-    h.rewatch();
-}
+    if (state == DISCARD && !map.dumper) // try another dump request.
+        mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
 
-void Cluster::disconnect(sys::DispatchHandle& ) {
-    // FIXME aconway 2008-09-11: this should be logged as critical,
-    // when we provide admin option to shut down cluster and let
-    // members leave cleanly.
-    QPID_LOG(notice, self << " disconnected from cluster " << name.str());
-    broker.shutdown();
+    if (nJoined && map.sendUpdate(self))  // New members need update
+        mcastControl(map.toControl(), 0);
 }
 
-void Cluster::update(const FieldTable& members, bool dumping) {
+void Cluster::update(const FieldTable& members, uint64_t dumper) {
     Mutex::ScopedLock l(lock);
-    map.update(members, dumping);
-    QPID_LOG(info, "Cluster update:\n    " << map);
-    if (state == START && dumping == false)  {
-        state = DISCARD;
+    map.update(members, dumper);
+    QPID_LOG(debug, "Cluster update: " << map);
+    if (state == START) state = DISCARD; // Got first update.
+    if (state == DISCARD && !map.dumper)
         mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
-    }
 }
 
-void Cluster::dumpRequest(const MemberId& m, const string& urlStr) {
+void Cluster::dumpRequest(const MemberId& dumpee, const string& urlStr) {
     Mutex::ScopedLock l(lock);
-    bool wasDumping = map.isDumping();
-    map.setDumping(true);
-    if (!wasDumping) {
-        if (self == m) {        // My turn
-            assert(state == DISCARD);
-            // FIXME aconway 2008-09-15: RECEIVE DUMP
-            // state = CATCHUP; 
-            // stall();
-            // When received
-            map.insert(self, url);
-            mcastControl(map.toControl(), 0);
-            ready();
-        }
-        else if (state == READY && self == map.first()) { // Give the dump.
-            QPID_LOG(info, self << " dumping to " << url);
-            // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient.
-            // state = DUMPING;
-            // stall();
-            (void)urlStr;       
-            // When dump complete:
-            map.setDumping(false);
-            mcastControl(map.toControl(), 0); 
-        }
+    if (map.dumper) return;     // Dump already in progress, ignore.
+    map.dumper = map.first();
+    if (dumpee == self && state == DISCARD) { // My turn to receive a dump.
+        QPID_LOG(info, self << " receiving state dump from " << map.dumper);
+        // FIXME aconway 2008-09-15: RECEIVE DUMP
+        // state = CATCHUP; 
+        // stall();
+        // When received
+        mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
+        ready();
+    }
+    else if (map.dumper == self && state == READY) { // My turn to send the dump
+        QPID_LOG(info, self << " sending state dump to " << dumpee);
+        // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient.
+        // state = DUMPING;
+        // stall();
+        (void)urlStr;       
+        // When dump complete:
+        assert(map.dumper == self);
+        ClusterUpdateBody b = map.toControl();
+        b.setDumper(0);
+        mcastControl(b, 0);
+        // NB: Don't modify my own map till self-delivery.
     }
 }
 
+void Cluster::ready(const MemberId& member, const std::string& url) {
+    Mutex::ScopedLock l(lock);
+    map.add(member, Url(url));
+}
+
 broker::Broker& Cluster::getBroker(){ return broker; }
 
 void Cluster::stall() {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=696003&r1=696002&r2=696003&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Sep 16 11:46:11 2008
@@ -74,9 +74,11 @@
     
     /** Leave the cluster */
     void leave();
-    
+
+    // Cluster controls.
+    void update(const framing::FieldTable& members, uint64_t dumping);
     void dumpRequest(const MemberId&, const std::string& url);
-    void update(const framing::FieldTable& members, bool dumping);
+    void ready(const MemberId&, const std::string& url);
 
     MemberId getSelf() const { return self; }
 
@@ -91,12 +93,11 @@
     typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > ConnectionMap;
     typedef sys::PollableQueue<Event> EventQueue;
     enum State {
-        START,      // Have not yet received first cluster update.
+        START,      // Start state, no cluster update received yet.
         DISCARD,    // Discard updates up to dump start point.
-        HAVE_DUMP,       // Received state dump, waiting for catchup point.
-        CATCHUP,         // Stalled at catchup point, waiting for dump.
-        DUMPING,         // Stalled while sending a state dump.
-        READY            // Normal processing.
+        CATCHUP,    // Stalled at catchup point, waiting for dump.
+        DUMPING,    // Stalled while sending a state dump.
+        READY       // Normal processing.
     };
 
     void connectionEvent(const Event&);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=696003&r1=696002&r2=696003&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Tue Sep 16 11:46:11 2008
@@ -32,56 +32,58 @@
 
 namespace cluster {
 
-ClusterMap::ClusterMap() : dumping(false) {}
+ClusterMap::ClusterMap() {}
 
-MemberId ClusterMap::first() {
-    return (empty()) ?  MemberId() : begin()->first;
+MemberId ClusterMap::first() const {
+    return (members.empty()) ?  MemberId() : members.begin()->first;
 }
 
-void ClusterMap::configChange(const cpg_address* addrs, size_t size) {
-    iterator i = begin();
-    while (i != end()) { // Remove members that are no longer in addrs.
-        if (std::find(addrs, addrs+size, i->first) == addrs+size)
-            erase(i++);
-        else
-            ++i;
-    }
+void ClusterMap::left(const cpg_address* addrs, size_t size) {
+    size_t (Members::*erase)(const MemberId&) = &Members::erase;
+    std::for_each(addrs, addrs+size, boost::bind(erase, &members, _1));
+    if (dumper && !isMember(dumper))
+        dumper = MemberId();
 }
 
 framing::ClusterUpdateBody ClusterMap::toControl() const {
     framing::ClusterUpdateBody b;
-    for (const_iterator i = begin(); i != end(); ++i) 
+    for (Members::const_iterator i = members.begin(); i != members.end(); ++i) 
         b.getMembers().setString(i->first.str(), i->second.str());
-    b.setDumping(dumping);
+    b.setDumper(dumper);
     return b;
 }
 
-void ClusterMap::update(const FieldTable& ftMembers, bool dump) {
-    dumping = dump;
+void ClusterMap::update(const FieldTable& ftMembers, uint64_t dumper_) {
     FieldTable::ValueMap::const_iterator i;
     for (i = ftMembers.begin(); i != ftMembers.end(); ++i) 
-        (*this)[i->first] = Url(i->second->get<std::string>());
-}
-
-void ClusterMap::fromControl(const framing::ClusterUpdateBody& b) {
-    update(b.getMembers(), b.getDumping());
+        members[i->first] = Url(i->second->get<std::string>());
+    dumper = MemberId(dumper_);
 }
 
 std::vector<Url> ClusterMap::memberUrls() const {
     std::vector<Url> result(size());
-    std::transform(begin(), end(), result.begin(),
-                   boost::bind(&value_type::second, _1));
+    std::transform(members.begin(), members.end(), result.begin(),
+                   boost::bind(&Members::value_type::second, _1));
     return result;        
 }
 
-std::ostream& operator<<(std::ostream& o, const ClusterMap::value_type& mv) {
+std::ostream& operator<<(std::ostream& o, const ClusterMap::Members::value_type& mv) {
     return o << mv.first << "=" << mv.second;
 }
 
 std::ostream& operator<<(std::ostream& o, const ClusterMap& m) {
-    std::ostream_iterator<ClusterMap::value_type> im(o, "\n    ");
-    std::copy(m.begin(), m.end(), im);
+    std::ostream_iterator<ClusterMap::Members::value_type> im(o, "\n    ");
+    o << "dumper=" << m.dumper << ", members:\n    ";
+    std::copy(m.members.begin(), m.members.end(), im);
     return o;
 }
 
+bool ClusterMap::sendUpdate(const MemberId& id) const {
+    return dumper==id || (!dumper && first() == id);
+}
+
+void ClusterMap::add(const MemberId& id, const Url& url) {
+    members[id] = url;
+}
+
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=696003&r1=696002&r2=696003&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Tue Sep 16 11:46:11 2008
@@ -41,39 +41,39 @@
  * A dumper is an established member that is sending catch-up data.
  * A dumpee is an aspiring member that is receiving catch-up data.
  */
-class ClusterMap : public std::map<MemberId, Url> {
+class ClusterMap {
   public:
+    typedef std::map<MemberId, Url> Members;
+    Members members;
+    MemberId dumper;
+
     ClusterMap();
     
     /** First member of the cluster in ID order, gets to perform one-off tasks. */
-    MemberId first();
-
-    /** Update for CPG config change. */
-    void configChange(const cpg_address* addrs, size_t size);
+    MemberId first() const;
 
+    /** Update for members leaving. */
+    void left(const cpg_address* addrs, size_t size);
 
-    /** Convert map contents to a cluster control body. */
+    /** Convert map contents to a cluster update body. */
     framing::ClusterUpdateBody toControl() const;
 
-    /** Update with first member. */
-    using std::map<MemberId, Url>::insert;
-    void insert(const MemberId& id, const Url& url) { insert(value_type(id,url)); }
-    void setDumping(bool d) { dumping = d; }
+    /** Add a new member. */
+    void add(const MemberId& id, const Url& url);
 
     /** Apply update delivered from clsuter. */
-    void update(const framing::FieldTable& members, bool dumping);
-    void fromControl(const framing::ClusterUpdateBody&);
+    void update(const framing::FieldTable& members, uint64_t dumper);
 
-    bool isMember(const MemberId& id) const { return find(id) != end(); }
-    bool isDumping() const { return dumping; }
+    bool isMember(const MemberId& id) const { return members.find(id) != members.end(); }
 
+    bool sendUpdate(const MemberId& id) const; // True if id should send an update.
     std::vector<Url> memberUrls() const;
-
+    size_t size() const { return members.size(); }
+    
   private:
-    bool dumping;
 
   friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
-  friend std::ostream& operator<<(std::ostream& o, const ClusterMap::value_type& mv);
+  friend std::ostream& operator<<(std::ostream& o, const ClusterMap::Members::value_type& mv);
 };
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=696003&r1=696002&r2=696003&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Tue Sep 16 11:46:11 2008
@@ -41,12 +41,13 @@
 
 /** first=node-id, second=pid */
 struct MemberId : std::pair<uint32_t, uint32_t> {
+    explicit MemberId(uint64_t n) : std::pair<uint32_t,uint32_t>( n >> 32, n & 0xffffffff) {}
     explicit MemberId(uint32_t node=0, uint32_t pid=0) : std::pair<uint32_t,uint32_t>(node, pid) {}
     MemberId(const cpg_address& caddr) : std::pair<uint32_t,uint32_t>(caddr.nodeid, caddr.pid) {}
     MemberId(const std::string&); // Decode from string.
     uint32_t getNode() const { return first; }
     uint32_t getPid() const { return second; }
-    operator bool() const { return first || second; }
+    operator uint64_t() const { return (uint64_t(first)<<32ull) + second; }
 
     // Encode as string, network byte order.
     std::string str() const;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=696003&r1=696002&r2=696003&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Tue Sep 16 11:46:11 2008
@@ -140,9 +140,7 @@
     return o;
 }
 
-
-// FIXME aconway 2008-09-12: finish the new join protocol.
-QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchUpSharedState, 1) {
+QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchupSharedState, 1) {
     ClusterFixture cluster(1);
     Client c0(cluster[0], "c0");
     // Create some shared state.

Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=696003&r1=696002&r2=696003&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Tue Sep 16 11:46:11 2008
@@ -27,15 +27,18 @@
   <class name = "cluster" code = "0x80" label="Qpid clustering extensions.">
     <doc>Qpid extension class to allow clustered brokers to communicate.</doc>
 
-    <control name="update" code="0x4" label="Cluster status update.">
+    <control name="update" code="0x1" label="Cluster status update.">
       <field name="members" type="map"/>     <!-- member-id -> URL -->
-      <field name="dumping" type="boolean"/> <!-- currently dumping state to new member. -->
+      <field name="dumper" type="uint64"/>   <!-- member currently dumping state. -->
     </control>
 
-    <control name = "dump-request" code="0x1" label="New meber requests brain dump">
+    <control name = "dump-request" code="0x2" label="New meber requests brain dump">
       <field name="url" type="str16" label="Url for brain dump."/>
     </control>
 
+    <control name="ready" code="0x3" label="New member is ready.">
+      <field name="url" type="str16" label="Url for brain dump."/>
+    </control>
   </class>
 
   <!-- TODO aconway 2008-09-10: support for un-attached connections. -->


Reply via email to