Author: aconway
Date: Wed Sep 24 10:34:08 2008
New Revision: 698666

URL: http://svn.apache.org/viewvc?rev=698666&view=rev
Log:
Cluster replicates session command sequence state and consumers to newcomers.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    incubator/qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp Wed Sep 24 10:34:08 
2008
@@ -237,7 +237,7 @@
     replayFlushLimit(flush), replayHardLimit(hard) {}
 
 SessionState::SessionState(const SessionId& i, const Configuration& c)
-  : id(i), timeout(), config(c), stateful()
+    : id(i), timeout(), config(c), stateful()
 {
     QPID_LOG(debug, "SessionState::SessionState " << id << ": " << this);
 }
@@ -250,4 +250,29 @@
     return o << "(" << p.command.getValue() << "+" << p.offset << ")";
 }
 
+void SessionState::setState(
+    const SequenceNumber& replayStart,
+    const SequenceNumber& sendCommandPoint,
+    const SequenceSet& sentIncomplete,
+    const SequenceNumber& expected,
+    const SequenceNumber& received,
+    const SequenceSet& unknownCompleted,
+    const SequenceSet& receivedIncomplete
+)
+{
+    sender.replayPoint = replayStart;
+    sender.flushPoint = sendCommandPoint;
+    sender.sendPoint = sendCommandPoint;
+    sender.unflushedSize = 0;
+    sender.replaySize = 0;      // Replay list will be updated separately.
+    sender.incomplete = sentIncomplete;
+    sender.bytesSinceKnownCompleted = 0;
+
+    receiver.expected = expected;
+    receiver.received = received;
+    receiver.unknownCompleted = unknownCompleted;
+    receiver.incomplete = receivedIncomplete;
+    receiver.bytesSinceKnownCompleted = 0;
+}
+
 } // namespace qpid

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.h Wed Sep 24 10:34:08 
2008
@@ -130,8 +130,8 @@
     virtual SessionPoint senderGetReplayPoint() const;
 
     /** Peer expecting commands from this point.
-        virtual [EMAIL PROTECTED] Range of frames to be replayed.
-    */
+     [EMAIL PROTECTED] Range of frames to be replayed.
+     */
     virtual ReplayRange senderExpected(const SessionPoint& expected);
 
     // ==== Functions for receiver state
@@ -168,6 +168,19 @@
     /** ID of the command currently being handled. */
     virtual SequenceNumber receiverGetCurrent() const;
 
+    /** Set the state variables, used to create a session that will resume
+     *  from some previously established point.
+     */
+    virtual void setState(
+        const SequenceNumber& replayStart,
+        const SequenceNumber& sendCommandPoint,
+        const SequenceSet& sentIncomplete,
+        const SequenceNumber& expected,
+        const SequenceNumber& received,
+        const SequenceSet& unknownCompleted,
+        const SequenceSet& receivedIncomplete
+    );
+
   private:
 
     struct SendState {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Wed Sep 24 
10:34:08 2008
@@ -110,6 +110,12 @@
         bool doOutput();
 
         std::string getName() const { return name; }
+
+        bool isAckExpected() const { return ackExpected; }
+        bool isAcquire() const { return acquire; }
+        bool isWindowing() const { return windowing; }
+        uint32_t getMsgCredit() const { return msgCredit; }
+        uint32_t getByteCredit() const { return byteCredit; }
     };
 
   private:

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Wed Sep 24 
10:34:08 2008
@@ -436,7 +436,7 @@
                               uint8_t acceptMode,
                               uint8_t acquireMode,
                               bool exclusive,
-                              const string& /*resumeId*/,//TODO implement 
resume behaviour
+                              const string& /*resumeId*/,//TODO implement 
resume behaviour. Need to update cluster.
                               uint64_t /*resumeTtl*/,
                               const FieldTable& arguments)
 {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Wed Sep 
24 10:34:08 2008
@@ -50,6 +50,9 @@
     ESTABLISHED.insert(FAILED);
     ESTABLISHED.insert(CLOSED);
     ESTABLISHED.insert(OPEN);
+
+    FINISHED.insert(FAILED);
+    FINISHED.insert(CLOSED);
 } 
 
 void ConnectionHandler::incoming(AMQFrame& frame)
@@ -107,7 +110,7 @@
       case OPEN:
         setState(CLOSING);
         proxy.close(200, OK);
-        waitFor(CLOSED);
+        waitFor(FINISHED);
         break;
         // Nothing to do for CLOSING, CLOSED, FAILED or NOT_STARTED
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.h Wed Sep 
24 10:34:08 2008
@@ -44,7 +44,7 @@
 {
     typedef framing::AMQP_ClientOperations::ConnectionHandler 
ConnectionOperations;
     enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, 
FAILED};
-    std::set<int> ESTABLISHED;
+    std::set<int> ESTABLISHED, FINISHED;
 
     class Adapter : public framing::FrameHandler
     {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Sep 24 
10:34:08 2008
@@ -68,7 +68,8 @@
     
connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, 
this, _1))),
     handler(&joiningHandler),
     joiningHandler(*this),
-    memberHandler(*this)
+    memberHandler(*this),
+    mcastId()
 {
     ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
     if (agent != 0){
@@ -109,22 +110,22 @@
 }
 
 void Cluster::mcastControl(const framing::AMQBody& body, Connection* cptr) {
-    QPID_LOG(trace, "MCAST [" << self << "]: " << body);
     AMQFrame f(body);
-    Event e(CONTROL, ConnectionId(self, cptr), f.size());
+    Event e(CONTROL, ConnectionId(self, cptr), f.size(), ++mcastId);
     Buffer buf(e);
     f.encode(buf);
+    QPID_LOG(trace, "MCAST " << e << " " << body);
     mcastEvent(e);
 }
 
-void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& 
connection) {
-    Event e(DATA, connection, size);
+void Cluster::mcastBuffer(const char* data, size_t size, const ConnectionId& 
connection, size_t id) {
+    Event e(DATA, connection, size, id);
     memcpy(e.getData(), data, size);
+    QPID_LOG(trace, "MCAST " << e);
     mcastEvent(e);
 }
 
 void Cluster::mcastEvent(const Event& e) {
-    QPID_LOG(trace, "MCAST " << e);
     e.mcast(name, cpg);
 }
 
@@ -166,12 +167,13 @@
     try {
         MemberId from(nodeid, pid);
         Event e = Event::delivered(from, msg, msg_len);
+
         // Process cluster controls immediately 
         if (e.getConnectionId().getConnectionPtr() == 0)  { // Cluster control
             Buffer buf(e);
             AMQFrame frame;
             while (frame.decode(buf)) {
-                QPID_LOG(trace, "DLVR [" << e.getConnectionId().getMember() << 
"]: " << *frame.getBody());
+                QPID_LOG(trace, "DLVR " << e << " " << frame);
                 if (!handler->invoke(e.getConnectionId().getMember(), frame))
                     throw Exception(QPID_MSG("Invalid cluster control"));
             }
@@ -189,17 +191,17 @@
 
 void Cluster::connectionEvent(const Event& e) {
     Buffer buf(e);
-    QPID_LOG(trace, "EXEC: " << e);
     boost::intrusive_ptr<Connection> connection = 
getConnection(e.getConnectionId()); 
     assert(connection);
     if (e.getType() == DATA) {
+        QPID_LOG(trace, "EXEC: " << e);
         connection->deliverBuffer(buf);
     }
     else {              // control
         AMQFrame frame;
         while (frame.decode(buf)) {
-            QPID_LOG(trace, "EXEC [" << *connection << "]: " << frame);
-            connection->received(frame);
+            QPID_LOG(trace, "EXEC " << e << " " << frame);
+            connection->delivered(frame);
         }
     }
 }
@@ -351,7 +353,7 @@
 
 void Cluster::stopClusterNode(void)
 {
-    // FIXME aconway 2008-09-18: 
+    // FIXME aconway 2008-09-18: mgmt
     QPID_LOG(notice, self << " disconnected from cluster " << name.str());
     broker.shutdown();
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Sep 24 
10:34:08 2008
@@ -77,7 +77,7 @@
     
     /** Send to the cluster */
     void mcastControl(const framing::AMQBody& controlBody, Connection* cptr);
-    void mcastBuffer(const char*, size_t, const ConnectionId&);
+    void mcastBuffer(const char*, size_t, const ConnectionId&, size_t id);
     void mcastEvent(const Event& e);
     
     /** Leave the cluster */
@@ -89,6 +89,7 @@
     void ready(const MemberId&, const std::string& url);
 
     MemberId getSelf() const { return self; }
+    MemberId getId() const { return self; }
 
     void ready();
     void stall();
@@ -169,6 +170,8 @@
     JoiningHandler joiningHandler;
     MemberHandler memberHandler;
 
+    size_t mcastId;
+
   friend class JoiningHandler;
   friend class MemberHandler;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Sep 24 
10:34:08 2008
@@ -20,10 +20,15 @@
  */
 #include "Connection.h"
 #include "Cluster.h"
+
+#include "qpid/broker/SessionState.h"
 #include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AllInvoker.h"
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
+#include "qpid/framing/ConnectionCloseBody.h"
+#include "qpid/framing/ConnectionCloseOkBody.h"
 #include "qpid/log/Statement.h"
-#include "qpid/framing/AllInvoker.h"
+
 #include <boost/current_function.hpp>
 
 namespace qpid {
@@ -31,6 +36,8 @@
 
 using namespace framing;
 
+NoOpConnectionOutputHandler Connection::discardHandler;
+
 // Shadow connections
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
                        const std::string& wrappedId, ConnectionId myId)
@@ -49,7 +56,9 @@
     QPID_LOG(debug, "New connection: " << *this);
 }
 
-Connection::~Connection() {}
+Connection::~Connection() {
+    QPID_LOG(debug, "Deleted connection: " << *this);
+}
 
 bool Connection::doOutput() {
     return output.doOutput();
@@ -63,28 +72,55 @@
     output.deliverDoOutput(requested);
 }
 
+// Received from a directly connected client.
 void Connection::received(framing::AMQFrame& f) {
-    QPID_LOG(trace, "EXEC [" << *this << "]: " << f);
+    QPID_LOG(trace, "RECV " << *this << ": " << f);
+    if (isShadow()) {           
+        // Final close that completes catch-up for shadow connection.
+        if (catchUp && f.getMethod() && 
f.getMethod()->isA<ConnectionCloseBody>()) { 
+            AMQFrame ok(in_place<ConnectionCloseOkBody>());
+            connection.getOutput().send(ok);
+        }
+        else
+            QPID_LOG(warning, *this << " ignoring unexpected frame: " << f);
+    }
+    else {
+        currentChannel = f.getChannel();
+        if (!framing::invoke(*this, *f.getBody()).wasHandled())
+            connection.received(f);
+    }
+}
+
+// Delivered from cluster.
+void Connection::delivered(framing::AMQFrame& f) {
+    QPID_LOG(trace, "DLVR " << *this << ": " << f);
+    assert(!isCatchUp());
     // Handle connection controls, deliver other frames to connection.
+    currentChannel = f.getChannel();
     if (!framing::invoke(*this, *f.getBody()).wasHandled())
         connection.received(f);
 }
 
 void Connection::closed() {
     try {
+        QPID_LOG(debug, "Connection closed " << *this);
+
+        if (catchUp) {
+            catchUp = false;
+            cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this));
+            if (!isShadow()) connection.closed();
+        }
+
         // Local network connection has closed.  We need to keep the
         // connection around but replace the output handler with a
         // no-op handler as the network output handler will be
         // deleted.
         output.setOutputHandler(discardHandler);
-        if (catchUp) {
-            catchUp = false;
-            cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this));
-        }
-        else {
-            // This was a local replicated connection. Multicast a deliver 
closed
-            // and process any outstanding frames from the cluster until
-            // self-delivery of deliver-closed.
+
+        if (isLocal()) {
+            // This was a local replicated connection. Multicast a deliver
+            // closed and process any outstanding frames from the cluster
+            // until self-delivery of deliver-close.
             cluster.mcastControl(ClusterConnectionDeliverCloseBody(), this);
             ++mcastSeq;
         }
@@ -100,29 +136,48 @@
     cluster.erase(self);
 }
 
-size_t Connection::decode(const char* buffer, size_t size) { 
-    assert(!catchUp);
-    ++mcastSeq;
-    cluster.mcastBuffer(buffer, size, self);
+// Decode data from local clients.
+size_t Connection::decode(const char* buffer, size_t size) {
+    if (catchUp) {              // Handle catch-up locally.
+        Buffer buf(const_cast<char*>(buffer), size);
+        while (localDecoder.decode(buf))
+            received(localDecoder.frame);
+    }
+    else {                      // Multicast local connections.
+        assert(isLocal());
+        cluster.mcastBuffer(buffer, size, self, ++mcastSeq);
+    }
     return size;
 }
 
 void Connection::deliverBuffer(Buffer& buf) {
     assert(!catchUp);
     ++deliverSeq;
-    while (decoder.decode(buf))
-        received(decoder.frame);
+    while (mcastDecoder.decode(buf))
+        delivered(mcastDecoder.frame);
 }
 
 
-void Connection::sessionState(const SequenceNumber& /*replayStart*/,
-                  const SequenceSet& /*sentIncomplete*/,
-                  const SequenceNumber& /*expected*/,
-                  const SequenceNumber& /*received*/,
-                  const SequenceSet& /*unknownCompleted*/,
-                  const SequenceSet& /*receivedIncomplete*/)
+void Connection::sessionState(
+    const SequenceNumber& replayStart,
+    const SequenceNumber& sendCommandPoint,
+    const SequenceSet& sentIncomplete,
+    const SequenceNumber& expected,
+    const SequenceNumber& received,
+    const SequenceSet& unknownCompleted,
+    const SequenceSet& receivedIncomplete)
 {
-    // FIXME aconway 2008-09-10: TODO
+    broker::SessionHandler& h = connection.getChannel(currentChannel);
+    broker::SessionState* s = h.getSession();
+    s->setState(
+        replayStart,
+        sendCommandPoint,
+        sentIncomplete,
+        expected,
+        received,
+        unknownCompleted,
+        receivedIncomplete);
+    QPID_LOG(debug, "Received session state dump for " << s->getId());
 }
     
 void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Sep 24 
10:34:08 2008
@@ -47,7 +47,6 @@
 class Connection :
         public RefCounted,
         public sys::ConnectionInputHandler,
-        public sys::ConnectionOutputHandler,
         public framing::AMQP_AllOperations::ClusterConnectionHandler
         
 {
@@ -60,20 +59,18 @@
     
     ConnectionId getId() const { return self; }
     broker::Connection& getBrokerConnection() { return connection; }
+
+    /** True for connections from direct clients of local broker */
     bool isLocal() const;
+
+    /** True for connections that are shadowing remote broker connections */
     bool isShadow() const { return !isLocal(); }
 
-    /** True if the connection is in "catch-up" mode: building initial state */
+    /** True if the connection is in "catch-up" mode: building initial broker 
state. */
     bool isCatchUp() const { return catchUp; }
 
     Cluster& getCluster() { return cluster; }
 
-    // ConnectionOutputHandler methods
-    void close() {}
-    void send(framing::AMQFrame&) {}
-    void activateOutput() {}
-    virtual size_t getBuffered() const { assert(0); return 0; }
-
     // ConnectionInputHandler methods
     void received(framing::AMQFrame&);
     void closed();
@@ -85,18 +82,19 @@
     // ConnectionCodec methods
     size_t decode(const char* buffer, size_t size);
 
-    // Called by cluster to deliver a buffer from CPG.
+    // Called for data delivered from the cluster.
     void deliverBuffer(framing::Buffer&);
-
+    void delivered(framing::AMQFrame&);
 
     // ==== Used in catch-up mode to build initial state.
     // 
     // State dump methods.
     void sessionState(const SequenceNumber& replayStart,
-                              const SequenceSet& sentIncomplete,
-                              const SequenceNumber& expected,
-                              const SequenceNumber& received,
-                              const SequenceSet& unknownCompleted, const 
SequenceSet& receivedIncomplete);
+                      const SequenceNumber& sendCommandPoint,
+                      const SequenceSet& sentIncomplete,
+                      const SequenceNumber& expected,
+                      const SequenceNumber& received,
+                      const SequenceSet& unknownCompleted, const SequenceSet& 
receivedIncomplete);
     
     void shadowReady(uint64_t memberId, uint64_t connectionId);
 
@@ -108,17 +106,20 @@
     void deliverDoOutput(uint32_t requested);
     void sendDoOutput();
 
+    static NoOpConnectionOutputHandler discardHandler;
+
     Cluster& cluster;
     ConnectionId self;
     bool catchUp;
-    NoOpConnectionOutputHandler discardHandler;
     WriteEstimate writeEstimate;
     OutputInterceptor output;
-    framing::FrameDecoder decoder;
+    framing::FrameDecoder localDecoder;
+    framing::FrameDecoder mcastDecoder;
     broker::Connection connection;
     framing::SequenceNumber mcastSeq;
     framing::SequenceNumber deliverSeq;
-
+    framing::ChannelId currentChannel;
+    
   friend std::ostream& operator<<(std::ostream&, const Connection&);
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Wed Sep 
24 10:34:08 2008
@@ -23,18 +23,23 @@
 #include "Cluster.h"
 #include "ProxyInputHandler.h"
 #include "qpid/broker/Connection.h"
+#include "qpid/framing/ConnectionCloseBody.h"
+#include "qpid/framing/ConnectionCloseOkBody.h"
 #include "qpid/log/Statement.h"
 #include "qpid/memory.h"
 #include <stdexcept>
+#include <boost/utility/in_place_factory.hpp>
 
 namespace qpid {
 namespace cluster {
 
+using namespace framing;
+
 sys::ConnectionCodec*
-ConnectionCodec::Factory::create(framing::ProtocolVersion v, 
sys::OutputControl& out, const std::string& id) {
-    if (v == framing::ProtocolVersion(0, 10))
+ConnectionCodec::Factory::create(ProtocolVersion v, sys::OutputControl& out, 
const std::string& id) {
+    if (v == ProtocolVersion(0, 10))
         return new ConnectionCodec(out, id, cluster, false);
-    else if (v == framing::ProtocolVersion(0x80 + 0, 0x80 + 10))
+    else if (v == ProtocolVersion(0x80 + 0, 0x80 + 10))
         return new ConnectionCodec(out, id, cluster, true); // Catch-up 
connection
     return 0;
 }
@@ -47,7 +52,8 @@
 
 ConnectionCodec::ConnectionCodec(sys::OutputControl& out, const std::string& 
id, Cluster& cluster, bool catchUp)
     : codec(out, id, false),
-      interceptor(new Connection(cluster, codec, id, cluster.getSelf(), 
catchUp))
+      interceptor(new Connection(cluster, codec, id, cluster.getSelf(), 
catchUp)),
+      id(interceptor->getId())
 {
     std::auto_ptr<sys::ConnectionInputHandler> ih(new 
ProxyInputHandler(interceptor));
     codec.setInputHandler(ih);
@@ -56,28 +62,18 @@
 
 ConnectionCodec::~ConnectionCodec() {}
 
-// ConnectionCodec functions delegate to the codecOutput
 size_t ConnectionCodec::decode(const char* buffer, size_t size) {
-    if (interceptor->isShadow())
-        throw Exception(QPID_MSG("Unexpected decode for shadow connection " << 
*interceptor));
-    else if (interceptor->isCatchUp())  {
-        size_t ret = codec.decode(buffer, size);
-        if (interceptor->isShadow()) {
-            // Promoted to shadow, close the codec.
-            // FIXME aconway 2008-09-19: can we close cleanly?
-            // codec.close();
-            throw Exception("Close codec");
-        }
-        return ret;
-    }
-    else
-        return interceptor->decode(buffer, size);
+    return interceptor->decode(buffer, size);
 }
 
+bool ConnectionCodec::isClosed() const { return codec.isClosed(); }
+
 size_t ConnectionCodec::encode(const char* buffer, size_t size) { return 
codec.encode(buffer, size); }
+
 bool ConnectionCodec::canEncode() { return codec.canEncode(); }
+
 void ConnectionCodec::closed() { codec.closed(); }
-bool ConnectionCodec::isClosed() const { return codec.isClosed(); }
-framing::ProtocolVersion ConnectionCodec::getVersion() const { return 
codec.getVersion(); }
+
+ProtocolVersion ConnectionCodec::getVersion() const { return 
codec.getVersion(); }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Wed Sep 24 
10:34:08 2008
@@ -71,6 +71,7 @@
   private:
     amqp_0_10::Connection codec;
     boost::intrusive_ptr<cluster::Connection> interceptor;
+    cluster::ConnectionId id;
 };
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Wed Sep 24 
10:34:08 2008
@@ -72,6 +72,8 @@
     sb.get()->send(body);
 }
 
+// TODO aconway 2008-09-24: optimization: dump connections/sessions in 
parallel.
+
 DumpClient::DumpClient(const Url& url, Cluster& c,
                        const boost::function<void()>& ok,
                        const boost::function<void(const std::exception&)>& 
fail)
@@ -79,9 +81,8 @@
       connection(catchUpConnection()), shadowConnection(catchUpConnection()),
       done(ok), failed(fail)
 {
-    QPID_LOG(debug, "DumpClient from " << c.getSelf() << " to " << url);
     connection.open(url);
-    session = connection.newSession();
+    session = connection.newSession("dump_shared");
 }
 
 DumpClient::~DumpClient() {}
@@ -91,6 +92,7 @@
 static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS)); 
 
 void DumpClient::dump() {
+    QPID_LOG(debug, donor.getSelf() << " starting dump to " << receiver);
     Broker& b = donor.getBroker();
     b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, 
_1));
     // Catch-up exchange is used to route messages to the proper queue without 
modifying routing key.
@@ -99,10 +101,9 @@
     session.sync();
     session.close();
     donor.eachConnection(boost::bind(&DumpClient::dumpConnection, this, _1));
-    QPID_LOG(debug, "Dump sent, closing catch_up connection.");
     // FIXME aconway 2008-09-18: inidicate successful end-of-dump.
     connection.close();
-    QPID_LOG(debug, "Dump sent.");
+    QPID_LOG(debug,  donor.getSelf() << " dumped all state to " << receiver);
 }
 
 void DumpClient::run() {
@@ -153,49 +154,79 @@
 }
 
 void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& 
dumpConnection) {
-    QPID_LOG(debug, "Dump connection " << *dumpConnection);
-
     shadowConnection = catchUpConnection();
-    // FIXME aconway 2008-09-19: Open with settings from dumpConnection - 
userid etc.
-    shadowConnection.open(receiver);
+    broker::Connection& bc = dumpConnection->getBrokerConnection();
+    // FIXME aconway 2008-09-19: Open with identical settings to 
dumpConnection: password, vhost, frame size,
+    // authentication etc. See ConnectionSettings.
+    shadowConnection.open(receiver, bc.getUserId());
     
dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession,
 this, _1));
     boost::shared_ptr<client::ConnectionImpl> impl = 
client::ConnectionAccess::getImpl(shadowConnection);
-    // FIXME aconway 2008-09-19: use proxy for cluster commands? 
-    AMQFrame 
ready(in_place<ClusterConnectionShadowReadyBody>(ProtocolVersion(),
-                       dumpConnection->getId().getMember(),
-                       
reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr())));
-    impl->handle(ready);
-    // Will be closed from the other end.
-    QPID_LOG(debug, "Dump done, connection " << *dumpConnection);
+    AMQP_AllProxy::ClusterConnection proxy(*impl);
+    proxy.shadowReady(dumpConnection->getId().getMember(),
+                      
reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr()));
+    shadowConnection.close();
+    QPID_LOG(debug, donor.getId() << " dumped connection " << *dumpConnection);
 }
 
 void DumpClient::dumpSession(broker::SessionHandler& sh) {
-    QPID_LOG(debug, "Dump session " << &sh.getConnection()  << "[" << 
sh.getChannel() << "] "
+    QPID_LOG(debug, donor.getId() << " dumping session " << 
&sh.getConnection()  << "[" << sh.getChannel() << "] = "
              << sh.getSession()->getId());
-
     broker::SessionState* s = sh.getSession();
     if (!s) return;         // no session.
+
     // Re-create the session.
     boost::shared_ptr<client::ConnectionImpl> cimpl = 
client::ConnectionAccess::getImpl(shadowConnection);
     size_t max_frame_size = cimpl->getNegotiatedSettings().maxFrameSize;
-    // FIXME aconway 2008-09-19: verify matching ID.
     boost::shared_ptr<client::SessionImpl> simpl(
         new client::SessionImpl(s->getId().getName(), cimpl, sh.getChannel(), 
max_frame_size));
     cimpl->addSession(simpl);
-    simpl->open(0);
-    client::Session cs;
-    client::SessionBase_0_10Access(cs).set(simpl);
-    cs.sync();
+    simpl->open(sh.getSession()->getTimeout());
+    client::SessionBase_0_10Access(shadowSession).set(simpl);
+    AMQP_AllProxy::ClusterConnection proxy(simpl->out);
 
+    // Re-create session state on remote connection.
     broker::SessionState* ss = sh.getSession();
+
     ss->eachConsumer(boost::bind(&DumpClient::dumpConsumer, this, _1));
     
     // FIXME aconway 2008-09-19: remaining session state.
-    QPID_LOG(debug, "Dump done, session " << sh.getSession()->getId());
+
+    // Reset command-sequence state.
+    proxy.sessionState(
+        ss->senderGetReplayPoint().command,
+        ss->senderGetCommandPoint().command,
+        ss->senderGetIncomplete(),
+        ss->receiverGetExpected().command,
+        ss->receiverGetReceived().command,
+        ss->receiverGetUnknownComplete(),
+        ss->receiverGetIncomplete()
+    );
+
+    // FIXME aconway 2008-09-23: session replay list.
+
+    QPID_LOG(debug, donor.getId() << " dumped session " << 
sh.getSession()->getId());
 }
 
 void DumpClient::dumpConsumer(broker::SemanticState::ConsumerImpl* ci) {
-    QPID_LOG(critical, "DEBUG: dump consumer: " << ci->getName());
+    using namespace message;
+    shadowSession.messageSubscribe(
+        arg::queue       = ci->getQueue()->getName(),
+        arg::destination = ci->getName(),
+        arg::acceptMode  = ci->isAckExpected() ? ACCEPT_MODE_EXPLICIT : 
ACCEPT_MODE_NONE,
+        arg::acquireMode = ci->isAcquire() ? ACQUIRE_MODE_PRE_ACQUIRED : 
ACQUIRE_MODE_NOT_ACQUIRED,
+        arg::exclusive   = false ,  // FIXME aconway 2008-09-23: how to read.
+
+        // TODO aconway 2008-09-23: remaining args not used by current broker.
+        // Update this code when they are.
+        arg::resumeId=std::string(), 
+        arg::resumeTtl=0,
+        arg::arguments=FieldTable()
+    );
+    shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? 
FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
+    shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, 
ci->getMsgCredit());
+    shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, 
ci->getByteCredit());
+    // FIXME aconway 2008-09-23: need to replicate ConsumerImpl::blocked and 
notifyEnabled?
+    QPID_LOG(debug, donor.getId() << " dumped consumer " << ci->getName() << " 
on " << shadowSession.getId());
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Wed Sep 24 
10:34:08 2008
@@ -31,17 +31,18 @@
 
 using framing::Buffer;
 
-const size_t Event::OVERHEAD = sizeof(uint8_t) + sizeof(uint64_t);
+const size_t Event::OVERHEAD = sizeof(uint8_t) + sizeof(uint64_t) + 
sizeof(size_t);
 
-Event::Event(EventType t, const ConnectionId c, const size_t s)
-    : type(t), connectionId(c), size(s), data(RefCountedBuffer::create(s)) {}
+Event::Event(EventType t, const ConnectionId& c,  size_t s, size_t i)
+    : type(t), connectionId(c), size(s), data(RefCountedBuffer::create(s)), 
id(i) {}
 
 Event Event::delivered(const MemberId& m, void* d, size_t s) {
     Buffer buf(static_cast<char*>(d), s);
     EventType type((EventType)buf.getOctet()); 
     ConnectionId connection(m, 
reinterpret_cast<Connection*>(buf.getLongLong()));
+    size_t id = buf.getLong();
     assert(buf.getPosition() == OVERHEAD);
-    Event e(type, connection, s-OVERHEAD);
+    Event e(type, connection, s-OVERHEAD, id);
     memcpy(e.getData(), static_cast<char*>(d)+OVERHEAD, s-OVERHEAD);
     return e;
 }
@@ -51,6 +52,7 @@
     Buffer b(header, OVERHEAD);
     b.putOctet(type);
     b.putLongLong(reinterpret_cast<uint64_t>(connectionId.getConnectionPtr()));
+    b.putLong(id);
     iovec iov[] = { { header, OVERHEAD }, { const_cast<char*>(getData()), 
getSize() } };
     cpg.mcast(name, iov, sizeof(iov)/sizeof(*iov));
 }
@@ -60,13 +62,12 @@
 }
 
 static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
+
 std::ostream& operator << (std::ostream& o, const Event& e) {
-    o << "[event: " << e.getConnectionId()
+    o << "[event " << e.getConnectionId() << "/" << e.getId()
       << " " << EVENT_TYPE_NAMES[e.getType()]
-      << " " << e.getSize() << " bytes: ";
-    std::ostream_iterator<char> oi(o,"");
-    std::copy(e.getData(), e.getData()+std::min(e.getSize(), size_t(16)), oi);
-    return o << "...]";
+      << " " << e.getSize() << " bytes]";
+    return o;
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Wed Sep 24 10:34:08 
2008
@@ -43,7 +43,7 @@
 class Event {
   public:
     /** Create an event to mcast with a buffer of size bytes. */
-    Event(EventType t=DATA, const ConnectionId c=ConnectionId(), size_t 
size=0);
+    Event(EventType t=DATA, const ConnectionId& c=ConnectionId(), size_t 
size=0, size_t id=0);
 
     /** Create an event copied from delivered data. */
     static Event delivered(const MemberId& m, void* data, size_t size);
@@ -55,6 +55,7 @@
     size_t getSize() const { return size; }
     char* getData() { return data; }
     const char* getData() const { return data; }
+    size_t getId() const { return id; }
 
     operator framing::Buffer() const;
 
@@ -64,6 +65,7 @@
     ConnectionId connectionId;
     size_t size;
     RefCountedBuffer::pointer data;
+    size_t id;
 };
 
 std::ostream& operator << (std::ostream&, const Event&);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp Wed Sep 
24 10:34:08 2008
@@ -46,9 +46,10 @@
 
 void JoiningHandler::deliver(Event& e) {
     // Discard connection events unless we are stalled to receive a  dump.
-    if (state == STALLED) {
+    if (state == STALLED) 
         cluster.connectionEventQueue.push(e);
-    }
+    else
+        QPID_LOG(trace, "Discarded pre-join event  " << e);
 }
 
 void JoiningHandler::update(const MemberId&, const framing::FieldTable& 
members, uint64_t dumper) {
@@ -80,12 +81,13 @@
                 assert(0); break;
 
               case DUMP_REQUESTED: 
-                QPID_LOG(info, cluster.self << " stalling for dump from " << 
cluster.map.dumper);
+                QPID_LOG(debug, cluster.self << " stalling for dump from " << 
cluster.map.dumper);
                 state = STALLED;
                 cluster.stall();
                 break;
 
               case DUMP_COMPLETE:
+                QPID_LOG(debug, cluster.self << " at start point and dump 
complete, ready.");
                 cluster.ready();
                 break;
             }
@@ -107,8 +109,8 @@
 }
 
 void JoiningHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
-    QPID_LOG(debug, "Catch-up connection " << *c << " finished, remaining " << 
catchUpConnections-1);
-    if (c->isShadow())
+    QPID_LOG(debug, "Catch-up complete for " << *c << ", remaining catch-ups: 
" << catchUpConnections-1);
+    if (c->isShadow()) 
         
cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
     if (--catchUpConnections == 0)
         dumpComplete();
@@ -118,10 +120,11 @@
     // FIXME aconway 2008-09-18: need to detect incomplete dump.
     // 
     if (state == STALLED) {
+        QPID_LOG(debug, cluster.self << " received dump and stalled at start 
point, unstalling.");
         cluster.ready();
     }
     else {
-        QPID_LOG(debug, "Dump complete, waiting for stall point.");
+        QPID_LOG(debug, cluster.self << " received dump, waiting for start 
point.");
         assert(state == DUMP_REQUESTED);
         state = DUMP_COMPLETE;
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp Wed Sep 24 
10:34:08 2008
@@ -34,7 +34,8 @@
 MemberHandler::MemberHandler(Cluster& c) : ClusterHandler(c) {}
 
 MemberHandler::~MemberHandler() { 
-    if (dumpThread.id()) dumpThread.join(); // Join the last dumpthread.
+    if (dumpThread.id()) 
+        dumpThread.join(); // Join the last dumpthread.
 }
 
 void MemberHandler::configChange(
@@ -62,7 +63,8 @@
     assert(!cluster.connectionEventQueue.isStopped()); // Not currently 
stalled.
     cluster.stall();
 
-    if (dumpThread.id()) dumpThread.join(); // Join the last dumpthread.
+    if (dumpThread.id()) 
+        dumpThread.join(); // Join the previous dumpthread.
     dumpThread = Thread(new DumpClient(Url(urlStr), cluster,
                             boost::bind(&MemberHandler::dumpSent, this),
                             boost::bind(&MemberHandler::dumpError, this, _1)));

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Wed 
Sep 24 10:34:08 2008
@@ -45,10 +45,12 @@
 
 void OutputInterceptor::activateOutput() {
     Locker l(lock);
+
     if (parent.isCatchUp())
         next->activateOutput();
     else {
         moreOutput = true;
+        QPID_LOG(trace,  &parent << " activateOutput - sending doOutput");
         sendDoOutput();
     }
 }
@@ -79,15 +81,19 @@
 
     QPID_LOG(trace, "Delivered doOutput: requested=" << requested << " 
output=" << sent << " more=" << moreOutput);
 
-    if (parent.isLocal() && moreOutput) 
+    if (parent.isLocal() && moreOutput)  {
+        QPID_LOG(trace,  &parent << " deliverDoOutput - sending doOutput, more 
output available.");
         sendDoOutput();
+    }
     else
         doingOutput = false;
 }
 
 void OutputInterceptor::startDoOutput() {
-    if (!doingOutput) 
+    if (!doingOutput)  {
+        QPID_LOG(trace,  &parent << " startDoOutput - sending doOutput, more 
output available.");
         sendDoOutput();
+    }
 }
 
 // Send a doOutput request if one is not already in flight.
@@ -111,4 +117,14 @@
     next = &h;
 }
 
+void OutputInterceptor::close() {
+    Locker l(lock);
+    next->close();
+}
+
+size_t OutputInterceptor::getBuffered() const {
+    Locker l(lock);
+    return next->getBuffered();
+}
+
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h Wed Sep 
24 10:34:08 2008
@@ -43,8 +43,8 @@
     // sys::ConnectionOutputHandler functions
     void send(framing::AMQFrame& f);
     void activateOutput();
-    void close() { Locker l(lock); next->close(); }
-    size_t getBuffered() const { Locker l(lock); return next->getBuffered(); }
+    void close();
+    size_t getBuffered() const;
 
     // Delivery point for doOutput requests.
     void deliverDoOutput(size_t requested);

Modified: 
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/ConnectionOutputHandlerPtr.h Wed 
Sep 24 10:34:08 2008
@@ -36,8 +36,8 @@
 class ConnectionOutputHandlerPtr : public ConnectionOutputHandler
 {
   public:
-    ConnectionOutputHandlerPtr(ConnectionOutputHandler* p) : next(p) {}
-    void set(ConnectionOutputHandler* p) { next = p; }
+    ConnectionOutputHandlerPtr(ConnectionOutputHandler* p) : next(p) { 
assert(next); }
+    void set(ConnectionOutputHandler* p) { next = p; assert(next); }
     ConnectionOutputHandler* get() { return next; }
     const ConnectionOutputHandler* get() const { return next; }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed Sep 24 
10:34:08 2008
@@ -98,7 +98,7 @@
 
 ClusterFixture::ClusterFixture(size_t n, bool init0_) : 
name(Uuid(true).str()), init0(init0_) {
     add(n);
-    if (!init0) return;  // FIXME aconway 2008-09-18: can't use local hack in 
this case.
+    if (!init0) return;  // Defer initialization of broker0
     // Wait for all n members to join the cluster
     waitFor(n);
     BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size());
@@ -164,36 +164,35 @@
     return o;
 }
 
-#if 0                           // FIXME aconway 2008-09-22: enable.
 QPID_AUTO_TEST_CASE(DumpConsumers) {
-    ClusterFixture cluster(1);
-    Client c0(cluster[0]);
+    ClusterFixture cluster(1); 
+    Client c0(cluster[0], "c0"); 
     c0.session.queueDeclare("q");
-    c0.subs.subscribe(c0.lq, "q");
-    c0.session.messageTransfer(arg::content=Message("before", "q"));
-    Message m;
-    BOOST_CHECK(c0.lq.get(m, TIME_SEC));
-    BOOST_CHECK_EQUAL(m.getData(), "before");
+    c0.subs.subscribe(c0.lq, "q", FlowControl::zero());
+    c0.session.sync();
 
-    // Start new member
+    // Start new members
     cluster.add();
-    Client c1(cluster[1]);
+    Client c1(cluster[1], "c1"); 
+    cluster.add();
+    Client c2(cluster[2], "c2"); 
 
-    // Transfer some messages to the subscription by client c0.
+    // Transfer a message, verify all members see it.
     c0.session.messageTransfer(arg::content=Message("aaa", "q"));
-    BOOST_CHECK(c0.lq.get(m, TIME_SEC));
-    BOOST_CHECK_EQUAL(m.getData(), "aaa");
+    BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 1u);
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 1u);
+    BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 1u);
 
-    c1.session.messageTransfer(arg::content=Message("bbb", "q"));
+    // Activate the subscription, ensure message removed on all queues. 
+    c0.subs.setFlowControl("q", FlowControl::messageCredit(1));
+    Message m;
     BOOST_CHECK(c0.lq.get(m, TIME_SEC));
-    BOOST_CHECK_EQUAL(m.getData(), "bbb");
+    BOOST_CHECK_EQUAL(m.getData(), "aaa");
 
-    // Verify that the queue has been drained on both brokers.
-    // This proves that the consumer was replicated when the second broker 
joined.
     BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+    BOOST_CHECK_EQUAL(c2.session.queueQuery("q").getMessageCount(), 0u);
 }
-#endif
 
 QPID_AUTO_TEST_CASE(testCatchupSharedState) {
     ClusterFixture cluster(1);
@@ -217,7 +216,7 @@
     cluster.waitFor(2);
     c0.session.messageTransfer(arg::content=Message("pbar","p"));
 
-    // Verify new brokers have all state.
+    // Verify new brokers have state.
     Message m;
 
     Client c1(cluster[1], "c1");
@@ -228,11 +227,14 @@
     BOOST_CHECK_EQUAL(m.getData(), "bar");
     BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
 
-    BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+    // Add & verify another broker.
+    cluster.add();
+    Client c2(cluster[2], "c2");
+    BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC));
     BOOST_CHECK_EQUAL(m.getData(), "pfoo");
-    BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+    BOOST_CHECK(c2.subs.get(m, "p", TIME_SEC));
     BOOST_CHECK_EQUAL(m.getData(), "pbar");
-    BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 0u);
+    BOOST_CHECK_EQUAL(c2.session.queueQuery("p").getMessageCount(), 0u);
 }
 
 QPID_AUTO_TEST_CASE(testWiringReplication) {
@@ -333,7 +335,6 @@
     c1.session.messageTransfer(arg::content=Message("foo","q"));
     while (c1.session.queueQuery("q").getMessageCount() != 1)
         ::usleep(1000);         // Wait for message to show up on broker 1.
-    sleep(2);               // FIXME aconway 2008-09-11: remove.
     // But it should not be on broker 0.
     boost::shared_ptr<broker::Queue> q0 = 
cluster.broker0->broker->getQueues().find("q");
     BOOST_REQUIRE(q0);

Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=698666&r1=698665&r2=698666&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Wed Sep 24 10:34:08 2008
@@ -67,10 +67,11 @@
       <!-- Target session deduced from channel number.  -->
 
       <field name="replay-start" type="sequence-no"/>         <!-- Replay 
frames will start from this point.-->
+      <field name="command-point" type="sequence-no"/>        <!-- Id of next 
command sent -->
       <field name="sent-incomplete" type="sequence-set"/>      <!-- Commands 
sent and incomplete. -->
 
-      <field name="expected" type="sequence-no"/>             <!-- Idempotence 
barrier -->
-      <field name="received" type="sequence-no"/>             <!-- Received up 
to here > expected-->
+      <field name="expected" type="sequence-no"/>             <!-- Next 
command expected. -->
+      <field name="received" type="sequence-no"/>             <!-- Received up 
to here (>= expected) -->
       <field name="unknown-completed" type="sequence-set"/>    <!-- Completed 
but not known to peer. -->
       <field name="received-incomplete" type="sequence-set"/>  <!-- Received 
and incomplete -->
     </control>


Reply via email to