Author: aconway
Date: Thu Sep 11 06:26:10 2008
New Revision: 694243

URL: http://svn.apache.org/viewvc?rev=694243&view=rev
Log:

Moved PollableCondition, PollableQueue and to sys. Fixed cluster shutdown 
issues.

sys/PollableCondition: is a generic mechansim to poll for non-IO
events in the Poller.

sys/PollableQueue: is a thread-safe queue template that can be
dispatched from the Poller when there are items on the queue. It uses
PollableCondition.

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.cpp
      - copied, changed from r693865, 
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.h
      - copied, changed from r693865, 
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
      - copied, changed from r693865, 
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
Removed:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/cluster.mk
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
    incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
    incubator/qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Sep 11 06:26:10 2008
@@ -264,6 +264,9 @@
   qpid/sys/AggregateOutput.cpp \
   qpid/sys/AsynchIOHandler.cpp \
   qpid/sys/Dispatcher.cpp \
+  qpid/sys/PollableCondition.h \
+  qpid/sys/PollableCondition.cpp \
+  qpid/sys/PollableQueue.h \
   qpid/sys/Runnable.cpp \
   qpid/sys/SystemInfo.cpp \
   qpid/sys/Shlib.cpp \

Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Thu Sep 11 06:26:10 2008
@@ -18,9 +18,6 @@
   qpid/cluster/Connection.h \
   qpid/cluster/Connection.cpp \
   qpid/cluster/NoOpConnectionOutputHandler.h \
-  qpid/cluster/PollableCondition.h \
-  qpid/cluster/PollableCondition.cpp \
-  qpid/cluster/PollableQueue.h \
   qpid/cluster/WriteEstimate.h \
   qpid/cluster/WriteEstimate.cpp \
   qpid/cluster/OutputInterceptor.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Thu Sep 11 
06:26:10 2008
@@ -70,8 +70,6 @@
 }
 
 SessionState::~SessionState() {
-    // Remove ID from active session list.
-    broker.getSessionManager().forget(getId());
     if (mgmtObject != 0)
         mgmtObject->resourceDestroy ();
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Thu Sep 11 
06:26:10 2008
@@ -649,7 +649,7 @@
 {
     check();
     if (state != ATTACHED) {
-        throw NotAttachedException("Session isn't attached");
+        throw NotAttachedException(QPID_MSG("Session " << getId() << " isn't 
attached"));
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Sep 11 
06:26:10 2008
@@ -61,7 +61,7 @@
 };
 
 Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) 
:
-    broker(&b),
+    broker(b),
     poller(b.getPoller()),
     cpg(*this),
     name(name_),
@@ -74,15 +74,17 @@
     ),
     deliverQueue(EventQueue::forEach(boost::bind(&Cluster::deliverEvent, this, 
_1)))
 {
-    broker->addFinalizer(boost::bind(&Cluster::leave, this));
-    QPID_LOG(notice, "Joining cluster: " << name.str() << " as " << self);
+    QPID_LOG(notice, "Cluster member " << self << " joining cluster " << 
name.str());
+    broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
     cpg.join(name);
 
     deliverQueue.start(poller);
     cpgDispatchHandle.startWatch(poller);
 }
 
-Cluster::~Cluster() {}
+Cluster::~Cluster() {
+    QPID_LOG(debug, "~Cluster()");
+}
 
 void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
     Mutex::ScopedLock l(lock);
@@ -94,20 +96,13 @@
     connections.erase(id);
 }
 
+// FIXME aconway 2008-09-10: leave is currently not called,
+// It should be called if we are shut down by a cluster admin command.
+// Any other type of exit is caught in disconnect().
+// 
 void Cluster::leave() {
-    Mutex::ScopedLock l(lock);
-    if (!broker) return;                               // Already left.
-    // Leave is called by from Broker destructor after the poller has
-    // been shut down. No dispatches can occur.
-
-    QPID_LOG(notice, "Leaving cluster " << name.str());
+    QPID_LOG(notice, "Cluster member " << self << " leaving cluster " << 
name.str());
     cpg.leave(name);
-    // broker= is set to 0 when the final config-change is delivered.
-    while(broker) {
-        Mutex::ScopedUnlock u(lock);
-        cpg.dispatchAll();
-    }
-    cpg.shutdown();
 }
 
 template <class T> void decodePtr(Buffer& buf, T*& ptr) {
@@ -177,6 +172,7 @@
 {
     try {
         MemberId from(nodeid, pid);
+        QPID_LOG(debug, "Cluster::deliver from " << from << " to " << self); 
// FIXME aconway 2008-09-10: 
         deliverQueue.push(Event::delivered(from, msg, msg_len));
     }
     catch (const std::exception& e) {
@@ -238,7 +234,7 @@
     cpg_address *left, int nLeft,
     cpg_address *joined, int nJoined)
 {
-    QPID_LOG(notice, "Cluster of " << nCurrent << ": " << AddrList(current, 
nCurrent) << ".\n Changes: "
+    QPID_LOG(info, "Cluster of " << nCurrent << ": " << AddrList(current, 
nCurrent) << ".\n Changes: "
              << AddrList(joined, nJoined) << AddrList(left, nLeft));
     
     if (nJoined)                // Notfiy new members of my URL.
@@ -246,13 +242,14 @@
             AMQFrame(in_place<ClusterJoiningBody>(ProtocolVersion(), 
url.str())),
             ConnectionId(self,0));
 
-
+    if (find(left, left+nLeft, self) != left+nLeft) {
+        // We have left the group, this is the final config change.
+        QPID_LOG(notice, "Cluster member " << self << " left cluster " << 
name.str());
+         broker.shutdown();
+    }
     Mutex::ScopedLock l(lock);
     for (int i = 0; i < nLeft; ++i) urls.erase(left[i]);
     // Add new members when their URL notice arraives.
-
-    if (find(left, left+nLeft, self) != left+nLeft)
-        broker = 0;       // We have left the group, this is the final config 
change.
     lock.notifyAll();     // Threads waiting for membership changes.
 }
 
@@ -261,22 +258,35 @@
     h.rewatch();
 }
 
-void Cluster::disconnect(sys::DispatchHandle& h) {
-    h.stopWatch();
-    QPID_LOG(critical, "Disconnected from cluster, shutting down");
-    broker->shutdown();
+void Cluster::disconnect(sys::DispatchHandle& ) {
+    // FIXME aconway 2008-09-11: this should be logged as critical,
+    // when we provide admin option to shut down cluster and let
+    // members leave cleanly.
+    QPID_LOG(notice, "Cluster member " << self << " disconnected from cluster 
" << name.str());
+    broker.shutdown();
 }
 
 void Cluster::joining(const MemberId& m, const string& url) {
-    QPID_LOG(notice, "Cluster member " << m << " has URL " << url);
+    QPID_LOG(info, "Cluster member " << m << " has URL " << url);
     urls.insert(UrlMap::value_type(m,Url(url)));
 }
 
 void Cluster::ready(const MemberId& ) {
     // FIXME aconway 2008-09-08: TODO
 }
-    
-}} // namespace qpid::cluster
 
+// Called from Broker::~Broker when broker is shut down.  At this
+// point we know the poller has stopped so no poller callbacks will be
+// invoked. We must ensure that CPG has also shut down so no CPG
+// callbacks will be invoked.
+// 
+void Cluster::shutdown() {
+    QPID_LOG(notice, "Cluster member " << self << " shutting down.");
+    try { cpg.shutdown(); }
+    catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " 
<< e.what()); }
+    delete this;
+}
 
+broker::Broker& Cluster::getBroker(){ return broker; }
 
+}} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Sep 11 
06:26:10 2008
@@ -21,7 +21,7 @@
 
 #include "qpid/cluster/Cpg.h"
 #include "qpid/cluster/Event.h"
-#include "qpid/cluster/PollableQueue.h"
+#include "qpid/sys/PollableQueue.h"
 #include "qpid/cluster/NoOpConnectionOutputHandler.h"
 
 #include "qpid/broker/Broker.h"
@@ -43,7 +43,7 @@
  * Connection to the cluster.
  * Keeps cluster membership data.
  */
-class Cluster : public RefCounted, private Cpg::Handler
+class Cluster : private Cpg::Handler
 {
   public:
 
@@ -78,17 +78,16 @@
     void joining(const MemberId&, const std::string& url);
     void ready(const MemberId&);
 
-    broker::Broker& getBroker() { assert(broker); return *broker; }
-
     MemberId getSelf() const { return self; }
 
+    void shutdown();
+
+    broker::Broker& getBroker();
+    
   private:
     typedef std::map<MemberId, Url>  UrlMap;
     typedef std::map<ConnectionId, boost::intrusive_ptr<cluster::Connection> > 
ConnectionMap;
-
-    /** Message sent over the cluster. */
-    typedef std::pair<framing::AMQFrame, ConnectionId> Message;
-    typedef PollableQueue<Event> EventQueue;
+    typedef sys::PollableQueue<Event> EventQueue;
 
     boost::function<void()> shutdownNext;
 
@@ -127,7 +126,7 @@
     boost::intrusive_ptr<cluster::Connection> getConnection(const 
ConnectionId&);
 
     mutable sys::Monitor lock;  // Protect access to members.
-    broker::Broker* broker;
+    broker::Broker& broker;
     boost::shared_ptr<sys::Poller> poller;
     Cpg cpg;
     Cpg::Name name;
@@ -137,7 +136,7 @@
     ConnectionMap connections;
     NoOpConnectionOutputHandler shadowOut;
     sys::DispatchHandle cpgDispatchHandle;
-    PollableQueue<Event> deliverQueue;
+    EventQueue deliverQueue;
     
   friend std::ostream& operator <<(std::ostream&, const Cluster&);
   friend std::ostream& operator <<(std::ostream&, const UrlMap::value_type&);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Thu Sep 11 
06:26:10 2008
@@ -66,10 +66,10 @@
 
     ClusterValues values;
     ClusterOptions options;
-    boost::intrusive_ptr<Cluster> cluster;
+    Cluster* cluster;
     boost::scoped_ptr<ConnectionCodec::Factory> factory;
 
-    ClusterPlugin() : options(values) {}
+    ClusterPlugin() : options(values), cluster(0) {}
 
     Options* getOptions() { return &options; }
 
@@ -78,20 +78,17 @@
         if (!broker || values.name.empty()) return;  // Only if --cluster-name 
option was specified.
         QPID_LOG_IF(warning, cluster, "Ignoring multiple initialization of 
cluster plugin.");
         cluster = new Cluster(values.name, values.getUrl(broker->getPort()), 
*broker);
-        broker->addFinalizer(boost::bind(&ClusterPlugin::shutdown, this));
         broker->setConnectionFactory(
             boost::shared_ptr<sys::ConnectionCodec::Factory>(
                 new ConnectionCodec::Factory(broker->getConnectionFactory(), 
*cluster)));
     }
 
     void earlyInitialize(Plugin::Target&) {}
-
-    void shutdown() { cluster = 0; }
 };
 
 static ClusterPlugin instance; // Static initialization.
 
 // For test purposes.
-boost::intrusive_ptr<Cluster> getGlobalCluster() { return instance.cluster; }
+Cluster& getGlobalCluster() { assert(instance.cluster); return 
*instance.cluster; }
     
 }} // namespace qpid::cluster

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Sep 11 
06:26:10 2008
@@ -106,5 +106,21 @@
         deliver(decoder.frame); // FIXME aconway 2008-09-01: Queue frames for 
delivery in separate thread.
 }
 
+
+void Connection::sessionState(const SequenceNumber& /*replayStart*/,
+                  const SequenceSet& /*sentIncomplete*/,
+                  const SequenceNumber& /*expected*/,
+                  const SequenceNumber& /*received*/,
+                  const SequenceSet& /*unknownCompleted*/,
+                  const SequenceSet& /*receivedIncomplete*/)
+{
+    // FIXME aconway 2008-09-10: TODO
+}
+    
+void Connection::shadowReady(uint64_t /*memberId*/, uint64_t /*connectionId*/)
+{
+    // FIXME aconway 2008-09-10: TODO
+}
+
 }} // namespace qpid::cluster
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Sep 11 
06:26:10 2008
@@ -40,9 +40,7 @@
 
 namespace cluster {
 
-/**
- * Plug-in associated with broker::Connections, both local and shadow.
- */
+/** Intercept broker::Connection calls for shadow and local cluster 
connections. */
 class Connection :
         public RefCounted,
         public sys::ConnectionInputHandler,
@@ -90,16 +88,13 @@
     sys::ConnectionInputHandler* create(sys::ConnectionOutputHandler* out, 
const std::string& id, bool isClient);
 
     // State dump methods.
-    virtual void sessionState(const framing::SequenceNumber& /*replayId*/,
-                              const framing::SequenceNumber& /*sendId*/,
-                              const framing::SequenceSet& /*sentIncomplete*/,
-                              const framing::SequenceNumber& /*expectedId*/,
-                              const framing::SequenceNumber& /*receivedId*/,
-                              const framing::SequenceSet& /*unknownCompleted*/,
-                              const framing::SequenceSet& 
/*receivedIncomplete*/) {}
+    virtual void sessionState(const SequenceNumber& replayStart,
+                              const SequenceSet& sentIncomplete,
+                              const SequenceNumber& expected,
+                              const SequenceNumber& received,
+                              const SequenceSet& unknownCompleted, const 
SequenceSet& receivedIncomplete);
     
-    virtual void shadowReady(uint64_t /*clusterId*/,
-                             const std::string& /*userId*/) {}
+    virtual void shadowReady(uint64_t memberId, uint64_t connectionId);
 
   private:
     void sendDoOutput();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Thu Sep 
11 06:26:10 2008
@@ -30,16 +30,16 @@
 
 sys::ConnectionCodec*
 ConnectionCodec::Factory::create(framing::ProtocolVersion v, 
sys::OutputControl& out, const std::string& id) {
-    if (v == framing::ProtocolVersion(0, 10)) 
+    if (v == framing::ProtocolVersion(0, 10))
         return new ConnectionCodec(out, id, cluster);
     return 0;
 }
 
+// FIXME aconway 2008-08-27: outbound connections need to be made
+// with proper qpid::client code for failover, get rid of this
+// broker-side hack.
 sys::ConnectionCodec*
 ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& 
id) {
-    // FIXME aconway 2008-08-27: outbound connections need to be made
-    // with proper qpid::client code for failover, get rid of this
-    // broker-side hack.
     return next->create(out, id);
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.h Thu Sep 11 
06:26:10 2008
@@ -50,7 +50,8 @@
     struct Factory : public sys::ConnectionCodec::Factory {
         boost::shared_ptr<sys::ConnectionCodec::Factory> next;
         Cluster& cluster;
-        Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& 
c) : next(f), cluster(c) {}
+        Factory(boost::shared_ptr<sys::ConnectionCodec::Factory> f, Cluster& c)
+            : next(f), cluster(c) {}
         sys::ConnectionCodec* create(framing::ProtocolVersion, 
sys::OutputControl&, const std::string& id);
         sys::ConnectionCodec* create(sys::OutputControl&, const std::string& 
id);
     };

Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.cpp (from 
r693865, incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp)
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.cpp?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.cpp&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp&r1=693865&r2=694243&rev=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.cpp Thu Sep 11 
06:26:10 2008
@@ -27,14 +27,14 @@
 // 
 
 #include "qpid/sys/posix/PrivatePosix.h"
-#include "qpid/cluster/PollableCondition.h"
+#include "qpid/sys/PollableCondition.h"
 #include "qpid/Exception.h"
 
 #include <unistd.h>
 #include <fcntl.h>
 
 namespace qpid {
-namespace cluster {
+namespace sys {
 
 PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) {
     int fds[2];
@@ -67,13 +67,13 @@
 
 #if 0
 // FIXME aconway 2008-08-12: More efficient Linux implementation using
-// eventfd system call.  Do a configure.ac test to enable this when
-// eventfd is available.
+// eventfd system call.  Move to separate file & do configure.ac test
+// to enable this when ::eventfd() is available.
 
 #include <sys/eventfd.h>
 
 namespace qpid {
-namespace cluster {
+namespace sys {
 
 PollableCondition::PollableCondition() : IOHandle(new sys::IOHandlePrivate) {
     impl->fd = ::eventfd(0, 0);
@@ -95,6 +95,6 @@
     
 #endif
 
-}} // namespace qpid::cluster
+}} // namespace qpid::sys
 
 #endif  /*!QPID_SYS_LINUX_POLLABLECONDITION_CPP*/

Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.h (from 
r693865, incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h)
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.h?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.h&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h&r1=693865&r2=694243&rev=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableCondition.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableCondition.h Thu Sep 11 
06:26:10 2008
@@ -29,7 +29,7 @@
 // 
 
 namespace qpid {
-namespace cluster {
+namespace sys {
 
 /**
  * A pollable condition to integrate in-process conditions with IO
@@ -55,6 +55,6 @@
   private:
     int writeFd;
 };
-}} // namespace qpid::cluster
+}} // namespace qpid::sys
 
 #endif  /*!QPID_SYS_POLLABLECONDITION_H*/

Copied: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h (from 
r693865, incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h)
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h?p2=incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h&p1=incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h&r1=693865&r2=694243&rev=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Thu Sep 11 
06:26:10 2008
@@ -1,5 +1,5 @@
-#ifndef QPID_CLUSTER_POLLABLEQUEUE_H
-#define QPID_CLUSTER_POLLABLEQUEUE_H
+#ifndef QPID_SYS_POLLABLEQUEUE_H
+#define QPID_SYS_POLLABLEQUEUE_H
 
 /*
  *
@@ -22,7 +22,7 @@
  *
  */
 
-#include "qpid/cluster/PollableCondition.h"
+#include "qpid/sys/PollableCondition.h"
 #include "qpid/sys/Dispatcher.h"
 #include "qpid/sys/Mutex.h"
 #include <boost/function.hpp>
@@ -34,7 +34,7 @@
 
 namespace sys { class Poller; }
 
-namespace cluster {
+namespace sys {
 
 // FIXME aconway 2008-08-11: this could be of more general interest,
 // move to common lib.
@@ -108,6 +108,6 @@
     batch.clear();
 }
 
-}} // namespace qpid::cluster
+}} // namespace qpid::sys
 
-#endif  /*!QPID_CLUSTER_POLLABLEQUEUE_H*/
+#endif  /*!QPID_SYS_POLLABLEQUEUE_H*/

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerFixture.h Thu Sep 11 06:26:10 
2008
@@ -92,7 +92,8 @@
     SessionType session;
     qpid::client::SubscriptionManager subs;
     qpid::client::LocalQueue lq;
-    ClientT(uint16_t port) : connection(port), 
session(connection.newSession()), subs(session) {}
+    ClientT(uint16_t port, const std::string& name=std::string())
+        : connection(port), session(connection.newSession(name)), 
subs(session) {}
 
     ~ClientT() { connection.close(); }
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Thu Sep 11 
06:26:10 2008
@@ -21,13 +21,14 @@
 #include "ForkedBroker.h"
 #include "BrokerFixture.h"
 
-#include "qpid/cluster/Cpg.h"
+#include "qpid/client/Connection.h"
+#include "qpid/client/Session.h"
 #include "qpid/cluster/Cluster.h"
+#include "qpid/cluster/Cpg.h"
 #include "qpid/cluster/DumpClient.h"
 #include "qpid/framing/AMQBody.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/Session.h"
 #include "qpid/framing/Uuid.h"
+#include "qpid/framing/reply_exceptions.h"
 #include "qpid/log/Logger.h"
 
 #include <boost/bind.hpp>
@@ -41,7 +42,7 @@
 
 namespace qpid {
 namespace cluster {
-boost::intrusive_ptr<Cluster> getGlobalCluster(); // Defined in 
qpid/cluster/ClusterPlugin.cpp
+Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp
 }} // namespace qpid::cluster
 
 
@@ -81,11 +82,11 @@
     add(n);
     // Wait for all n members to join the cluster
     int retry=20;            // TODO aconway 2008-07-16: nasty sleeps, clean 
this up.
-    while (retry && getGlobalCluster()->size() != n) {
+    while (retry && getGlobalCluster().size() != n) {
         ::sleep(1);
         --retry;
     }
-    BOOST_REQUIRE_EQUAL(n, getGlobalCluster()->size());
+    BOOST_REQUIRE_EQUAL(n, getGlobalCluster().size());
 }
 
 void ClusterFixture::add() {
@@ -135,7 +136,37 @@
     return o;
 }
 
-QPID_AUTO_TEST_CASE(testDumpClient) {
+#if 0                           // FIXME aconway 2008-09-10: finish & enable
+QPID_AUTO_TEST_CASE(testDumpConsumers) {
+    ClusterFixture cluster(1);
+    Client a(cluster[0]);
+    a.session.queueDeclare("q");
+    a.subs.subscribe(a.lq, "q");
+
+    cluster.add();
+    Client b(cluster[1]);
+    try {
+        b.connection.newSession(a.session.getId().getName());
+        BOOST_FAIL("Expected SessionBusyException for " << 
a.session.getId().getName());
+    } catch (const SessionBusyException&) {}
+
+    // Transfer some messages to the subscription by client a.
+    Message m;
+    a.session.messageTransfer(arg::bindingKey="q", arg::content=Message("aaa", 
"q"));
+    BOOST_CHECK(a.lq.get(m, TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "aaa");
+
+    b.session.messageTransfer(arg::bindingKey="q", arg::content=Message("bbb", 
"q"));
+    BOOST_CHECK(a.lq.get(m, TIME_SEC));
+    BOOST_CHECK_EQUAL(m.getData(), "bbb");
+
+    // Verify that the queue has been drained on both brokers.
+    // This proves that the consumer was replicated when the second broker 
joined.
+    BOOST_CHECK_EQUAL(a.session.queueQuery("q").getMessageCount(), 0);
+}
+#endif
+
+QPID_AUTO_TEST_CASE(testDumpClientSharedState) {
     BrokerFixture donor, receiver;
     {
         Client c(donor.getPort());
@@ -146,13 +177,13 @@
 
         c.session.exchangeDeclare(arg::exchange="exd", arg::type="direct", 
arg::arguments=args);
         c.session.exchangeBind(arg::exchange="exd", arg::queue="qa", 
arg::bindingKey="foo");
-        c.session.messageTransfer(arg::destination="exd", 
arg::content=TransferContent("one", "foo"));
+        c.session.messageTransfer(arg::destination="exd", 
arg::content=Message("one", "foo"));
 
         c.session.exchangeDeclare("ext", arg::type="topic");
         c.session.exchangeBind(arg::exchange="ext", arg::queue="qb", 
arg::bindingKey="bar");
         c.subs.subscribe(c.lq, "qa", FlowControl::messageCredit(0));
-        c.session.messageTransfer(arg::destination="ext", 
arg::content=TransferContent("one", "bar"));
-        c.session.messageTransfer(arg::destination="ext", 
arg::content=TransferContent("two", "bar"));
+        c.session.messageTransfer(arg::destination="ext", 
arg::content=Message("one", "bar"));
+        c.session.messageTransfer(arg::destination="ext", 
arg::content=Message("two", "bar"));
 
         c.session.close();
         c.connection.close();
@@ -202,11 +233,11 @@
         BOOST_CHECK_EQUAL(m.getDeliveryProperties().getRoutingKey(), "bar");
 
         // Verify bindings
-        r.session.messageTransfer(arg::destination="exd", 
arg::content=TransferContent("xxx", "foo"));
+        r.session.messageTransfer(arg::destination="exd", 
arg::content=Message("xxx", "foo"));
         BOOST_CHECK(r.subs.get(m, "qa"));
         BOOST_CHECK_EQUAL(m.getData(), "xxx");
         
-        r.session.messageTransfer(arg::destination="ext", 
arg::content=TransferContent("yyy", "bar"));
+        r.session.messageTransfer(arg::destination="ext", 
arg::content=Message("yyy", "bar"));
         BOOST_CHECK(r.subs.get(m, "qb"));
         BOOST_CHECK_EQUAL(m.getData(), "yyy");
 
@@ -254,8 +285,8 @@
     ClusterFixture cluster(2);
     Client c0(cluster[0]);
     c0.session.queueDeclare("q");
-    c0.session.messageTransfer(arg::content=TransferContent("foo", "q"));
-    c0.session.messageTransfer(arg::content=TransferContent("bar", "q"));
+    c0.session.messageTransfer(arg::content=Message("foo", "q"));
+    c0.session.messageTransfer(arg::content=Message("bar", "q"));
     c0.session.close();
     Client c1(cluster[1]);
     Message msg;
@@ -268,19 +299,19 @@
 QPID_AUTO_TEST_CASE(testMessageDequeue) {
     // Enqueue on one broker, dequeue on two others.
     ClusterFixture cluster (3);
-    Client c0(cluster[0]);
+    Client c0(cluster[0], "c0");
     c0.session.queueDeclare("q");
-    c0.session.messageTransfer(arg::content=TransferContent("foo", "q"));
-    c0.session.messageTransfer(arg::content=TransferContent("bar", "q"));
+    c0.session.messageTransfer(arg::content=Message("foo", "q"));
+    c0.session.messageTransfer(arg::content=Message("bar", "q"));
 
     Message msg;
 
     // Dequeue on 2 others, ensure correct order.
-    Client c1(cluster[1]);
+    Client c1(cluster[1], "c1");
     BOOST_CHECK(c1.subs.get(msg, "q"));
     BOOST_CHECK_EQUAL("foo", msg.getData());
     
-    Client c2(cluster[2]);
+    Client c2(cluster[2], "c2");
     BOOST_CHECK(c1.subs.get(msg, "q"));
     BOOST_CHECK_EQUAL("bar", msg.getData());
 
@@ -298,8 +329,8 @@
     c0.subs.subscribe(c0.lq, "q", FlowControl::messageCredit(2));
     // Now send messages
     Client c1(cluster[1]);
-    c1.session.messageTransfer(arg::content=TransferContent("foo", "q"));
-    c1.session.messageTransfer(arg::content=TransferContent("bar", "q"));
+    c1.session.messageTransfer(arg::content=Message("foo", "q"));
+    c1.session.messageTransfer(arg::content=Message("bar", "q"));
 
     // Check they arrived
     Message m;

Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=694243&r1=694242&r2=694243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Thu Sep 11 06:26:10 2008
@@ -69,19 +69,20 @@
     -->
     <control name="session-state" code="0x4" label="Set session state during a 
brain dump.">
       <!-- Target session deduced from channel number.  -->
-      <field name="replay-id" type="sequence-no"/>
-      <field name="send-id" type="sequence-no"/>
-      <field name="sent-incomplete" type="sequence-set"/>
 
-      <field name="expected-id" type="sequence-no"/>
-      <field name="received-id" type="sequence-no"/>
-      <field name="unknown-completed" type="sequence-set"/>
-      <field name="received-incomplete" type="sequence-set"/>
+      <field name="replay-start" type="sequence-no"/>         <!-- Replay 
frames will start from this point.-->
+      <field name="sent-incomplete" type="sequence-set"/>      <!-- Commands 
sent and incomplete. -->
+
+      <field name="expected" type="sequence-no"/>             <!-- Idempotence 
barrier -->
+      <field name="received" type="sequence-no"/>             <!-- Received up 
to here > expected-->
+      <field name="unknown-completed" type="sequence-set"/>    <!-- Completed 
but not known to peer. -->
+      <field name="received-incomplete" type="sequence-set"/>  <!-- Received 
and incomplete -->
     </control>
 
     <control name="shadow-ready" code="0x5" label="End of shadow connection 
dump.">
-      <field name="cluster-id" type="uint64"/>
-      <field name="user-id" type="vbin16"/>
+      <field name="member-id" type="uint64"/>
+      <field name="connection-id" type="uint64"/>
     </control>
+
   </class>
 </amqp>


Reply via email to