Author: aconway
Date: Sat Sep 20 22:04:04 2008
New Revision: 697446

URL: http://svn.apache.org/viewvc?rev=697446&view=rev
Log:

DumpClient send connections & session IDs to new members.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.cpp Sat Sep 20 22:04:04 
2008
@@ -28,7 +28,7 @@
 namespace qpid {
 
 Exception::Exception(const std::string& msg) throw() : message(msg) {
-    QPID_LOG(debug, "Exception constructed: " << message);
+    QPID_LOG_IF(debug, !msg.empty(), "Exception constructed: " << message);
 }
 
 Exception::~Exception() throw() {}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Sat Sep 20 
22:04:04 2008
@@ -165,6 +165,12 @@
     getOutput().close();
 }
 
+// Send a close to the client but keep the channels. Used by cluster.
+void Connection::sendClose() {
+    adapter.close(200, "OK", 0, 0);
+    getOutput().close();
+}
+
 void Connection::idleOut(){}
 
 void Connection::idleIn(){}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Sat Sep 20 
22:04:04 2008
@@ -28,25 +28,29 @@
 
 #include <boost/ptr_container/ptr_map.hpp>
 
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_ServerOperations.h"
-#include "qpid/framing/AMQP_ClientProxy.h"
-#include "qpid/sys/AggregateOutput.h"
-#include "qpid/sys/ConnectionOutputHandler.h"
-#include "qpid/sys/ConnectionInputHandler.h"
-#include "qpid/sys/TimeoutHandler.h"
-#include "qpid/framing/ProtocolVersion.h"
 #include "Broker.h"
-#include "qpid/sys/Socket.h"
-#include "qpid/Exception.h"
 #include "ConnectionHandler.h"
 #include "ConnectionState.h"
 #include "SessionHandler.h"
-#include "qpid/management/Manageable.h"
 #include "qmf/org/apache/qpid/broker/Connection.h"
+#include "qpid/Exception.h"
 #include "qpid/RefCounted.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/management/Manageable.h"
+#include "qpid/ptr_map.h"
+#include "qpid/sys/AggregateOutput.h"
+#include "qpid/sys/ConnectionInputHandler.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/Socket.h"
+#include "qpid/sys/TimeoutHandler.h"
 
 #include <boost/ptr_container/ptr_map.hpp>
+#include <boost/bind.hpp>
+
+#include <algorithm>
 
 namespace qpid {
 namespace broker {
@@ -93,6 +97,13 @@
     void notifyConnectionForced(const std::string& text);
     void setUserId(const string& uid);
 
+    template <class F> void eachSessionHandler(const F& f) {
+        for (ChannelMap::iterator i = channels.begin(); i != channels.end(); 
++i)
+            f(*ptr_map_ptr(i));
+    }
+
+    void sendClose();
+    
   private:
     typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
     typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Sat Sep 20 
22:04:04 2008
@@ -74,7 +74,7 @@
     {
         Lock l(state);
         if (state != DETACHED) {
-            QPID_LOG(error, "Session was not closed cleanly");
+            QPID_LOG(warning, "Session was not closed cleanly");
             setState(DETACHED);
             handleClosed();
             state.waitWaiters();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Sat Sep 20 
22:04:04 2008
@@ -92,6 +92,11 @@
     handler->insert(c);
 }
 
+void Cluster::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
+    Mutex::ScopedLock l(lock);
+    handler->catchUpClosed(c);
+}
+
 void Cluster::erase(ConnectionId id) {
     Mutex::ScopedLock l(lock);
     connections.erase(id);
@@ -119,6 +124,7 @@
 }
 
 void Cluster::mcastEvent(const Event& e) {
+    QPID_LOG(trace, "MCAST " << e);
     e.mcast(name, cpg);
 }
 
@@ -170,8 +176,10 @@
                     throw Exception(QPID_MSG("Invalid cluster control"));
             }
         }
-        else 
+        else {
+            QPID_LOG(trace, "DLVR" << (connectionEventQueue.isStopped() ? 
"(stalled)" : "") << " " << e);
             handler->deliver(e);
+        }
     }
     catch (const std::exception& e) {
         QPID_LOG(critical, "Error in cluster deliver: " << e.what());
@@ -181,14 +189,17 @@
 
 void Cluster::connectionEvent(const Event& e) {
     Buffer buf(e);
-    assert(e.getConnection());
-    if (e.getType() == DATA)
-        e.getConnection()->deliverBuffer(buf);
+    QPID_LOG(trace, "EXEC: " << e);
+    boost::intrusive_ptr<Connection> connection = 
getConnection(e.getConnectionId()); 
+    assert(connection);
+    if (e.getType() == DATA) {
+        connection->deliverBuffer(buf);
+    }
     else {              // control
         AMQFrame frame;
         while (frame.decode(buf)) {
-            QPID_LOG(trace, "DLVR [" << self << "]: " << frame);
-            e.getConnection()->received(frame);
+            QPID_LOG(trace, "EXEC [" << *connection << "]: " << frame);
+            connection->received(frame);
         }
     }
 }
@@ -196,26 +207,28 @@
 struct AddrList {
     const cpg_address* addrs;
     int count;
-    const char* prefix;
-    AddrList(const cpg_address* a, int n, const char* p=0) : addrs(a), 
count(n), prefix(p) {}
+    const char *prefix, *suffix;
+    AddrList(const cpg_address* a, int n, const char* p="", const char* s="")
+        : addrs(a), count(n), prefix(p), suffix(s) {}
 };
 
 ostream& operator<<(ostream& o, const AddrList& a) {
-    if (a.count && a.prefix) o << a.prefix;
+    if (!a.count) return o;
+    o << a.prefix;
     for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
         const char* reasonString;
         switch (p->reason) {
-          case CPG_REASON_JOIN: reasonString =  " joined"; break;
-          case CPG_REASON_LEAVE: reasonString =  " left";break;
-          case CPG_REASON_NODEDOWN: reasonString =  " node-down";break;
-          case CPG_REASON_NODEUP: reasonString =  " node-up";break;
-          case CPG_REASON_PROCDOWN: reasonString =  " process-down";break;
+          case CPG_REASON_JOIN: reasonString =  " joined "; break;
+          case CPG_REASON_LEAVE: reasonString =  " left "; break;
+          case CPG_REASON_NODEDOWN: reasonString =  " node-down "; break;
+          case CPG_REASON_NODEUP: reasonString =  " node-up "; break;
+          case CPG_REASON_PROCDOWN: reasonString =  " process-down "; break;
           default: reasonString = " ";
         }
         qpid::cluster::MemberId member(*p);
-        o << member << reasonString << ((p+1 < a.addrs+a.count) ? ", " : "");
+        o << member << reasonString;
     }
-    return o;
+    return o << a.suffix;
 }
 
 void Cluster::dispatch(sys::DispatchHandle& h) {
@@ -238,8 +251,8 @@
     cpg_address *joined, int nJoined)
 {
     Mutex::ScopedLock l(lock);
-    QPID_LOG(debug, "Cluster: " << AddrList(current, nCurrent) << ". "
-             << AddrList(left, nLeft, "Left: "));
+    QPID_LOG(debug, "CPG members: " << AddrList(current, nCurrent) 
+             << AddrList(left, nLeft, "( ", ")"));
     
     if (find(left, left+nLeft, self) != left+nLeft) { 
         // I have left the group, this is the final config change.
@@ -289,9 +302,14 @@
 }
 
 void Cluster::ready() {
-    // Called with lock held
-    QPID_LOG(info, self << " ready at URL " << url);
+    QPID_LOG(debug, self << " ready at " << url);
+    unstall();
     mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
+}
+
+void Cluster::unstall() {
+    // Called with lock held
+    QPID_LOG(debug, self << " un-stalling");
     handler = &memberHandler;   // Member mode.
     connectionEventQueue.start(poller);
     //     if (mgmtObject!=0)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Sat Sep 20 
22:04:04 2008
@@ -64,6 +64,8 @@
 
     void insert(const boost::intrusive_ptr<Connection>&); // Insert a local 
connection
     void erase(ConnectionId);          // Erase a connection.
+
+    void catchUpClosed(const boost::intrusive_ptr<Connection>&); // Insert a 
local connection
     
     /** Get the URLs of current cluster members. */
     std::vector<Url> getUrls() const;
@@ -88,8 +90,9 @@
 
     MemberId getSelf() const { return self; }
 
-    void stall();
     void ready();
+    void stall();
+    void unstall();
 
     void shutdown();
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h Sat Sep 20 
22:04:04 2008
@@ -59,6 +59,7 @@
                               cpg_address *joined, int nJoined) = 0;
 
     virtual void insert(const boost::intrusive_ptr<Connection>& c) = 0;
+    virtual void catchUpClosed(const boost::intrusive_ptr<Connection>& c) = 0;
 
   protected:
     Cluster& cluster;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Sat Sep 20 
22:04:04 2008
@@ -21,6 +21,7 @@
 #include "ClusterMap.h"
 #include "qpid/Url.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
 #include <boost/bind.hpp>
 #include <algorithm>
 #include <functional>
@@ -86,6 +87,7 @@
     members[id] = url;
     if (id == dumper)
         dumper = MemberId();
+    QPID_LOG(info, id << " joined cluster: " << *this);
 }
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Sat Sep 20 
22:04:04 2008
@@ -31,18 +31,23 @@
 
 using namespace framing;
 
+// Shadow connections
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
                        const std::string& wrappedId, ConnectionId myId)
-    : cluster(c), self(myId), output(*this, out),
-      connection(&output, cluster.getBroker(), wrappedId), catchUp(), 
exCatchUp()
-{}
+    : cluster(c), self(myId), catchUp(false), output(*this, out),
+      connection(&output, cluster.getBroker(), wrappedId)
+{
+    QPID_LOG(debug, "New connection: " << *this);
+}
 
+// Local connections
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
                        const std::string& wrappedId, MemberId myId, bool 
isCatchUp)
-    : cluster(c), self(myId, this), output(*this, out),
-      connection(&output, cluster.getBroker(), wrappedId),
-      catchUp(isCatchUp), exCatchUp()
-{}
+    : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
+      connection(&output, cluster.getBroker(), wrappedId)
+{
+    QPID_LOG(debug, "New connection: " << *this);
+}
 
 Connection::~Connection() {}
 
@@ -59,6 +64,7 @@
 }
 
 void Connection::received(framing::AMQFrame& f) {
+    QPID_LOG(trace, "EXEC [" << *this << "]: " << f);
     // Handle connection controls, deliver other frames to connection.
     if (!framing::invoke(*this, *f.getBody()).wasHandled())
         connection.received(f);
@@ -70,16 +76,10 @@
         // connection around but replace the output handler with a
         // no-op handler as the network output handler will be
         // deleted.
-
-        // FIXME aconway 2008-09-18: output handler reset in right place?
-        // connection.setOutputHandler(&discardHandler);
         output.setOutputHandler(discardHandler);
         if (catchUp) {
-            // This was a catch-up connection, may be promoted to a
-            // shadow connection.
             catchUp = false;
-            exCatchUp = true;
-            cluster.insert(boost::intrusive_ptr<Connection>(this));
+            cluster.catchUpClosed(boost::intrusive_ptr<Connection>(this));
         }
         else {
             // This was a local replicated connection. Multicast a deliver 
closed
@@ -125,8 +125,11 @@
     // FIXME aconway 2008-09-10: TODO
 }
     
-void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/) 
{
-    // FIXME aconway 2008-09-10: TODO
+void Connection::shadowReady(uint64_t memberId, uint64_t connectionId) {
+    ConnectionId shadow = ConnectionId(memberId, connectionId);
+    QPID_LOG(debug, "Catch-up connection " << self << " becomes shadow " << 
shadow);
+    self = shadow;
+    assert(isShadow());
 }
 
 void Connection::dumpComplete() {
@@ -134,6 +137,11 @@
 }
 
 bool Connection::isLocal() const { return self.first == cluster.getSelf() && 
self.second == this; }
- 
+
+std::ostream& operator<<(std::ostream& o, const Connection& c) {
+    return o << c.getId() << "(" << (c.isLocal() ? "local" : "shadow")
+             << (c.isCatchUp() ? ",catchup" : "") << ")";
+}
+
 }} // namespace qpid::cluster
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Sat Sep 20 
22:04:04 2008
@@ -34,6 +34,8 @@
 #include "qpid/framing/FrameDecoder.h"
 #include "qpid/framing/SequenceNumber.h"
 
+#include <iosfwd>
+
 namespace qpid {
 
 namespace framing { class AMQFrame; }
@@ -59,11 +61,10 @@
     ConnectionId getId() const { return self; }
     broker::Connection& getBrokerConnection() { return connection; }
     bool isLocal() const;
+    bool isShadow() const { return !isLocal(); }
 
     /** True if the connection is in "catch-up" mode: building initial state */
     bool isCatchUp() const { return catchUp; }
-    bool isExCatchUp() const { return exCatchUp; }
-
 
     Cluster& getCluster() { return cluster; }
 
@@ -109,6 +110,7 @@
 
     Cluster& cluster;
     ConnectionId self;
+    bool catchUp;
     NoOpConnectionOutputHandler discardHandler;
     WriteEstimate writeEstimate;
     OutputInterceptor output;
@@ -116,8 +118,8 @@
     broker::Connection connection;
     framing::SequenceNumber mcastSeq;
     framing::SequenceNumber deliverSeq;
-    bool catchUp;
-    bool exCatchUp;
+
+  friend std::ostream& operator<<(std::ostream&, const Connection&);
 };
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Sat Sep 
20 22:04:04 2008
@@ -25,6 +25,7 @@
 #include "qpid/broker/Connection.h"
 #include "qpid/log/Statement.h"
 #include "qpid/memory.h"
+#include <stdexcept>
 
 namespace qpid {
 namespace cluster {
@@ -57,8 +58,18 @@
 
 // ConnectionCodec functions delegate to the codecOutput
 size_t ConnectionCodec::decode(const char* buffer, size_t size) {
-    if (interceptor->isCatchUp())
-        return codec.decode(buffer, size);
+    if (interceptor->isShadow())
+        throw Exception(QPID_MSG("Unexpected decode for shadow connection " << 
*interceptor));
+    else if (interceptor->isCatchUp())  {
+        size_t ret = codec.decode(buffer, size);
+        if (interceptor->isShadow()) {
+            // Promoted to shadow, close the codec.
+            // FIXME aconway 2008-09-19: can we close cleanly?
+            // codec.close();
+            throw Exception("Close codec");
+        }
+        return ret;
+    }
     else
         return interceptor->decode(buffer, size);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Sat Sep 20 
22:04:04 2008
@@ -19,6 +19,8 @@
  *
  */
 #include "DumpClient.h"
+#include "Cluster.h"
+#include "Connection.h"
 #include "qpid/client/SessionBase_0_10Access.h" 
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Queue.h"
@@ -26,8 +28,11 @@
 #include "qpid/broker/Message.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/broker/ExchangeRegistry.h"
+#include "qpid/broker/SessionHandler.h"
+#include "qpid/broker/SessionState.h"
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/framing/ClusterConnectionDumpCompleteBody.h"
+#include "qpid/framing/ClusterConnectionShadowReadyBody.h"
 #include "qpid/framing/enum.h"
 #include "qpid/framing/ProtocolVersion.h"
 #include "qpid/log/Statement.h"
@@ -39,6 +44,7 @@
 namespace client {
 struct ConnectionAccess {
     static void setVersion(Connection& c, const framing::ProtocolVersion& v) { 
c.version = v; }
+    static boost::shared_ptr<ConnectionImpl>  getImpl(Connection& c) { return 
c.impl; }
 };
 } // namespace client
 
@@ -50,17 +56,30 @@
 using broker::QueueBinding;
 using broker::Message;
 using namespace framing;
-using namespace framing::message;
-using namespace client;
+namespace arg=client::arg;
+using client::SessionBase_0_10Access;
 
+// Create a connection with special version that marks it as a catch-up 
connection.
+client::Connection catchUpConnection() {
+    client::Connection c;
+    client::ConnectionAccess::setVersion(c, ProtocolVersion(0x80 , 0x80 + 10));
+    return c;
+}
+
+// Send a control body directly to the session.
+void send(client::Session& s, const AMQBody& body) {
+    client::SessionBase_0_10Access sb(s);
+    sb.get()->send(body);
+}
 
-DumpClient::DumpClient(const Url& url, Broker& b,
+DumpClient::DumpClient(const Url& url, Cluster& c,
                        const boost::function<void()>& ok,
                        const boost::function<void(const std::exception&)>& 
fail)
-    : donor(b), done(ok), failed(fail)
+    : receiver(url), donor(c), 
+      connection(catchUpConnection()), shadowConnection(catchUpConnection()),
+      done(ok), failed(fail)
 {
-    // Special version identifies this as a catch-up connectionn.
-    client::ConnectionAccess::setVersion(connection, ProtocolVersion(0x80 , 
0x80 + 10));
+    QPID_LOG(debug, "DumpClient from " << c.getSelf() << " to " << url);
     connection.open(url);
     session = connection.newSession();
 }
@@ -72,15 +91,18 @@
 static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS)); 
 
 void DumpClient::dump() {
-    donor.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, 
this, _1));
+    Broker& b = donor.getBroker();
+    b.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, 
_1));
     // Catch-up exchange is used to route messages to the proper queue without 
modifying routing key.
     session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", 
arg::autoDelete=true);
-    donor.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
-    SessionBase_0_10Access sb(session);
-    // FIXME aconway 2008-09-18: inidicate successful end-of-dump.
+    b.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
     session.sync();
     session.close();
+    donor.eachConnection(boost::bind(&DumpClient::dumpConnection, this, _1));
+    QPID_LOG(debug, "Dump sent, closing catch_up connection.");
+    // FIXME aconway 2008-09-18: inidicate successful end-of-dump.
     connection.close();
+    QPID_LOG(debug, "Dump sent.");
 }
 
 void DumpClient::run() {
@@ -121,7 +143,8 @@
 
 void DumpClient::dumpMessage(const broker::QueuedMessage& message) {
     SessionBase_0_10Access sb(session);
-    framing::MessageTransferBody transfer(framing::ProtocolVersion(), 
CATCH_UP, ACCEPT_MODE_NONE, ACQUIRE_MODE_PRE_ACQUIRED);
+    framing::MessageTransferBody transfer(
+        framing::ProtocolVersion(), CATCH_UP, message::ACCEPT_MODE_NONE, 
message::ACQUIRE_MODE_PRE_ACQUIRED);
     sb.get()->send(transfer, message.payload->getFrames());
 }
 
@@ -129,5 +152,42 @@
     session.exchangeBind(queue, binding.exchange, binding.key, binding.args);
 }
 
+void DumpClient::dumpConnection(const boost::intrusive_ptr<Connection>& 
dumpConnection) {
+    QPID_LOG(debug, "Dump connection " << *dumpConnection);
+
+    shadowConnection = catchUpConnection();
+    // FIXME aconway 2008-09-19: Open with settings from dumpConnection - 
userid etc.
+    shadowConnection.open(receiver);
+    
dumpConnection->getBrokerConnection().eachSessionHandler(boost::bind(&DumpClient::dumpSession,
 this, _1));
+    boost::shared_ptr<client::ConnectionImpl> impl = 
client::ConnectionAccess::getImpl(shadowConnection);
+    // FIXME aconway 2008-09-19: use proxy for cluster commands? 
+    AMQFrame 
ready(in_place<ClusterConnectionShadowReadyBody>(ProtocolVersion(),
+                       dumpConnection->getId().getMember(),
+                       
reinterpret_cast<uint64_t>(dumpConnection->getId().getConnectionPtr())));
+    impl->handle(ready);
+    // Will be closed from the other end.
+    QPID_LOG(debug, "Dump done, connection " << *dumpConnection);
+}
+
+void DumpClient::dumpSession(broker::SessionHandler& sh) {
+    QPID_LOG(debug, "Dump session " << &sh.getConnection()  << "[" << 
sh.getChannel() << "] "
+             << sh.getSession()->getId());
+
+    broker::SessionState* s = sh.getSession();
+    if (!s) return;         // no session.
+    // Re-create the session.
+    boost::shared_ptr<client::ConnectionImpl> cimpl = 
client::ConnectionAccess::getImpl(shadowConnection);
+    size_t max_frame_size = cimpl->getNegotiatedSettings().maxFrameSize;
+    // FIXME aconway 2008-09-19: verify matching ID.
+    boost::shared_ptr<client::SessionImpl> simpl(
+        new client::SessionImpl(s->getId().getName(), cimpl, sh.getChannel(), 
max_frame_size));
+    cimpl->addSession(simpl);
+    simpl->open(0);
+    client::Session cs;
+    client::SessionBase_0_10Access(cs).set(simpl);
+    cs.sync();
+    // FIXME aconway 2008-09-19: remaining session state.
+    QPID_LOG(debug, "Dump done, session " << sh.getSession()->getId());
+}
 
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h Sat Sep 20 
22:04:04 2008
@@ -24,11 +24,6 @@
 
 #include "qpid/client/Connection.h"
 #include "qpid/client/AsyncSession.h"
-#include "qpid/broker/Message.h"
-#include "qpid/broker/Queue.h"
-#include "qpid/broker/Exchange.h"
-#include "qpid/broker/QueueRegistry.h"
-#include "qpid/broker/ExchangeRegistry.h"
 #include "qpid/sys/Runnable.h"
 #include <boost/shared_ptr.hpp>
 
@@ -45,16 +40,21 @@
 class QueueBindings;
 class QueueBinding;
 class QueuedMessage;
+class SessionHandler;
+
 } // namespace broker
 
 namespace cluster {
 
+class Cluster;
+class Connection;
+
 /**
  * A client that dumps the contents of a local broker to a remote one using 
AMQP.
  */
 class DumpClient : public sys::Runnable {
   public:
-    DumpClient(const Url& url, broker::Broker& donor,
+    DumpClient(const Url& receiver, Cluster& donor,
                const boost::function<void()>& done,
                const boost::function<void(const std::exception&)>& fail);
 
@@ -67,11 +67,14 @@
     void dumpExchange(const boost::shared_ptr<broker::Exchange>&);
     void dumpMessage(const broker::QueuedMessage&);
     void dumpBinding(const std::string& queue, const broker::QueueBinding& 
binding);
+    void dumpConnection(const boost::intrusive_ptr<Connection>& connection);
+    void dumpSession(broker::SessionHandler& s);
 
   private:
-    client::Connection connection;
-    client::AsyncSession session;
-    broker::Broker& donor;
+    Url receiver;
+    Cluster& donor;
+    client::Connection connection, shadowConnection;
+    client::AsyncSession session, shadowSession;
     boost::function<void()> done;
     boost::function<void(const std::exception& e)> failed;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Sat Sep 20 22:04:04 
2008
@@ -56,16 +56,12 @@
     char* getData() { return data; }
     const char* getData() const { return data; }
 
-    boost::intrusive_ptr<Connection> getConnection() const { return 
connection; }
-    void setConnection(const boost::intrusive_ptr<Connection>& c) { 
connection=c; }
-
     operator framing::Buffer() const;
 
   private:
     static const size_t OVERHEAD;
     EventType type;
     ConnectionId connectionId;
-    boost::intrusive_ptr<Connection> connection;
     size_t size;
     RefCountedBuffer::pointer data;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp Sat Sep 
20 22:04:04 2008
@@ -40,14 +40,13 @@
     if (nLeft == 0 && nCurrent == 1 && *current == cluster.self) { // First in 
cluster.
         QPID_LOG(notice, cluster.self << " first in cluster.");
         cluster.map.ready(cluster.self, cluster.url);
-        cluster.ready();
+        cluster.unstall();
     }
 }
 
 void JoiningHandler::deliver(Event& e) {
-    // Discard connection events unless we are stalled and getting a dump.
+    // Discard connection events unless we are stalled to receive a  dump.
     if (state == STALLED) {
-        e.setConnection(cluster.getConnection(e.getConnectionId()));
         cluster.connectionEventQueue.push(e);
     }
 }
@@ -73,6 +72,7 @@
     }
     else {                      // Start a new dump
         cluster.map.dumper = cluster.map.first();
+        QPID_LOG(debug, "Starting dump, dumper=" << cluster.map.dumper <<  " 
dumpee=" << dumpee);
         if (dumpee == cluster.self) { // My turn
             switch (state) {
               case START:
@@ -101,24 +101,23 @@
 void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) {
     if (c->isCatchUp()) {
         ++catchUpConnections;
-        QPID_LOG(debug, "Received " << catchUpConnections << " catch-up 
connections.");
-    }
-    else if (c->isExCatchUp()) {
-        if (c->getId().getConnectionPtr() != c.get()) // become shadow 
connection
-            
cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
-        QPID_LOG(debug, "Catch-up connection terminated " << 
catchUpConnections-1 << " remaining");
-        if (--catchUpConnections == 0)
-            dumpComplete();
+        QPID_LOG(debug, "Catch-up connection " << *c << " started, total " << 
catchUpConnections);
     }
-    else      // Local connection, will be stalled till dump complete.
+    cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), 
c));
+}
+
+void JoiningHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
+    QPID_LOG(debug, "Catch-up connection " << *c << " finished, remaining " << 
catchUpConnections-1);
+    if (c->isShadow())
         
cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
+    if (--catchUpConnections == 0)
+        dumpComplete();
 }
 
 void JoiningHandler::dumpComplete() {
     // FIXME aconway 2008-09-18: need to detect incomplete dump.
     // 
     if (state == STALLED) {
-        QPID_LOG(debug, "Dump complete, unstalling.");
         cluster.ready();
     }
     else {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h Sat Sep 20 
22:04:04 2008
@@ -47,6 +47,7 @@
     void ready(const MemberId&, const std::string& url);
 
     void insert(const boost::intrusive_ptr<Connection>& c);
+    void catchUpClosed(const boost::intrusive_ptr<Connection>& c);
     
   private:
     void checkDumpRequest();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp Sat Sep 20 
22:04:04 2008
@@ -47,7 +47,6 @@
 }
 
 void MemberHandler::deliver(Event& e) {
-    e.setConnection(cluster.getConnection(e.getConnectionId())); 
     cluster.connectionEventQueue.push(e);
 }
 
@@ -64,7 +63,7 @@
     cluster.stall();
 
     if (dumpThread.id()) dumpThread.join(); // Join the last dumpthread.
-    dumpThread = Thread(new DumpClient(Url(urlStr), cluster.broker,
+    dumpThread = Thread(new DumpClient(Url(urlStr), cluster,
                             boost::bind(&MemberHandler::dumpSent, this),
                             boost::bind(&MemberHandler::dumpError, this, _1)));
 }
@@ -92,4 +91,9 @@
         cluster.connections[c->getId()] = c;
 }
 
+void MemberHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
+    QPID_LOG(warning, "Catch-up connection " << c << " closed in member mode");
+    assert(0);
+}
+
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h Sat Sep 20 
22:04:04 2008
@@ -53,6 +53,7 @@
     void dumpError(const std::exception&);
 
     void insert(const boost::intrusive_ptr<Connection>& c);
+    void catchUpClosed(const boost::intrusive_ptr<Connection>& );
 
   public:
     sys::Thread dumpThread;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Sat 
Sep 20 22:04:04 2008
@@ -39,13 +39,18 @@
 void OutputInterceptor::send(framing::AMQFrame& f) {
     Locker l(lock); 
     next->send(f);
-    sent += f.size();
+    if (!parent.isCatchUp())
+        sent += f.size();
 }
 
 void OutputInterceptor::activateOutput() {
-    Locker l(lock); 
-    moreOutput = true;
-    sendDoOutput();             
+    Locker l(lock);
+    if (parent.isCatchUp())
+        next->activateOutput();
+    else {
+        moreOutput = true;
+        sendDoOutput();
+    }
 }
 
 // Called in write thread when the IO layer has no more data to write.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/types.h Sat Sep 20 22:04:04 
2008
@@ -59,6 +59,8 @@
 
 struct ConnectionId : public std::pair<MemberId, Connection*>  {
     ConnectionId(const MemberId& m=MemberId(), Connection* c=0) :  
std::pair<MemberId, Connection*> (m,c) {}
+    ConnectionId(uint64_t m, uint64_t c)
+        : std::pair<MemberId, Connection*>(MemberId(m), 
reinterpret_cast<Connection*>(c)) {}
     MemberId getMember() const { return first; }
     Connection* getConnectionPtr() const { return second; }
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=697446&r1=697445&r2=697446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Sat Sep 20 
22:04:04 2008
@@ -81,21 +81,26 @@
     void add();
     void add0(bool force);
     void setup();
+
     void kill(size_t n) {
         if (n) forkedBrokers[n-1].kill();
         else broker0->broker->shutdown();
     }
+
+    void waitFor(size_t n) {
+        size_t retry=1000;            // TODO aconway 2008-07-16: nasty 
sleeps, clean this up.
+        while (retry && getGlobalCluster().size() != n) {
+            ::usleep(1000);
+            --retry;
+        }
+    }
 };
 
 ClusterFixture::ClusterFixture(size_t n, bool init0_) : 
name(Uuid(true).str()), init0(init0_) {
     add(n);
     if (!init0) return;  // FIXME aconway 2008-09-18: can't use local hack in 
this case.
     // Wait for all n members to join the cluster
-    int retry=20;            // TODO aconway 2008-07-16: nasty sleeps, clean 
this up.
-    while (retry && getGlobalCluster().size() != n) {
-        ::sleep(1);
-        --retry;
-    }
+    waitFor(n);
     BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size());
 }
 
@@ -139,7 +144,7 @@
 
     qpid::log::Logger::instance().setPrefix("main");
     broker0.reset(new BrokerFixture(parseOpts(argc, argv)));
-    push_back(broker0->getPort());
+    if (size()) front() = broker0->getPort(); else 
push_back(broker0->getPort());
 }
 
 // For debugging: op << for CPG types.
@@ -190,14 +195,12 @@
     BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 
(unsigned)0);
 }
 
-
 #endif
 
-
 QPID_AUTO_TEST_CASE(testCatchupSharedState) {
     ClusterFixture cluster(1);
-
     Client c0(cluster[0], "c0");
+
     // Create some shared state.
     c0.session.queueDeclare("q");
     c0.session.messageTransfer(arg::content=Message("foo","q"));
@@ -205,24 +208,33 @@
     while (c0.session.queueQuery("q").getMessageCount() != 2)
         ::usleep(1000);         // Wait for message to show up on broker 0.
 
-    // FIXME aconway 2008-09-18: close session until we catchup session state 
also.
-    c0.session.close();
-    c0.connection.close();
-
-    // Now join new broker, should catch up.
+    // Add a new broker, it should catch up.
     cluster.add();
 
-    // FIXME aconway 2008-09-18: when we do session state try adding
-    // further stuff from broker 0, and leaving a subscription active.
-
+    // Do some work post-add
+    c0.session.queueDeclare("p");
+    c0.session.messageTransfer(arg::content=Message("pfoo","p"));
+
+    // Do some work post-join
+    cluster.waitFor(2);
+    c0.session.messageTransfer(arg::content=Message("pbar","p"));
+    
     // Verify new broker has all state.
     Message m;
+
     Client c1(cluster[1], "c1");
+
     BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
     BOOST_CHECK_EQUAL(m.getData(), "foo");
     BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
     BOOST_CHECK_EQUAL(m.getData(), "bar");
-    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 
(unsigned)0);
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+
+    BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "pfoo");
+    BOOST_CHECK(c1.subs.get(m, "p", TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "pbar");
+    BOOST_CHECK_EQUAL(c1.session.queueQuery("p").getMessageCount(), 0u);
 }
 
 QPID_AUTO_TEST_CASE(testWiringReplication) {


Reply via email to