Author: aconway
Date: Thu Oct 16 10:07:26 2008
New Revision: 705287

URL: http://svn.apache.org/viewvc?rev=705287&view=rev
Log:
Fix race in cluster causing incorrect known-broker lists to be sent to clients.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    incubator/qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Thu Oct 16 
10:07:26 2008
@@ -32,17 +32,18 @@
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
 
-using namespace qpid::client;
+
+namespace qpid {
+namespace client {
+
 using namespace qpid::framing;
 using namespace qpid::framing::connection;
 using namespace qpid::sys;
-
 using namespace qpid::framing::connection;//for connection error codes
 
 ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const 
ConnectionSettings& settings)
     : Bounds(settings.maxFrameSize * settings.bounds),
       handler(settings, v),
-      failover(new FailoverListener()),
       version(v),
       nextChannel(1)
 {
@@ -51,7 +52,6 @@
     handler.out = boost::bind(&Connector::send, boost::ref(connector), _1);
     handler.onClose = boost::bind(&ConnectionImpl::closed, this,
                                   CLOSE_CODE_NORMAL, std::string());
-
     //only set error handler once  open
     handler.onError = boost::bind(&ConnectionImpl::closed, this, _1, _2);
 }
@@ -69,7 +69,8 @@
     Mutex::ScopedLock l(lock);
     session->setChannel(channel ? channel : nextChannel++);
     boost::weak_ptr<SessionImpl>& s = sessions[session->getChannel()];
-    if (s.lock()) throw SessionBusyException();
+    boost::shared_ptr<SessionImpl> ss = s.lock();
+    if (ss) throw SessionBusyException(QPID_MSG("Channel " << ss->getChannel() 
<< " attachd to " << ss->getId()));
     s = session;
 }
 
@@ -110,7 +111,7 @@
     connector->init();
     handler.waitForOpen();
 
-    if (failover.get()) failover->start(shared_from_this());
+    failover.reset(new FailoverListener(shared_from_this(), 
handler.knownBrokersUrls));
 }
 
 void ConnectionImpl::idleIn()
@@ -176,7 +177,6 @@
 }
     
 std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() {
-    // FIXME aconway 2008-10-08: ensure we never return empty list, always 
include self Url.
     return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls;
 }
 
@@ -187,4 +187,6 @@
     return simpl;
 }
 
-void ConnectionImpl::stopFailoverListener() { failover.reset(); }
+void ConnectionImpl::stopFailoverListener() { failover->stop(); }
+
+}} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Thu Oct 16 
10:07:26 2008
@@ -89,6 +89,8 @@
     std::vector<Url> getKnownBrokers();
     void registerFailureCallback ( boost::function<void ()> fn ) { 
failureCallback = fn; }
     void stopFailoverListener();
+
+    framing::ProtocolVersion getVersion() { return version; }
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Thu Oct 16 
10:07:26 2008
@@ -72,7 +72,7 @@
     boost::state_saver<bool>  reset(running); // Reset to false on exit.
     running = true;
     try {
-        while (!queue->isClosed()) {
+        while (true) {
             Mutex::ScopedUnlock u(lock);
             FrameSet::shared_ptr content = queue->pop();
             if (content->isA<MessageTransferBody>()) {
@@ -92,12 +92,14 @@
                 }
             }
         }
-        session.sync(); // Make sure all our acks are received before 
returning.
     }
-    catch (const ClosedException& e) 
-    { 
-        QPID_LOG(debug, "Ignored exception in client dispatch thread: " << 
e.what());
-    } //ignore it and return
+    catch (const ClosedException& e)  { 
+        QPID_LOG(debug, "Dispatch thread exiting, session closed: " << 
session.getId());
+        try {
+            session.sync(); // Make sure all our acks are received before 
returning.
+        }
+        catch(...) {}
+    }
     catch (const std::exception& e) {
         QPID_LOG(error, "Exception in client dispatch thread: " << e.what());
         if ( failoverHandler )

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp Thu Oct 
16 10:07:26 2008
@@ -21,6 +21,7 @@
 #include "FailoverListener.h"
 #include "SessionBase_0_10Access.h"
 #include "qpid/client/SubscriptionManager.h"
+#include "qpid/framing/Uuid.h"
 #include "qpid/log/Statement.h"
 #include "qpid/log/Helpers.h"
 
@@ -40,40 +41,58 @@
     return s;
 }
 
-FailoverListener::FailoverListener() {}
+FailoverListener::FailoverListener(const boost::shared_ptr<ConnectionImpl>& c, 
const std::vector<Url>& initUrls)
+  : knownBrokers(initUrls) 
+ {
+    // Special versions used to mark cluster catch-up connections
+    // which do not need a FailoverListener
+    if (c->getVersion().getMajor() >= 0x80)  {
+        QPID_LOG(debug, "No failover listener for catch-up connection.");
+        return;
+    }
 
-void FailoverListener::start(const boost::shared_ptr<ConnectionImpl>& c) {
-    Session session = makeSession(c->newSession(std::string(), 0));
+    Session session = 
makeSession(c->newSession(AMQ_FAILOVER+framing::Uuid(true).str(), 0));
     if (session.exchangeQuery(arg::name=AMQ_FAILOVER).getNotFound()) {
         session.close();
         return;
     }
     subscriptions.reset(new SubscriptionManager(session));
-    std::string qname=AMQ_FAILOVER + "." + session.getId().getName();
+    std::string qname=session.getId().getName();
     session.queueDeclare(arg::queue=qname, arg::exclusive=true, 
arg::autoDelete=true);
     session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER);
     subscriptions->subscribe(*this, qname, FlowControl::unlimited());
     thread = sys::Thread(*subscriptions);
 }
 
-void FailoverListener::stop() {
-    if (subscriptions.get()) subscriptions->stop();
-    if (thread.id()) thread.join();
-    if (subscriptions.get()) subscriptions->getSession().close();
-    thread=sys::Thread();
-    subscriptions.reset();
-}    
 FailoverListener::~FailoverListener() {
     try { stop(); }
     catch (const std::exception& e) {}
 }
 
+void FailoverListener::stop() {
+    if (subscriptions.get()) 
+        subscriptions->stop();
+
+    if (thread.id() == sys::Thread::current().id()) {
+        // FIXME aconway 2008-10-16: this can happen if ConnectionImpl
+        // dtor runs when my session drops its weak pointer lock.
+        // For now, leak subscriptions to prevent a core if we delete
+        // without joining.
+        subscriptions.release();
+    }
+    else if (thread.id()) {
+        thread.join();
+        thread=sys::Thread();
+        subscriptions.reset();  // Safe to delete after join.
+    }
+}
+
 void FailoverListener::received(Message& msg) {
     sys::Mutex::ScopedLock l(lock);
     knownBrokers.clear();
     framing::Array urlArray;
     msg.getHeaders().getArray("amq.failover", urlArray);
-    for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i < 
urlArray.end(); ++i ) 
+    for (framing::Array::ValueVector::const_iterator i = urlArray.begin(); i 
!= urlArray.end(); ++i ) 
         knownBrokers.push_back(Url((*i)->get<std::string>()));
     QPID_LOG(info, "Known-brokers update: " << log::formatList(knownBrokers));
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h Thu Oct 16 
10:07:26 2008
@@ -38,11 +38,10 @@
  */
 class FailoverListener : public MessageListener {
   public:
-    FailoverListener();
+    FailoverListener(const boost::shared_ptr<ConnectionImpl>&, const 
std::vector<Url>& initUrls);
     ~FailoverListener();
-    void start(const boost::shared_ptr<ConnectionImpl>&);
     void stop();
-    
+
     std::vector<Url> getKnownBrokers() const;
     void received(Message& msg);
     

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Oct 16 
10:07:26 2008
@@ -30,6 +30,7 @@
 #include "qpid/framing/AllInvoker.h"
 #include "qpid/framing/ClusterDumpRequestBody.h"
 #include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/framing/ClusterConfigChangeBody.h"
 #include "qpid/framing/ClusterDumpOfferBody.h"
 #include "qpid/framing/ClusterDumpStartBody.h"
 #include "qpid/framing/ClusterShutdownBody.h"
@@ -76,6 +77,7 @@
 
     void dumpRequest(const std::string& url) { cluster.dumpRequest(member, 
url, l); }
     void ready(const std::string& url) { cluster.ready(member, url, l); }
+    void configChange(const std::string& addresses) { 
cluster.configChange(member, addresses, l); }
     void dumpOffer(uint64_t dumpee) { cluster.dumpOffer(member, dumpee, l); }
     void dumpStart(uint64_t dumpee, const std::string& url) { 
cluster.dumpStart(member, dumpee, url, l); }
     void shutdown() { cluster.shutdown(member, l); }
@@ -89,14 +91,14 @@
     cpg(*this),
     name(name_),
     myUrl(url_),
-    memberId(cpg.self()),
+    myId(cpg.self()),
     cpgDispatchHandle(
         cpg,
         boost::bind(&Cluster::dispatch, this, _1), // read
         0,                                         // write
         boost::bind(&Cluster::disconnect, this, _1) // disconnect
     ),
-    deliverQueue(boost::bind(&Cluster::process, this, _1), poller),
+    deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
     mcastId(0),
     mgmtObject(0),
     state(INIT),
@@ -115,20 +117,20 @@
     failoverExchange.reset(new FailoverExchange(this));
     broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
     cpgDispatchHandle.startWatch(poller);
+    deliverQueue.start();
     cpg.join(name);
-    QPID_LOG(notice, *this << " joining cluster " << name.str());
+    QPID_LOG(notice, *this << " will join cluster " << name.str());
 }
 
 Cluster::~Cluster() {
     if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
 }
 
-void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
+bool Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
     Lock l(lock);
-    // FIXME aconway 2008-10-08: what keeps catchUp connections in memory if 
not in map?
-    // esp shadow connections? See race comment in getConnection.
-    assert(!c->isCatchUp());
-    connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
+    bool result = 
connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c)).second;
+    assert(result);
+    return result;
 }
 
 void Cluster::erase(ConnectionId id) {
@@ -136,14 +138,19 @@
     connections.erase(id);
 }
 
-void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) {
+void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& 
id, uint32_t seq) {
     Lock l(lock);
-    mcastControl(body, cptr, l);
+    mcastControl(body, id, seq, l);
 }
 
-void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr, 
Lock&) {
-    Lock l(lock);
-    Event e(Event::control(body, ConnectionId(memberId, cptr), ++mcastId));
+void Cluster::mcastControl(const framing::AMQBody& body, const ConnectionId& 
id, uint32_t seq, Lock& l) {
+    Event e(Event::control(body, id, seq));
+    QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
+    mcast(e, l);
+}
+
+void Cluster::mcastControl(const framing::AMQBody& body, Lock& l) {
+    Event e(Event::control(body, ConnectionId(myId,0), ++mcastId));
     QPID_LOG(trace, *this << " MCAST " << e << ": " << body);
     mcast(e, l);
 }
@@ -166,8 +173,8 @@
 void Cluster::mcast(const Event& e, Lock&) {
     if (state == LEFT) 
         return;
-    if (state < READY && e.isConnection()) {
-        // Stall outgoing connection events.
+    if (state <= CATCHUP && e.isConnection()) {
+        // Stall outgoing connection events until we are fully READY
         QPID_LOG(trace, *this << " MCAST deferred: " << e );
         mcastQueue.push_back(e); 
     }
@@ -192,10 +199,10 @@
 void Cluster::leave(Lock&) { 
     if (state != LEFT) {
         state = LEFT;
+        if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
         QPID_LOG(notice, *this << " leaving cluster " << name.str());
 
         if (!deliverQueue.isStopped()) deliverQueue.stop();
-        if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
         try { cpg.leave(name); }
         catch (const std::exception& e) {
             QPID_LOG(critical, *this << " error leaving process group: " << 
e.what());
@@ -211,14 +218,15 @@
 boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& 
connectionId, Lock&)  {
     ConnectionMap::iterator i = connections.find(connectionId);
     if (i == connections.end()) { 
-        if (connectionId.getMember() == memberId) { // Closed local connection
+        if (connectionId.getMember() == myId) { // Closed local connection
             QPID_LOG(warning, *this << " attempt to use closed connection " << 
connectionId);
             return boost::intrusive_ptr<Connection>();
         }
         else {                  // New shadow connection
             std::ostringstream mgmtId;
             mgmtId << name.str() << ":"  << connectionId;
-            ConnectionMap::value_type value(connectionId, new 
Connection(*this, shadowOut, mgmtId.str(), connectionId));
+            ConnectionMap::value_type value(connectionId,
+                                            new Connection(*this, shadowOut, 
mgmtId.str(), connectionId));
             i = connections.insert(value).first;
         }
     }
@@ -242,50 +250,54 @@
 {
     Mutex::ScopedLock l(lock);
     MemberId from(nodeid, pid);
-    Event e = Event::delivered(from, msg, msg_len);
+    deliver(Event::delivered(from, msg, msg_len), l);
+}
+
+void Cluster::deliver(const Event& e, Lock&) {
     if (state == LEFT) return;
-    QPID_LOG(trace, *this << " DLVR: " << e);
-    if (e.isCluster() && state != DUMPEE) // Process cluster controls 
immediately unless in DUMPEE state.
-        process(e, l);
-    else if (state != NEWBIE) // Newbie discards events up to the dump offer.
-        deliverQueue.push(e);
+    QPID_LOG(trace, *this << " PUSH: " << e);
+    deliverQueue.push(e);       // Otherwise enqueue for processing.
 }
 
-void Cluster::process(const Event& e) {
+void Cluster::delivered(const Event& e) {
     Lock l(lock);
-    process(e,l);
+    delivered(e,l);
 }
 
-void Cluster::process(const Event& e, Lock& l) {
+void Cluster::delivered(const Event& e, Lock& l) {
     try {
         Buffer buf(e);
         AMQFrame frame;
         if (e.isCluster())  {
             while (frame.decode(buf)) {
-                QPID_LOG(trace, *this << " PROC: " << e << " " << frame);
+                QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
                 ClusterDispatcher dispatch(*this, e.getMemberId(), l);
                 if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
                     throw Exception(QPID_MSG("Invalid cluster control"));
             }
         }
         else {                      // e.isConnection()
-            boost::intrusive_ptr<Connection> connection = 
getConnection(e.getConnectionId(), l);
-            if (connection) {   // Ignore if no connection.
-                if (e.getType() == DATA) {
-                    QPID_LOG(trace, *this << " PROC: " << e);
-                    connection->deliverBuffer(buf);
-                }
-                else {              // control
+            if (state == NEWBIE) {
+                QPID_LOG(trace, *this << " DROP: " << e);
+            }
+            else {
+                boost::intrusive_ptr<Connection> connection = 
getConnection(e.getConnectionId(), l);
+                if (!connection) return;
+                if (e.getType() == CONTROL) {              
                     while (frame.decode(buf)) {
-                        QPID_LOG(trace, *this << " PROC: " << e << " " << 
frame);
+                        QPID_LOG(trace, *this << " DLVR: " << e << " " << 
frame);
                         connection->delivered(frame);
                     }
                 }
+                else  {
+                    QPID_LOG(trace, *this << " DLVR: " << e);
+                    connection->deliverBuffer(buf);
+                }
             }
         }
     }
     catch (const std::exception& e) {
-        QPID_LOG(critical, *this << " error in cluster process: " << e.what());
+        QPID_LOG(critical, *this << " error in cluster delivered: " << 
e.what());
         leave(l);
     }
 }
@@ -304,11 +316,11 @@
     for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
         const char* reasonString;
         switch (p->reason) {
-          case CPG_REASON_JOIN: reasonString =  " joined "; break;
-          case CPG_REASON_LEAVE: reasonString =  " left "; break;
-          case CPG_REASON_NODEDOWN: reasonString =  " node-down "; break;
-          case CPG_REASON_NODEUP: reasonString =  " node-up "; break;
-          case CPG_REASON_PROCDOWN: reasonString =  " process-down "; break;
+          case CPG_REASON_JOIN: reasonString =  " (joined) "; break;
+          case CPG_REASON_LEAVE: reasonString =  " (left) "; break;
+          case CPG_REASON_NODEDOWN: reasonString =  " (node-down) "; break;
+          case CPG_REASON_NODEUP: reasonString =  " (node-up) "; break;
+          case CPG_REASON_PROCDOWN: reasonString =  " (process-down) "; break;
           default: reasonString = " ";
         }
         qpid::cluster::MemberId member(*p);
@@ -338,61 +350,52 @@
     cpg_name */*group*/,
     cpg_address *current, int nCurrent,
     cpg_address *left, int nLeft,
-    cpg_address *joined, int nJoined)
+    cpg_address */*joined*/, int /*nJoined*/)
 {
     Mutex::ScopedLock l(lock);
-    QPID_LOG(debug, *this << " configuration change: " << AddrList(current, 
nCurrent) 
+    QPID_LOG(debug, *this << " enqueue config change: " << AddrList(current, 
nCurrent) 
              << AddrList(left, nLeft, "( ", ")"));
-    map.configChange(current, nCurrent, left, nLeft, joined, nJoined);
+    std::string addresses;
+    for (cpg_address* p = current; p < current+nCurrent; ++p) 
+        addresses.append(MemberId(*p).str());
+    deliver(Event::control(ClusterConfigChangeBody(ProtocolVersion(), 
addresses), myId), l);
+}
+
+void Cluster::configChange(const MemberId&, const std::string& addresses, 
Lock& l) {
+    bool memberChange = map.configChange(addresses);
     if (state == LEFT) return;
-    if (!map.isAlive(memberId)) { leave(l); return; } 
     
-    if(state == INIT) {    // First configChange
-        if (map.aliveCount() == 1) { 
+    if (!map.isAlive(myId)) {  // Final config change.
+        leave(l);
+        return;
+    }
+
+    if (state == INIT) {        // First configChange
+        if (map.aliveCount() == 1) {
             QPID_LOG(info, *this << " first in cluster at " << myUrl);
-            map = ClusterMap(memberId, myUrl, true);
+            state = READY;
+            if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
+            map = ClusterMap(myId, myUrl, true);
             memberUpdate(l);
-            unstall(l);
         }
         else {                  // Joining established group.
             state = NEWBIE;
-            mcastControl(ClusterDumpRequestBody(ProtocolVersion(), 
myUrl.str()), 0, l);
+            mcastControl(ClusterDumpRequestBody(ProtocolVersion(), 
myUrl.str()), l);
             QPID_LOG(debug, *this << " send dump-request " << myUrl);
         }
     }
-    else if (state >= READY)
+    else if (state >= READY && memberChange)
         memberUpdate(l);
 }
 
-void Cluster::dumpInDone(const ClusterMap& m) {
-    Lock l(lock);
-    dumpedMap = m;
-    checkDumpIn(l);
-}
+
+
 
 void Cluster::tryMakeOffer(const MemberId& id, Lock& l) {
     if (state == READY && map.isNewbie(id)) {
         state = OFFER;
         QPID_LOG(debug, *this << " send dump-offer to " << id);
-        mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id), 0, l);
-    }
-}
-
-void Cluster::unstall(Lock& l) {
-    // Called with lock held
-    switch (state) {
-      case INIT: case DUMPEE: case DUMPER: case READY:
-        QPID_LOG(debug, *this << " unstall: deliver=" << deliverQueue.size()
-                 << " mcast=" << mcastQueue.size());
-        deliverQueue.start();
-        state = READY;
-        for_each(mcastQueue.begin(), mcastQueue.end(), 
boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
-        mcastQueue.clear();
-        if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
-        break;
-      case LEFT: break;
-      case NEWBIE: case OFFER:
-        assert(0);
+        mcastControl(ClusterDumpOfferBody(ProtocolVersion(), id), l);
     }
 }
 
@@ -418,23 +421,25 @@
 }
 
 void Cluster::ready(const MemberId& id, const std::string& url, Lock& l) {
-    map.ready(id, Url(url));
-    if (id == memberId)
-        unstall(l);
-    memberUpdate(l);
+    if (map.ready(id, Url(url))) 
+        memberUpdate(l);
+    if (state == CATCHUP && id == myId) {
+        QPID_LOG(debug, *this << " caught-up, going to ready mode.");
+        state = READY;
+        if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
+        for_each(mcastQueue.begin(), mcastQueue.end(), 
boost::bind(&Cluster::mcast, this, _1, boost::ref(l)));
+        mcastQueue.clear();
+    }
 }
 
 void Cluster::dumpOffer(const MemberId& dumper, uint64_t dumpeeInt, Lock& l) {
     if (state == LEFT) return;
     MemberId dumpee(dumpeeInt);
     boost::optional<Url> url = map.dumpOffer(dumper, dumpee);
-    if (dumper == memberId) {
+    if (dumper == myId) {
         assert(state == OFFER);
         if (url) {              // My offer was first.
-            QPID_LOG(debug, *this << " mark dump point for dump to " << 
dumpee);
-            // Put dump-start on my own deliver queue to mark the stall point.
-            // We will stall when it is processed.
-            
deliverQueue.push(Event::control(ClusterDumpStartBody(ProtocolVersion(), 
dumpee, url->str()), memberId));
+            dumpStart(myId, dumpee, url->str(), l);
         }
         else {                  // Another offer was first.
             QPID_LOG(debug, *this << " cancel dump offer to " << dumpee);
@@ -442,38 +447,47 @@
             tryMakeOffer(map.firstNewbie(), l); // Maybe make another offer.
         }
     }
-    else if (dumpee == memberId && url) {
+    else if (dumpee == myId && url) {
         assert(state == NEWBIE);
         QPID_LOG(debug, *this << " accepted dump-offer from " << dumper);
         state = DUMPEE;
+        deliverQueue.stop();
         checkDumpIn(l);
     }
 }
 
+// FIXME aconway 2008-10-15: no longer need a separate control now
+// that the dump control is in the deliver queue.
 void Cluster::dumpStart(const MemberId& , uint64_t dumpeeInt, const 
std::string& urlStr, Lock& l) {
     if (state == LEFT) return;
     MemberId dumpee(dumpeeInt);
     Url url(urlStr);
     assert(state == OFFER);
+    state = DUMPER;
     deliverQueue.stop();
     QPID_LOG(debug, *this << " stall and dump to " << dumpee << " at " << 
urlStr);
-    state = DUMPER;
     if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
     dumpThread = Thread(
-        new DumpClient(memberId, dumpee, url, broker, map, getConnections(l), 
+        new DumpClient(myId, dumpee, url, broker, map, getConnections(l), 
                        boost::bind(&Cluster::dumpOutDone, this),
                        boost::bind(&Cluster::dumpOutError, this, _1)));
 }
 
+void Cluster::dumpInDone(const ClusterMap& m) {
+    Lock l(lock);
+    dumpedMap = m;
+    checkDumpIn(l);
+}
+
 void Cluster::checkDumpIn(Lock& l) {
     if (state == LEFT) return;
-    assert(state == DUMPEE || state == NEWBIE);
     if (state == DUMPEE && dumpedMap) {
         map = *dumpedMap;
-        QPID_LOG(debug, *this << " incoming dump complete. Members: " << map);
-        mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), 0, l);
-        state = READY;
-        // unstall when ready control is self-delivered.
+        QPID_LOG(debug, *this << " incoming dump complete, start catchup. 
map=" << map);
+        mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), l);
+        // Don't flush the mcast queue till we are READY, on self-deliver.
+        state = CATCHUP;
+        deliverQueue.start();
     }
 }
 
@@ -485,7 +499,8 @@
 void Cluster::dumpOutDone(Lock& l) {
     QPID_LOG(debug, *this  << " finished sending dump.");
     assert(state == DUMPER);
-    unstall(l);
+    state = READY;
+    deliverQueue.start();
     tryMakeOffer(map.firstNewbie(), l); // Try another offer
 }
 
@@ -504,7 +519,7 @@
 
 Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, 
string&) {
     Lock l(lock);
-    QPID_LOG (debug, *this << " managementMethod [id=" << methodId << "]");
+    QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]");
     switch (methodId) {
       case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break;
       case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break;
@@ -520,10 +535,11 @@
 
 void Cluster::stopFullCluster(Lock& l) {
     QPID_LOG(notice, *this << " shutting down cluster " << name.str());
-    mcastControl(ClusterShutdownBody(), 0, l);
+    mcastControl(ClusterShutdownBody(), l);
 }
 
 void Cluster::memberUpdate(Lock& l) {
+    QPID_LOG(debug, *this << " member update, map=" << map);
     std::vector<Url> vectUrl = getUrls(l);
     size_t size = vectUrl.size();
 
@@ -552,12 +568,12 @@
 }
 
 std::ostream& operator<<(std::ostream& o, const Cluster& cluster) {
-    static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "READY", 
"OFFER", "DUMPER", "LEFT" };
-    return o << cluster.memberId << "(" << STATE[cluster.state] << ")";
+    static const char* STATE[] = { "INIT", "NEWBIE", "DUMPEE", "CATCHUP", 
"READY", "OFFER", "DUMPER", "LEFT" };
+    return o << cluster.myId << "(" << STATE[cluster.state] << ")";
 }
 
 MemberId Cluster::getId() const {
-    return memberId;            // Immutable, no need to lock.
+    return myId;            // Immutable, no need to lock.
 }
 
 broker::Broker& Cluster::getBroker() const {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Oct 16 
10:07:26 2008
@@ -65,11 +65,11 @@
     virtual ~Cluster();
 
     // Connection map
-    void insert(const ConnectionPtr&); 
+    bool insert(const ConnectionPtr&); 
     void erase(ConnectionId);       
     
     // Send to the cluster 
-    void mcastControl(const framing::AMQBody& controlBody, Connection* cptr=0);
+    void mcastControl(const framing::AMQBody& controlBody, const 
ConnectionId&, uint32_t id);
     void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id);
     void mcast(const Event& e);
 
@@ -101,7 +101,8 @@
     // a Lock to call the unlocked functions.
 
     // Unlocked versions of public functions
-    void mcastControl(const framing::AMQBody& controlBody, Connection* cptr, 
Lock&);
+    void mcastControl(const framing::AMQBody& controlBody, const 
ConnectionId&, uint32_t, Lock&);
+    void mcastControl(const framing::AMQBody& controlBody, Lock&);
     void mcastBuffer(const char*, size_t, const ConnectionId&, uint32_t id, 
Lock&);
     void mcast(const Event& e, Lock&);
     void leave(Lock&);
@@ -110,9 +111,6 @@
     // Called via CPG, deliverQueue or DumpClient threads. 
     void tryMakeOffer(const MemberId&, Lock&);
 
-    // Called in CPG, connection IO and DumpClient threads.
-    void unstall(Lock&);
-
     // Called in main thread in ~Broker.
     void brokerShutdown();
 
@@ -123,9 +121,10 @@
     void dumpOffer(const MemberId& dumper, uint64_t dumpee, Lock&);
     void dumpStart(const MemberId& dumper, uint64_t dumpeeInt, const 
std::string& urlStr, Lock&);
     void ready(const MemberId&, const std::string&, Lock&);
+    void configChange(const MemberId&, const std::string& addresses, Lock& l);
     void shutdown(const MemberId&, Lock&);
-    void process(const Event&); // deliverQueue callback
-    void process(const Event&, Lock&); // unlocked version
+    void delivered(const Event&); // deliverQueue callback
+    void delivered(const Event&, Lock&); // unlocked version
 
     // CPG callbacks, called in CPG IO thread.
     void dispatch(sys::DispatchHandle&); // Dispatch CPG events.
@@ -139,6 +138,8 @@
         void* /*msg*/,
         int /*msg_len*/);
 
+    void deliver(const Event& e, Lock&); 
+    
     void configChange( // CPG config change callback.
         cpg_handle_t /*handle*/,
         struct cpg_name */*group*/,
@@ -172,7 +173,7 @@
     Cpg cpg;
     const Cpg::Name name;
     const Url myUrl;
-    const MemberId memberId;
+    const MemberId myId;
 
     ConnectionMap connections;
     NoOpConnectionOutputHandler shadowOut;
@@ -183,7 +184,17 @@
 
     qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns 
lifecycle
 
-    enum { INIT, NEWBIE, DUMPEE, READY, OFFER, DUMPER, LEFT } state;
+    enum {
+        INIT,                   ///< Initial state, no CPG messages received.
+        NEWBIE,                 ///< Sent dump request, waiting for dump offer.
+        DUMPEE,                 ///< Stalled receive queue at dump offer, 
waiting for dump to complete.
+        CATCHUP,                ///< Dump complete, unstalled but has not yet 
seen own "ready" event.
+        READY,                  ///< Fully operational 
+        OFFER,                  ///< Sent an offer, waiting for accept/reject.
+        DUMPER,                 ///< Offer accepted, sending a state dump.
+        LEFT                    ///< Final state, left the cluster.
+    } state;
+    
     ClusterMap map;
     sys::Thread dumpThread;
     boost::optional<ClusterMap> dumpedMap;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Thu Oct 16 
10:07:26 2008
@@ -34,25 +34,24 @@
 namespace cluster {
 
 namespace {
-void insertSet(ClusterMap::Set& set, const ClusterMap::Map::value_type& v) { 
set.insert(v.first); }
 
-void insertMap(ClusterMap::Map& map, FieldTable::ValueMap::value_type vt) {
-    map.insert(ClusterMap::Map::value_type(vt.first, 
Url(vt.second->get<std::string>())));
+void addFieldTableValue(FieldTable::ValueMap::value_type vt, ClusterMap::Map& 
map, ClusterMap::Set& set) {
+    MemberId id(vt.first);
+    set.insert(id);
+    std::string url = vt.second->get<std::string>();
+    if (!url.empty())
+        map.insert(ClusterMap::Map::value_type(id, Url(url)));
 }
 
-void assignMap(ClusterMap::Map& map, const FieldTable& ft) {
-    map.clear();
-    std::for_each(ft.begin(), ft.end(), boost::bind(&insertMap, 
boost::ref(map), _1));
-}
-
-void insertFieldTable(FieldTable& ft, const ClusterMap::Map::value_type& vt) {
-    return ft.setString(vt.first.str(), vt.second.str());
+void insertFieldTableFromMapValue(FieldTable& ft, const 
ClusterMap::Map::value_type& vt) {
+    ft.setString(vt.first.str(), vt.second.str());
 }
 
 void assignFieldTable(FieldTable& ft, const ClusterMap::Map& map) {
     ft.clear();
-    std::for_each(map.begin(), map.end(), boost::bind(&insertFieldTable, 
boost::ref(ft), _1));
+    std::for_each(map.begin(), map.end(), 
boost::bind(&insertFieldTableFromMapValue, boost::ref(ft), _1));
 }
+
 }
 
 ClusterMap::ClusterMap() {}
@@ -66,10 +65,21 @@
 }
 
 ClusterMap::ClusterMap(const FieldTable& newbiesFt, const FieldTable& 
membersFt) {
-    assignMap(newbies, newbiesFt);
-    assignMap(members, membersFt);
-    std::for_each(newbies.begin(), newbies.end(), boost::bind(&insertSet, 
boost::ref(alive), _1));
-    std::for_each(members.begin(), members.end(), boost::bind(&insertSet, 
boost::ref(alive), _1));
+    std::for_each(newbiesFt.begin(), newbiesFt.end(), 
boost::bind(&addFieldTableValue, _1, boost::ref(newbies), boost::ref(alive)));
+    std::for_each(membersFt.begin(), membersFt.end(), 
boost::bind(&addFieldTableValue, _1, boost::ref(members), boost::ref(alive)));
+}
+
+ClusterConnectionMembershipBody ClusterMap::asMethodBody() const {
+    framing::ClusterConnectionMembershipBody b;
+    b.getNewbies().clear();
+    std::for_each(newbies.begin(), newbies.end(), 
boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getNewbies()), _1));
+    for(Set::const_iterator i = alive.begin(); i != alive.end(); ++i) {
+        if (!isMember(*i) && !isNewbie(*i))
+            b.getNewbies().setString(i->str(), std::string());
+    }
+    b.getMembers().clear();
+    std::for_each(members.begin(), members.end(), 
boost::bind(&insertFieldTableFromMapValue, boost::ref(b.getMembers()), _1));
+    return b;
 }
 
 bool ClusterMap::configChange(
@@ -80,7 +90,7 @@
     cpg_address* a;
     bool memberChange=false;
     for (a = left; a != left+nLeft; ++a) {
-        memberChange = members.erase(*a);
+        memberChange = members.erase(*a) || memberChange; // erase first: || must not short-circuit past it
         newbies.erase(*a);
     }
     alive.clear();
@@ -97,13 +107,6 @@
     return newbies.empty() ? MemberId() : newbies.begin()->first;
 }
 
-ClusterConnectionMembershipBody ClusterMap::asMethodBody() const {
-    framing::ClusterConnectionMembershipBody b;
-    assignFieldTable(b.getNewbies(), newbies);
-    assignFieldTable(b.getMembers(), members);
-    return b;
-}
-
 std::vector<Url> ClusterMap::memberUrls() const {
     std::vector<Url> urls(members.size());
     std::transform(members.begin(), members.end(), urls.begin(),
@@ -121,7 +124,8 @@
     for (ClusterMap::Set::const_iterator i = m.alive.begin(); i != 
m.alive.end(); ++i) {
         o << *i;
         if (m.isMember(*i)) o << "(member)";
-        if (m.isNewbie(*i)) o << "(newbie)";
+        else if (m.isNewbie(*i)) o << "(newbie)";
+        else o << "(unknown)";
         o << " ";
     }
     return o;
@@ -139,6 +143,35 @@
     return isAlive(id) &&  members.insert(Map::value_type(id,url)).second;
 }
 
+/**
+ * Replace the alive set with the ids listed in a packed member-id array.
+ * Ids that were alive before but are absent from addresses are erased from
+ * both members and newbies.
+ *@param addresses concatenated member ids, 8 bytes each.
+ *@return true if at least one entry was erased from members.
+ */
+bool ClusterMap::configChange(const std::string& addresses) {
+    const std::string::size_type idSize = 8; // Bytes per packed member id.
+    Set update;
+    // Only consume complete ids: stepping an iterator by 8 over a string whose
+    // length is not a multiple of 8 would read past the end.
+    for (std::string::size_type pos = 0; pos + idSize <= addresses.size(); pos += idSize)
+        update.insert(MemberId(addresses.substr(pos, idSize)));
+    Set removed;
+    std::set_difference(alive.begin(), alive.end(),
+                        update.begin(), update.end(),
+                        std::inserter(removed, removed.begin()));
+    alive = update;
+    bool memberChange = false;
+    for (Set::const_iterator i = removed.begin(); i != removed.end(); ++i) {
+        // Erase unconditionally: "memberChange || erase()" would short-circuit
+        // and leave later departed ids in members.
+        if (members.erase(*i)) memberChange = true;
+        newbies.erase(*i);
+    }
+    return memberChange;
+}
+
 boost::optional<Url> ClusterMap::dumpOffer(const MemberId& from, const 
MemberId& to) {
     Map::iterator i = newbies.find(to);
     if (isAlive(from) && i != newbies.end()) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Thu Oct 16 
10:07:26 2008
@@ -58,6 +58,8 @@
         cpg_address *left, int nLeft,
         cpg_address *joined, int nJoined);
 
+    bool configChange(const std::string& addresses);
+
     bool isNewbie(const MemberId& id) const { return newbies.find(id) != 
newbies.end(); }
     bool isMember(const MemberId& id) const { return members.find(id) != 
members.end(); }
     bool isAlive(const MemberId& id) const { return alive.find(id) != 
alive.end(); }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Oct 16 
10:07:26 2008
@@ -44,7 +44,7 @@
     : cluster(c), self(myId), catchUp(false), output(*this, out),
       connection(&output, cluster.getBroker(), wrappedId)
 {
-    QPID_LOG(debug, "New connection: " << *this);
+    QPID_LOG(debug, cluster << " new connection: " << *this);
 }
 
 // Local connections
@@ -53,11 +53,11 @@
     : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
       connection(&output, cluster.getBroker(), wrappedId)
 {
-    QPID_LOG(debug, "New connection: " << *this);
+    QPID_LOG(debug, cluster << " new connection: " << *this);
 }
 
 Connection::~Connection() {
-    QPID_LOG(debug, "Deleted connection: " << *this);
+    QPID_LOG(debug, cluster << " deleted connection: " << *this);
 }
 
 bool Connection::doOutput() {
@@ -72,32 +72,36 @@
     output.deliverDoOutput(requested);
 }
 
+// FIXME aconway 2008-10-15:  changes here, dubious.
+
 // Received from a directly connected client.
 void Connection::received(framing::AMQFrame& f) {
-    QPID_LOG(trace, "RECV " << *this << ": " << f);
-    if (isShadow()) {           
-        // Intercept the close that completes catch-up for shadow a connection.
-        if (isShadow() && f.getMethod() && 
f.getMethod()->isA<ConnectionCloseBody>()) { 
-            catchUp = false;
-            cluster.insert(boost::intrusive_ptr<Connection>(this));
+    QPID_LOG(trace, cluster << " RECV " << *this << ": " << f);
+    if (isLocal()) {
+        currentChannel = f.getChannel();
+        if (!framing::invoke(*this, *f.getBody()).wasHandled())
+            connection.received(f);
+    }
+    else {             // Shadow or dumped ex catch-up connection.
+        if (f.getMethod() && f.getMethod()->isA<ConnectionCloseBody>()) {
+            if (isShadow()) {
+                QPID_LOG(debug, cluster << " inserting connection " << *this);
+                cluster.insert(boost::intrusive_ptr<Connection>(this));
+            }
             AMQFrame ok(in_place<ConnectionCloseOkBody>());
             connection.getOutput().send(ok);
             output.setOutputHandler(discardHandler);
+            catchUp = false;
         }
         else
-            QPID_LOG(warning, *this << " ignoring unexpected frame: " << f);
-    }
-    else {
-        currentChannel = f.getChannel();
-        if (!framing::invoke(*this, *f.getBody()).wasHandled())
-            connection.received(f);
+            QPID_LOG(warning, cluster << " ignoring unexpected frame " << 
*this << ": " << f);
     }
 }
 
 // Delivered from cluster.
 void Connection::delivered(framing::AMQFrame& f) {
-    QPID_LOG(trace, "DLVR " << *this << ": " << f);
-    assert(!isCatchUp());
+    QPID_LOG(trace, cluster << "DLVR " << *this << ": " << f);
+    assert(!catchUp);
     // Handle connection controls, deliver other frames to connection.
     currentChannel = f.getChannel();
     if (!framing::invoke(*this, *f.getBody()).wasHandled())
@@ -106,24 +110,25 @@
 
 void Connection::closed() {
     try {
-        QPID_LOG(debug, "Connection closed " << *this);
         if (catchUp) {
-            QPID_LOG(critical, cluster << " error on catch-up connection " << 
*this);
+            QPID_LOG(critical, cluster << " catch-up connection closed 
prematurely " << *this);
             cluster.leave();
         }
-        else if (isDump()) 
+        else if (isDumped()) {
+            QPID_LOG(debug, cluster << " closed dump connection " << *this);
             connection.closed();
+        }
         else if (isLocal()) {
+            QPID_LOG(debug, cluster << " local close of replicated connection 
" << *this);
             // This was a local replicated connection. Multicast a deliver
             // closed and process any outstanding frames from the cluster
             // until self-delivery of deliver-close.
             output.setOutputHandler(discardHandler);
-            cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
-            ++mcastSeq;
+            cluster.mcastControl(ClusterConnectionDeliverCloseBody(), self, 
++mcastSeq);
         }
     }
     catch (const std::exception& e) {
-        QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
+        QPID_LOG(error, cluster << " error closing connection " << *this << ": 
" << e.what());
     }
 }
 
@@ -135,7 +140,7 @@
 
 // Decode data from local clients.
 size_t Connection::decode(const char* buffer, size_t size) {
-    if (catchUp || isDump()) {              // Handle catch-up locally.
+    if (catchUp) {  // Handle catch-up locally.
         Buffer buf(const_cast<char*>(buffer), size);
         while (localDecoder.decode(buf))
             received(localDecoder.frame);
@@ -174,26 +179,39 @@
         received,
         unknownCompleted,
         receivedIncomplete);
-    QPID_LOG(debug, "Received session state dump for " << s->getId());
+    QPID_LOG(debug, cluster << " received session state dump for " << 
s->getId());
 }
     
 void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
     ConnectionId shadow = ConnectionId(memberId, connectionId);
-    QPID_LOG(debug, "Catch-up connection " << self << " becomes shadow " << 
shadow);
+    QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes 
shadow " << shadow);
     self = shadow;
 }
 
-void Connection::membership(const FieldTable& urls, const FieldTable& states) {
-    cluster.dumpInDone(ClusterMap(urls,states));
-    catchUp = false;
-    self.second = 0;            // Mark this as completed dump connection.
+void Connection::membership(const FieldTable& newbies, const FieldTable& 
members) {
+    QPID_LOG(debug, cluster << " incoming dump complete on connection " << 
*this);
+    cluster.dumpInDone(ClusterMap(newbies, members));
+    self.second = 0;        // Mark this as completed dump connection.
 }
 
-bool Connection::isLocal() const { return self.first == cluster.getId() && 
self.second == this; }
+bool Connection::isLocal() const {
+    return self.first == cluster.getId() && self.second == this;
+}
+
+bool Connection::isShadow() const {
+    return self.first != cluster.getId();
+}
+
+bool Connection::isDumped() const {
+    return self.first == cluster.getId() && self.second == 0;
+}
 
 std::ostream& operator<<(std::ostream& o, const Connection& c) {
-    return o << c.getId() << "(" << (c.isLocal() ? "local" : "shadow")
-             << (c.isCatchUp() ? ",catchup" : "") << ")";
+    const char* type="unknown";
+    if (c.isLocal()) type = "local";
+    else if (c.isShadow()) type = "shadow";
+    else if (c.isDumped()) type = "dumped";
+    return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "") 
<< ")";
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Oct 16 
10:07:26 2008
@@ -64,12 +64,13 @@
     bool isLocal() const;
 
     /** True for connections that are shadowing remote broker connections */
-    bool isShadow() const { return !isLocal(); }
+    bool isShadow() const;
 
     /** True if the connection is in "catch-up" mode: building initial broker 
state. */
     bool isCatchUp() const { return catchUp; }
 
-    bool isDump() const { return self.getPointer() == 0; }
+    /** True if the connection is a completed shared dump connection */
+    bool isDumped() const;
 
     Cluster& getCluster() { return cluster; }
 
@@ -103,6 +104,7 @@
     void membership(const framing::FieldTable&, const framing::FieldTable&);
 
   private:
+    bool catcUp; // NOTE(review): looks like a misspelling of catchUp (returned by isCatchUp()); nothing visible reads catcUp -- confirm and remove.
 
     void deliverClose();
     void deliverDoOutput(uint32_t requested);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Thu Oct 16 
10:07:26 2008
@@ -168,9 +168,6 @@
     // authentication etc. See ConnectionSettings.
     shadowConnection.open(dumpeeUrl, bc.getUserId());
 
-    // Stop the failover listener as its session will conflict with 
re-creating-sessions
-    
client::ConnectionAccess::getImpl(shadowConnection)->stopFailoverListener();
-    
     
dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession,
 this, _1));
     ClusterConnectionProxy(shadowConnection).shadowReady(
         dumpConnection->getId().getMember(),

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Thu 
Oct 16 10:07:26 2008
@@ -105,8 +105,9 @@
     // Note we may send 0 size request if there's more than 2*estimate in the 
buffer.
     // Send it anyway to keep the doOutput chain going until we are sure 
there's no more output
     // (in deliverDoOutput)
-    // 
-    
parent.getCluster().mcastControl(ClusterConnectionDeliverDoOutputBody(ProtocolVersion(),
 request), &parent);
+    //
+    // FIXME aconway 2008-10-16: use ++parent.mcastSeq as sequence no,not 0
+    
parent.getCluster().mcastControl(ClusterConnectionDeliverDoOutputBody(ProtocolVersion(),
 request), parent.getId(), 0);
     QPID_LOG(trace, &parent << "Send doOutput request for " << request);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Thu Oct 16 
10:07:26 2008
@@ -177,46 +177,52 @@
     return s;
 }
 
-template <class T>  std::set<uint16_t> knownBrokerPorts(T& source, size_t n) {
+template <class T>  std::set<uint16_t> knownBrokerPorts(T& source, int n=-1) {
     vector<Url> urls = source.getKnownBrokers();
-    for (size_t retry=1000; urls.size() != n && retry != 0; --retry) {
-        ::usleep(1000);
-        urls = source.getKnownBrokers();
+    BOOST_MESSAGE("knownBrokerPorts " << n << ": " << urls);
+    if (n >= 0) {
+        for (size_t retry=10; urls.size() != unsigned(n) && retry != 0; 
--retry) {
+            ::usleep(100000);
+            urls = source.getKnownBrokers();
+            BOOST_MESSAGE("knownBrokerPorts retry: " << urls);
+        }
     }
     set<uint16_t> s;
-    for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
-        BOOST_MESSAGE("Failover URL: " << *i);      
-        BOOST_CHECK(i->size() >= 1);
-        BOOST_CHECK((*i)[0].get<TcpAddress>());
+    for (vector<Url>::const_iterator i = urls.begin(); i != urls.end(); ++i) 
         s.insert((*i)[0].get<TcpAddress>()->port);
-    }
     return s;
 }
 
-QPID_AUTO_TEST_CASE(testFailoverListener) {
-    ClusterFixture cluster(2);
-    Client c0(cluster[0], "c0");
-    FailoverListener fl;
-    fl.start(ConnectionAccess::getImpl(c0.connection));
-    set<uint16_t> set0=makeSet(cluster);
-
-    BOOST_CHECK_EQUAL(set0, knownBrokerPorts(fl, 2));
-    cluster.add();
-    BOOST_CHECK_EQUAL(makeSet(cluster), knownBrokerPorts(fl, 3));
-    cluster.kill(2);
-    BOOST_CHECK_EQUAL(set0, knownBrokerPorts(fl, 2));
-}
-
 QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
-    ClusterFixture cluster(2);
+    ClusterFixture cluster(1);
     Client c0(cluster[0], "c0");
-    set<uint16_t> set0=makeSet(cluster);
+    set<uint16_t> kb0 = knownBrokerPorts(c0.connection);
+    BOOST_CHECK_EQUAL(kb0.size(), 1);
+    BOOST_CHECK_EQUAL(kb0, makeSet(cluster));
+
+    cluster.add();
+    Client c1(cluster[1], "c1");
+    set<uint16_t> kb1 = knownBrokerPorts(c1.connection);
+    kb0 = knownBrokerPorts(c0.connection, 2);
+    BOOST_CHECK_EQUAL(kb1.size(), 2);
+    BOOST_CHECK_EQUAL(kb1, makeSet(cluster));
+    BOOST_CHECK_EQUAL(kb1,kb0);
 
-    BOOST_CHECK_EQUAL(set0, knownBrokerPorts(c0.connection, 2));
     cluster.add();
-    BOOST_CHECK_EQUAL(makeSet(cluster), knownBrokerPorts(c0.connection, 3));
-    cluster.kill(2);
-    BOOST_CHECK_EQUAL(set0, knownBrokerPorts(c0.connection, 2));
+    Client c2(cluster[2], "c2");
+    set<uint16_t> kb2 = knownBrokerPorts(c2.connection);
+    kb1 = knownBrokerPorts(c1.connection, 3);
+    kb0 = knownBrokerPorts(c0.connection, 3);
+    BOOST_CHECK_EQUAL(kb2.size(), 3);
+    BOOST_CHECK_EQUAL(kb2, makeSet(cluster));
+    BOOST_CHECK_EQUAL(kb2,kb0);
+    BOOST_CHECK_EQUAL(kb2,kb1);
+
+    cluster.kill(1);
+    kb0 = knownBrokerPorts(c0.connection, 2);
+    kb2 = knownBrokerPorts(c2.connection, 2);
+    BOOST_CHECK_EQUAL(kb0.size(), 2);
+    BOOST_CHECK_EQUAL(kb0, kb2);
 }
 
 QPID_AUTO_TEST_CASE(DumpConsumers) {
@@ -238,6 +244,7 @@
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 1u);
     BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 1u);
 
+
     // Activate the subscription, ensure message removed on all queues. 
     c0.subs.setFlowControl("q", FlowControl::messageCredit(1));
     Message m;

Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=705287&r1=705286&r2=705287&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Thu Oct 16 10:07:26 2008
@@ -44,6 +44,10 @@
     <control name="ready" code="0x10" label="New member is ready.">
       <field name="url" type="str16"/>
     </control>
+
+    <control name="config-change" code="0x11" label="Raw cluster membership.">
+      <field name="current" type="vbin16"/> <!-- packed member-id array -->
+    </control>
     
     <control name="shutdown" code="0x20" label="Shut down entire cluster"/>
   </class>


Reply via email to