Author: aconway
Date: Thu Sep 18 13:18:29 2008
New Revision: 696788

URL: http://svn.apache.org/viewvc?rev=696788&view=rev
Log:
Dump shared state to new cluster members.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    incubator/qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=696788&r1=696787&r2=696788&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Thu Sep 18 
13:18:29 2008
@@ -26,6 +26,7 @@
 #include "qpid/ptr_map.h"
 #include "qpid/framing/AMQP_ClientProxy.h"
 #include "qpid/agent/ManagementAgent.h"
+#include "qpid/framing/enum.h"
 
 #include <boost/bind.hpp>
 #include <boost/ptr_container/ptr_vector.hpp>
@@ -195,14 +196,14 @@
         ioCallback = 0;
 
         if (mgmtClosing)
-            close(403, "Closed by Management Request", 0, 0);
+            close(execution::ERROR_CODE_UNAUTHORIZED_ACCESS, "Closed by 
Management Request", 0, 0);
         else
             //then do other output as needed:
             return outputTasks.doOutput();
     }catch(ConnectionException& e){
         close(e.code, e.getMessage(), 0, 0);
     }catch(std::exception& e){
-        close(541/*internal error*/, e.what(), 0, 0);
+        close(execution::ERROR_CODE_INTERNAL_ERROR, e.what(), 0, 0);
     }
     return false;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h?rev=696788&r1=696787&r2=696788&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connection.h Thu Sep 18 
13:18:29 2008
@@ -166,6 +166,9 @@
     void resume(Session& session);
 
     bool isOpen() const;
+
+    
+  friend class ConnectionAccess; ///<@internal
 };
 
 }} // namespace qpid::client

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=696788&r1=696787&r2=696788&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Sep 18 
13:18:29 2008
@@ -89,7 +89,7 @@
 
 void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
     Mutex::ScopedLock l(lock);
-    connections.insert(ConnectionMap::value_type(ConnectionId(self, c.get()), 
c));
+    handler->insert(c);
 }
 
 void Cluster::erase(ConnectionId id) {
@@ -186,8 +186,10 @@
         e.getConnection()->deliverBuffer(buf);
     else {              // control
         AMQFrame frame;
-        while (frame.decode(buf))
+        while (frame.decode(buf)) {
+            QPID_LOG(trace, "DLVR [" << self << "]: " << frame);
             e.getConnection()->received(frame);
+        }
     }
 }
 
@@ -274,6 +276,7 @@
 
 void Cluster::stall() {
     Mutex::ScopedLock l(lock);
+    QPID_LOG(debug, self << " stalling.");
     // Stop processing connection events. We still process config changes
     // and cluster controls in deliver()
     connectionEventQueue.stop();
@@ -357,6 +360,4 @@
 
 }
 
-
-
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=696788&r1=696787&r2=696788&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Sep 18 
13:18:29 2008
@@ -91,6 +91,8 @@
     void shutdown();
 
     broker::Broker& getBroker();
+
+    void setDumpComplete();
     
   private:
     typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > 
ConnectionMap;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h?rev=696788&r1=696787&r2=696788&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h Thu Sep 18 
13:18:29 2008
@@ -24,6 +24,7 @@
 
 #include "Cpg.h"
 #include "types.h"
+#include <boost/intrusive_ptr.hpp>
 
 namespace qpid {
 
@@ -31,6 +32,7 @@
 
 namespace cluster {
 
+class Connection;
 class Cluster;
 class Event;
 
@@ -44,6 +46,8 @@
     ClusterHandler(Cluster& c);
     virtual ~ClusterHandler();
 
+    bool invoke(const MemberId&, framing::AMQFrame& f);
+
     virtual void update(const MemberId&, const framing::FieldTable& members, 
uint64_t dumping) = 0;
     virtual void dumpRequest(const MemberId&, const std::string& url) = 0;
     virtual void ready(const MemberId&, const std::string& url) = 0;
@@ -54,7 +58,7 @@
                               cpg_address *left, int nLeft,
                               cpg_address *joined, int nJoined) = 0;
 
-    bool invoke(const MemberId&, framing::AMQFrame& f);
+    virtual void insert(const boost::intrusive_ptr<Connection>& c) = 0;
 
   protected:
     Cluster& cluster;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=696788&r1=696787&r2=696788&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Sep 18 
13:18:29 2008
@@ -34,29 +34,31 @@
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
                        const std::string& wrappedId, ConnectionId myId)
     : cluster(c), self(myId), output(*this, out),
-      connection(&output, cluster.getBroker(), wrappedId)
+      connection(&output, cluster.getBroker(), wrappedId), catchUp(), 
exCatchUp()
 {}
 
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
-                       const std::string& wrappedId, MemberId myId)
+                       const std::string& wrappedId, MemberId myId, bool 
isCatchUp)
     : cluster(c), self(myId, this), output(*this, out),
-      connection(&output, cluster.getBroker(), wrappedId)
+      connection(&output, cluster.getBroker(), wrappedId),
+      catchUp(isCatchUp), exCatchUp()
 {}
 
 Connection::~Connection() {}
 
-bool Connection::doOutput() { return output.doOutput(); }
+bool Connection::doOutput() {
+    return output.doOutput();
+}
 
 // Delivery of doOutput allows us to run the real connection doOutput()
 // which stocks up the write buffers with data.
 //
 void Connection::deliverDoOutput(uint32_t requested) {
+    assert(!catchUp);
     output.deliverDoOutput(requested);
 }
 
-// Handle frames delivered from cluster.
 void Connection::received(framing::AMQFrame& f) {
-    QPID_LOG(trace, "DLVR [" << self << "]: " << f);
     // Handle connection controls, deliver other frames to connection.
     if (!framing::invoke(*this, *f.getBody()).wasHandled())
         connection.received(f);
@@ -64,16 +66,28 @@
 
 void Connection::closed() {
     try {
-        // Called when the local network connection is closed. We still
-        // need to process any outstanding cluster frames for this
-        // connection to ensure our sessions are up-to-date. We defer
-        // closing the Connection object till deliverClosed(), but replace
-        // its output handler with a null handler since the network output
-        // handler will be deleted.
-        // 
-        connection.setOutputHandler(&discardHandler); 
-        cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
-        ++mcastSeq;
+        // Local network connection has closed.  We need to keep the
+        // connection around but replace the output handler with a
+        // no-op handler as the network output handler will be
+        // deleted.
+
+        // FIXME aconway 2008-09-18: output handler reset in right place?
+        // connection.setOutputHandler(&discardHandler);
+        output.setOutputHandler(discardHandler);
+        if (catchUp) {
+            // This was a catch-up connection, may be promoted to a
+            // shadow connection.
+            catchUp = false;
+            exCatchUp = true;
+            cluster.insert(boost::intrusive_ptr<Connection>(this));
+        }
+        else {
+            // This was a local replicated connection. Multicast a deliver 
closed
+            // and process any outstanding frames from the cluster until
+            // self-delivery of deliver-closed.
+            cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
+            ++mcastSeq;
+        }
     }
     catch (const std::exception& e) {
         QPID_LOG(error, QPID_MSG("While closing connection: " << e.what()));
@@ -81,17 +95,20 @@
 }
 
 void Connection::deliverClose () {
+    assert(!catchUp);
     connection.closed();
     cluster.erase(self);
 }
 
 size_t Connection::decode(const char* buffer, size_t size) { 
+    assert(!catchUp);
     ++mcastSeq;
     cluster.mcastBuffer(buffer, size, self);
     return size;
 }
 
 void Connection::deliverBuffer(Buffer& buf) {
+    assert(!catchUp);
     ++deliverSeq;
     while (decoder.decode(buf))
         received(decoder.frame);
@@ -108,10 +125,15 @@
     // FIXME aconway 2008-09-10: TODO
 }
     
-void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/)
-{
+void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/) 
{
     // FIXME aconway 2008-09-10: TODO
 }
 
+void Connection::dumpComplete() {
+    // FIXME aconway 2008-09-18: use or remove.
+}
+
+bool Connection::isLocal() const { return self.first == cluster.getSelf() && 
self.second == this; }
+ 
 }} // namespace qpid::cluster
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=696788&r1=696787&r2=696788&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Sep 18 
13:18:29 2008
@@ -51,21 +51,21 @@
 {
   public:
     /** Local connection, use this in ConnectionId */
-    Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& 
id, MemberId);
+    Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& 
id, MemberId, bool catchUp);
     /** Shadow connection */
     Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& 
id, ConnectionId);
     ~Connection();
     
     ConnectionId getId() const { return self; }
     broker::Connection& getBrokerConnection() { return connection; }
-    bool isLocal() const { return self.second == this; }
+    bool isLocal() const;
 
-    Cluster& getCluster() { return cluster; }
+    /** True if the connection is in "catch-up" mode: building initial state */
+    bool isCatchUp() const { return catchUp; }
+    bool isExCatchUp() const { return exCatchUp; }
 
-    // self-delivery of multicast data.
-    void deliverClose();
-    void deliverDoOutput(uint32_t requested);
-    void deliverBuffer(framing::Buffer&);
+
+    Cluster& getCluster() { return cluster; }
 
     // ConnectionOutputHandler methods
     void close() {}
@@ -84,19 +84,27 @@
     // ConnectionCodec methods
     size_t decode(const char* buffer, size_t size);
 
-    // ConnectionInputHandlerFactory
-    sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, 
const std::string& id, bool isClient);
+    // Called by cluster to deliver a buffer from CPG.
+    void deliverBuffer(framing::Buffer&);
+
 
+    // ==== Used in catch-up mode to build initial state.
+    // 
     // State dump methods.
-    virtual void sessionState(const SequenceNumber& replayStart,
+    void sessionState(const SequenceNumber& replayStart,
                               const SequenceSet& sentIncomplete,
                               const SequenceNumber& expected,
                               const SequenceNumber& received,
                               const SequenceSet& unknownCompleted, const 
SequenceSet& receivedIncomplete);
     
-    virtual void shadowReady(uint64_t memberId, uint64_t connectionId);
+    void shadowReady(uint64_t memberId, uint64_t connectionId);
+
+    void dumpComplete();
 
   private:
+
+    void deliverClose();
+    void deliverDoOutput(uint32_t requested);
     void sendDoOutput();
 
     Cluster& cluster;
@@ -108,6 +116,8 @@
     broker::Connection connection;
     framing::SequenceNumber mcastSeq;
     framing::SequenceNumber deliverSeq;
+    bool catchUp;
+    bool exCatchUp;
 };
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=696788&r1=696787&r2=696788&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Thu Sep 
18 13:18:29 2008
@@ -32,7 +32,9 @@
 sys::ConnectionCodec*
 ConnectionCodec::Factory::create(framing::ProtocolVersion v, 
sys::OutputControl& out, const std::string& id) {
     if (v == framing::ProtocolVersion(0, 10))
-        return new ConnectionCodec(out, id, cluster);
+        return new ConnectionCodec(out, id, cluster, false);
+    else if (v == framing::ProtocolVersion(0x80 + 0, 0x80 + 10))
+        return new ConnectionCodec(out, id, cluster, true); // Catch-up 
connection
     return 0;
 }
 
@@ -42,9 +44,9 @@
     return next->create(out, id);
 }
 
-ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& 
id, Cluster& cluster)
+ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& 
id, Cluster& cluster, bool catchUp)
     : codec(out, id, false),
-      interceptor(new Connection(cluster, codec, id, cluster.getSelf()))
+      interceptor(new Connection(cluster, codec, id, cluster.getSelf(), 
catchUp))
 {
     std::auto_ptr<sys::ConnectionInputHandler> ih(new 
ProxyInputHandler(interceptor));
     codec.setInputHandler(ih);
@@ -55,7 +57,10 @@
 
 // ConnectionCodec functions delegate to the codecOutput
 size_t ConnectionCodec::decode(const char* buffer, size_t size) {
-    return interceptor->decode(buffer, size);
+    if (interceptor->isCatchUp())
+        return codec.decode(buffer, size);
+    else
+        return interceptor->decode(buffer, size);
 }
 
 size_t ConnectionCodec::encode(const char* buffer, size_t size) { return 
codec.encode(buffer, size); }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=696788&r1=696787&r2=696788&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Thu Sep 18 
13:18:29 2008
@@ -56,7 +56,7 @@
         sys::ConnectionCodec* create(sys::OutputControl&, const std::string& 
id);
     };
 
-    ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& 
c);
+    ConnectionCodec(sys::OutputControl& out, const std::string& id, Cluster& 
c, bool catchUp);
     ~ConnectionCodec();
 
     // ConnectionCodec functions.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=696788&r1=696787&r2=696788&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Thu Sep 18 
13:18:29 2008
@@ -27,12 +27,21 @@
 #include "qpid/broker/Exchange.h"
 #include "qpid/broker/ExchangeRegistry.h"
 #include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/ClusterConnectionDumpCompleteBody.h"
 #include "qpid/framing/enum.h"
+#include "qpid/framing/ProtocolVersion.h"
 #include "qpid/log/Statement.h"
 #include "qpid/Url.h"
 #include <boost/bind.hpp>
 
 namespace qpid {
+
+namespace client {
+struct ConnectionAccess {
+    static void setVersion(Connection& c, const framing::ProtocolVersion& v) { 
c.version = v; }
+};
+} // namespace client
+
 namespace cluster {
 
 using broker::Broker;
@@ -40,16 +49,18 @@
 using broker::Queue;
 using broker::QueueBinding;
 using broker::Message;
+using namespace framing;
 using namespace framing::message;
-
 using namespace client;
 
+
 DumpClient::DumpClient(const Url& url, Broker& b,
                        const boost::function<void()>& ok,
                        const boost::function<void(const std::exception&)>& 
fail)
     : donor(b), done(ok), failed(fail)
 {
-    // FIXME aconway 2008-09-16: Identify as DumpClient connection.
+    // Special version identifies this as a catch-up connectionn.
+    client::ConnectionAccess::setVersion(connection, ProtocolVersion(0x80 , 
0x80 + 10));
     connection.open(url);
     session = connection.newSession();
 }
@@ -65,9 +76,10 @@
     // Catch-up exchange is used to route messages to the proper queue without 
modifying routing key.
     session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", 
arg::autoDelete=true);
     donor.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
+    SessionBase_0_10Access sb(session);
+    // FIXME aconway 2008-09-18: inidicate successful end-of-dump.
     session.sync();
     session.close();
-    // FIXME aconway 2008-09-17: send dump complete indication.
     connection.close();
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp?rev=696788&r1=696787&r2=696788&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp Thu Sep 
18 13:18:29 2008
@@ -30,7 +30,7 @@
 using namespace sys;
 using namespace framing;
 
-JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START) {}
+JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START), 
catchUpConnections(0) {}
 
 void JoiningHandler::configChange(
     cpg_address *current, int nCurrent,
@@ -74,21 +74,17 @@
     else {                      // Start a new dump
         cluster.map.dumper = cluster.map.first();
         if (dumpee == cluster.self) { // My turn
-
-            state = DUMP_COMPLETE;        // FIXME aconway 2008-09-18: bypass 
dump
-
-            QPID_LOG(info, cluster.self << " receiving state dump from " << 
cluster.map.dumper);
             switch (state) {
               case START:
               case STALLED:
                 assert(0); break;
 
               case DUMP_REQUESTED: 
+                QPID_LOG(info, cluster.self << " stalling for dump from " << 
cluster.map.dumper);
                 state = STALLED;
                 cluster.stall();
                 break;
 
-                // FIXME aconway 2008-09-17: no transition to DUMP_COMPLETE 
state.
               case DUMP_COMPLETE:
                 cluster.ready();
                 break;
@@ -102,5 +98,34 @@
     checkDumpRequest();
 }
 
+void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) {
+    if (c->isCatchUp()) {
+        ++catchUpConnections;
+        QPID_LOG(debug, "Received " << catchUpConnections << " catch-up 
connections.");
+    }
+    else if (c->isExCatchUp()) {
+        if (c->getId().getConnectionPtr() != c.get()) // become shadow 
connection
+            
cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
+        QPID_LOG(debug, "Catch-up connection terminated " << 
catchUpConnections-1 << " remaining");
+        if (--catchUpConnections == 0)
+            dumpComplete();
+    }
+    else      // Local connection, will be stalled till dump complete.
+        
cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
+}
+
+void JoiningHandler::dumpComplete() {
+    // FIXME aconway 2008-09-18: need to detect incomplete dump.
+    // 
+    if (state == STALLED) {
+        QPID_LOG(debug, "Dump complete, unstalling.");
+        cluster.ready();
+    }
+    else {
+        QPID_LOG(debug, "Dump complete, waiting for stall point.");
+        assert(state == DUMP_REQUESTED);
+        state = DUMP_COMPLETE;
+    }
+}
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h?rev=696788&r1=696787&r2=696788&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h Thu Sep 18 
13:18:29 2008
@@ -46,9 +46,14 @@
     void dumpRequest(const MemberId&, const std::string& url);
     void ready(const MemberId&, const std::string& url);
 
+    void insert(const boost::intrusive_ptr<Connection>& c);
+    
   private:
-    enum { START, DUMP_REQUESTED, STALLED, DUMP_COMPLETE } state;
     void checkDumpRequest();
+    void dumpComplete();
+
+    enum { START, DUMP_REQUESTED, STALLED, DUMP_COMPLETE } state;
+    size_t catchUpConnections;
 
 };
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp?rev=696788&r1=696787&r2=696788&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp Thu Sep 18 
13:18:29 2008
@@ -23,6 +23,7 @@
 #include "DumpClient.h"
 #include "qpid/log/Statement.h"
 #include "qpid/framing/ClusterUpdateBody.h"
+#include "qpid/framing/enum.h"
 
 namespace qpid {
 namespace cluster {
@@ -32,6 +33,10 @@
 
 MemberHandler::MemberHandler(Cluster& c) : ClusterHandler(c) {}
 
+MemberHandler::~MemberHandler() { 
+    if (dumpThread.id()) dumpThread.join(); // Join the last dumpthread.
+}
+
 void MemberHandler::configChange(
     cpg_address */*current*/, int /*nCurrent*/,
     cpg_address */*left*/, int /*nLeft*/,
@@ -58,11 +63,10 @@
     assert(!cluster.connectionEventQueue.isStopped()); // Not currently 
stalled.
     cluster.stall();
 
-    cluster.ready();            // FIXME aconway 2008-09-18: Bypass dump
-    (void)urlStr;
-//     dumpThread = Thread(new DumpClient(Url(urlStr), cluster.broker,
-//                             boost::bind(&MemberHandler::dumpDone, this),
-//                             boost::bind(&MemberHandler::dumpError, this, 
_1)));
+    if (dumpThread.id()) dumpThread.join(); // Join the last dumpthread.
+    dumpThread = Thread(new DumpClient(Url(urlStr), cluster.broker,
+                            boost::bind(&MemberHandler::dumpSent, this),
+                            boost::bind(&MemberHandler::dumpError, this, _1)));
 }
 
 void MemberHandler::ready(const MemberId& id, const std::string& url) {
@@ -70,14 +74,22 @@
 }
 
 
-void MemberHandler::dumpDone() {
-    dumpThread.join();          // Clean up.
+void MemberHandler::dumpSent() {
+    QPID_LOG(debug, "Finished sending state dump.");
+    Mutex::ScopedLock l(cluster.lock);
     cluster.ready();
 }
 
 void MemberHandler::dumpError(const std::exception& e) {
-    QPID_LOG(error, "Error in state dump from " << cluster.self << ": " << 
e.what());
-    dumpDone();
+    QPID_LOG(error, "Error sending state dump from " << cluster.self << ": " 
<< e.what());
+    dumpSent();
+}
+
+void MemberHandler::insert(const boost::intrusive_ptr<Connection>& c) {
+    if (c->isCatchUp())         // Not allowed in member mode
+        c->getBrokerConnection().close(execution::ERROR_CODE_ILLEGAL_STATE, 
"Not in catch-up mode.");
+    else
+        cluster.connections[c->getId()] = c;
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h?rev=696788&r1=696787&r2=696788&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h Thu Sep 18 
13:18:29 2008
@@ -35,6 +35,7 @@
 {
   public:
     MemberHandler(Cluster& c);
+    ~MemberHandler();
     
     void configChange(
         struct cpg_address */*members*/, int /*nMembers*/,
@@ -48,9 +49,11 @@
     void dumpRequest(const MemberId&, const std::string& url);
     void ready(const MemberId&, const std::string& url);
 
-    void dumpDone();
+    void dumpSent();
     void dumpError(const std::exception&);
 
+    void insert(const boost::intrusive_ptr<Connection>& c);
+
   public:
     sys::Thread dumpThread;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=696788&r1=696787&r2=696788&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Thu 
Sep 18 13:18:29 2008
@@ -33,12 +33,12 @@
 using namespace framing;
 
 OutputInterceptor::OutputInterceptor(cluster::Connection& p, 
sys::ConnectionOutputHandler& h)
-    : parent(p), next(h), sent(), moreOutput(), doingOutput()
+    : parent(p), next(&h), sent(), moreOutput(), doingOutput()
 {}
 
 void OutputInterceptor::send(framing::AMQFrame& f) {
     Locker l(lock); 
-    next.send(f);
+    next->send(f);
     sent += f.size();
 }
 
@@ -60,7 +60,7 @@
 // 
 void OutputInterceptor::deliverDoOutput(size_t requested) {
     Locker l(lock);
-    size_t buf = next.getBuffered();
+    size_t buf = next->getBuffered();
     if (parent.isLocal())
         writeEstimate.delivered(sent, buf); // Update the estimate.
 
@@ -101,4 +101,9 @@
     QPID_LOG(trace, &parent << "Send doOutput request for " << request);
 }
 
+void OutputInterceptor::setOutputHandler(sys::ConnectionOutputHandler& h) {
+    Locker l(lock);
+    next = &h;
+}
+
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h?rev=696788&r1=696787&r2=696788&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h Thu Sep 
18 13:18:29 2008
@@ -43,14 +43,16 @@
     // sys::ConnectionOutputHandler functions
     void send(framing::AMQFrame& f);
     void activateOutput();
-    void close() { Locker l(lock); next.close(); }
-    size_t getBuffered() const { Locker l(lock); return next.getBuffered(); }
+    void close() { Locker l(lock); next->close(); }
+    size_t getBuffered() const { Locker l(lock); return next->getBuffered(); }
 
     // Delivery point for doOutput requests.
     void deliverDoOutput(size_t requested);
     // Intercept doOutput requests on Connection.
     bool doOutput();
 
+    void setOutputHandler(sys::ConnectionOutputHandler& h);
+
     cluster::Connection& parent;
     
   private:
@@ -60,7 +62,7 @@
     void sendDoOutput();
 
     mutable sys::Mutex lock;
-    sys::ConnectionOutputHandler& next;
+    sys::ConnectionOutputHandler* next;
     size_t sent;
     WriteEstimate writeEstimate;
     bool moreOutput;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=696788&r1=696787&r2=696788&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Thu Sep 18 
13:18:29 2008
@@ -74,10 +74,12 @@
     string name;
     std::auto_ptr<BrokerFixture> broker0;
     boost::ptr_vector<ForkedBroker> forkedBrokers;
+    bool init0;
 
-    ClusterFixture(size_t n);
+    ClusterFixture(size_t n, bool init0=true);
     void add(size_t n) { for (size_t i=0; i < n; ++i) add(); }
     void add();
+    void add0(bool force);
     void setup();
     void kill(size_t n) {
         if (n) forkedBrokers[n-1].kill();
@@ -85,8 +87,9 @@
     }
 };
 
-ClusterFixture::ClusterFixture(size_t n) : name(Uuid(true).str()) {
+ClusterFixture::ClusterFixture(size_t n, bool init0_) : 
name(Uuid(true).str()), init0(init0_) {
     add(n);
+    if (!init0) return;  // FIXME aconway 2008-09-18: can't use local hack in 
this case.
     // Wait for all n members to join the cluster
     int retry=20;            // TODO aconway 2008-07-16: nasty sleeps, clean 
this up.
     while (retry && getGlobalCluster().size() != n) {
@@ -101,24 +104,42 @@
     os << "fork" << size();
     std::string prefix = os.str();
 
+    if (size())  {              // Not the first broker, fork.
+
+        const char* argv[] = {
+            "qpidd " __FILE__ ,
+            "--load-module=../.libs/cluster.so",
+            "--cluster-name", name.c_str(), 
+            "--auth=no", "--no-data-dir",
+            "--log-prefix", prefix.c_str(),
+        };
+        size_t argc = sizeof(argv)/sizeof(argv[0]);
+
+
+        forkedBrokers.push_back(new ForkedBroker(argc, argv));
+        push_back(forkedBrokers.back().getPort());
+    }
+    else {      
+        add0(init0);            // First broker, run in this process.
+    }
+}
+
+void ClusterFixture::add0(bool init) {
+    if (!init) {
+        push_back(0);
+        return;
+    }
     const char* argv[] = {
         "qpidd " __FILE__ ,
         "--load-module=../.libs/cluster.so",
         "--cluster-name", name.c_str(), 
-        "--auth=no", "--no-data-dir",
-        "--log-prefix", prefix.c_str(),
+        "--auth=no", "--no-data-dir"
     };
     size_t argc = sizeof(argv)/sizeof(argv[0]);
 
-    if (size())  {              // Not the first broker, fork.
-        forkedBrokers.push_back(new ForkedBroker(argc, argv));
-        push_back(forkedBrokers.back().getPort());
-    }
-    else {                      // First broker, run in this process.
-        qpid::log::Logger::instance().setPrefix("main");
-        broker0.reset(new BrokerFixture(parseOpts(argc, argv)));
-        push_back(broker0->getPort());
-    }
+    qpid::log::Logger::instance().setPrefix("main");
+    broker0.reset(new BrokerFixture(parseOpts(argc, argv)));
+    push_back(broker0->getPort());
 }
 
 // For debugging: op << for CPG types.
@@ -140,60 +161,6 @@
     return o;
 }
 
-QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testCatchupSharedState, 1) {
-    ClusterFixture cluster(1);
-    Client c0(cluster[0], "c0");
-    // Create some shared state.
-    c0.session.queueDeclare("q");
-    c0.session.messageTransfer(arg::content=Message("foo","q"));
-    while (c0.session.queueQuery("q").getMessageCount() != 1)
-        ::usleep(1000);         // Wait for message to show up on broker 0.
-
-    // Now join new broker, should catch up.
-    cluster.add();
-    c0.session.messageTransfer(arg::content=Message("bar","q"));
-    c0.session.queueDeclare("p");
-    c0.session.messageTransfer(arg::content=Message("poo","p"));
-
-    // Verify new broker has all state.
-    Message m;
-    Client c1(cluster[1], "c1");
-    BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
-    BOOST_CHECK_EQUAL(m.getData(), "foo");
-    BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
-    BOOST_CHECK_EQUAL(m.getData(), "bar");
-    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 
(unsigned)0);
-    BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
-    BOOST_CHECK_EQUAL(m.getData(), "poo");
-    BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 
(unsigned)0);
-}
-
-QPID_AUTO_TEST_CASE(testStall) {
-    ClusterFixture cluster(2);
-    Client c0(cluster[0], "c0");
-    Client c1(cluster[1], "c1");
-
-    // Declare on all to avoid race condition.
-    c0.session.queueDeclare("q");
-    c1.session.queueDeclare("q");
-    
-    // Stall 0, verify it does not process deliverys while stalled.
-    getGlobalCluster().stall();
-    c1.session.messageTransfer(arg::content=Message("foo","q"));
-    while (c1.session.queueQuery("q").getMessageCount() != 1)
-        ::usleep(1000);         // Wait for message to show up on broker 1.
-    sleep(2);               // FIXME aconway 2008-09-11: remove.
-    // But it should not be on broker 0.
-    boost::shared_ptr<broker::Queue> q0 = 
cluster.broker0->broker->getQueues().find("q");
-    BOOST_REQUIRE(q0);
-    BOOST_CHECK_EQUAL(q0->getMessageCount(), (unsigned)0);
-    // Now unstall and we should get the message.
-    getGlobalCluster().ready();
-    Message m;
-    BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
-    BOOST_CHECK_EQUAL(m.getData(), "foo");
-}
-
 #if 0                           // FIXME aconway 2008-09-10: finish & enable
 QPID_AUTO_TEST_CASE(testDumpConsumers) {
     ClusterFixture cluster(1);
@@ -226,20 +193,36 @@
 
 #endif
 
-QPID_AUTO_TEST_CASE(testForkedBroker) {
-    // Verify the ForkedBroker works as expected.
-    const char* argv[] = { "", "--auth=no", "--no-data-dir", 
"--log-prefix=testForkedBroker" };
-    ForkedBroker broker(sizeof(argv)/sizeof(argv[0]), argv);
-    Client c(broker.getPort());
-    BOOST_CHECK_EQUAL("direct", 
c.session.exchangeQuery("amq.direct").getType()); 
-}
 
-QPID_AUTO_TEST_CASE(testSingletonCluster) {
-    // Test against a singleton cluster, verify basic operation.
+QPID_AUTO_TEST_CASE(testCatchupSharedState) {
     ClusterFixture cluster(1);
-    Client c(cluster[0]);
-    BOOST_CHECK(c.session.queueQuery("q").getQueue().empty());
-    BOOST_CHECK(c.session.exchangeQuery("ex").getNotFound());
+
+    Client c0(cluster[0], "c0");
+    // Create some shared state.
+    c0.session.queueDeclare("q");
+    c0.session.messageTransfer(arg::content=Message("foo","q"));
+    c0.session.messageTransfer(arg::content=Message("bar","q"));
+    while (c0.session.queueQuery("q").getMessageCount() != 2)
+        ::usleep(1000);         // Wait for message to show up on broker 0.
+
+    // FIXME aconway 2008-09-18: close session until we catchup session state 
also.
+    c0.session.close();
+    c0.connection.close();
+
+    // Now join new broker, should catch up.
+    cluster.add();
+
+    // FIXME aconway 2008-09-18: when we do session state try adding
+    // further stuff from broker 0, and leaving a subscription active.
+
+    // Verify new broker has all state.
+    Message m;
+    Client c1(cluster[1], "c1");
+    BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "foo");
+    BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "bar");
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 
(unsigned)0);
 }
 
 QPID_AUTO_TEST_CASE(testWiringReplication) {
@@ -326,4 +309,30 @@
     BOOST_CHECK_EQUAL(0u, c2.session.queueQuery("q").getMessageCount());
 }
 
+QPID_AUTO_TEST_CASE(testStall) {
+    ClusterFixture cluster(2);
+    Client c0(cluster[0], "c0");
+    Client c1(cluster[1], "c1");
+
+    // Declare on all to avoid race condition.
+    c0.session.queueDeclare("q");
+    c1.session.queueDeclare("q");
+    
+    // Stall 0, verify it does not process deliverys while stalled.
+    getGlobalCluster().stall();
+    c1.session.messageTransfer(arg::content=Message("foo","q"));
+    while (c1.session.queueQuery("q").getMessageCount() != 1)
+        ::usleep(1000);         // Wait for message to show up on broker 1.
+    sleep(2);               // FIXME aconway 2008-09-11: remove.
+    // But it should not be on broker 0.
+    boost::shared_ptr<broker::Queue> q0 = 
cluster.broker0->broker->getQueues().find("q");
+    BOOST_REQUIRE(q0);
+    BOOST_CHECK_EQUAL(q0->getMessageCount(), (unsigned)0);
+    // Now unstall and we should get the message.
+    getGlobalCluster().ready();
+    Message m;
+    BOOST_CHECK(c0.subs.get(m, "q", TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "foo");
+}
+
 QPID_AUTO_TEST_SUITE_END()

Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=696788&r1=696787&r2=696788&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Thu Sep 18 13:18:29 2008
@@ -60,7 +60,8 @@
         - attach sessions, create consumers, set flow with normal AMQP 
cokmmands.
         - reset session state by sending session-state for each session.
          - frames following session-state are replay frames.
-        - send shadow-ready to mark end of dump.
+        - send shadow-ready to mark end of shadow dump.
+        - send dump-complete when entire dump is complete.
     -->
     <control name="session-state" code="0x4" label="Set session state during a 
brain dump.">
       <!-- Target session deduced from channel number.  -->
@@ -79,5 +80,6 @@
       <field name="connection-id" type="uint64"/>
     </control>
 
+    <control name="dump-complete" code="0x6" label="End of brain dump."/>
   </class>
 </amqp>


Reply via email to