Author: aconway
Date: Fri Jan 16 13:34:46 2009
New Revision: 735151

URL: http://svn.apache.org/viewvc?rev=735151&view=rev
Log:
cluster refactor: separate out dispatch strategy, implement poller and thread 
dispatch.

Modified:
    qpid/trunk/qpid/cpp/src/cluster.mk
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=735151&r1=735150&r2=735151&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Fri Jan 16 13:34:46 2009
@@ -38,36 +38,40 @@
 
 cluster_la_SOURCES = \
   $(CMAN_SOURCES) \
-  qpid/cluster/types.h \
   qpid/cluster/Cluster.cpp \
   qpid/cluster/Cluster.h \
-  qpid/cluster/Cpg.cpp \
-  qpid/cluster/Cpg.h \
-  qpid/cluster/Dispatchable.h \
+  qpid/cluster/ClusterLeaveException.h         \
+  qpid/cluster/ClusterMap.cpp                  \
+  qpid/cluster/ClusterMap.h                    \
   qpid/cluster/ClusterPlugin.cpp \
-  qpid/cluster/ConnectionCodec.h \
-  qpid/cluster/ConnectionCodec.cpp \
-  qpid/cluster/Connection.h \
   qpid/cluster/Connection.cpp \
+  qpid/cluster/Connection.h                    \
+  qpid/cluster/ConnectionCodec.cpp             \
+  qpid/cluster/ConnectionCodec.h               \
   qpid/cluster/ConnectionMap.h \
-  qpid/cluster/NoOpConnectionOutputHandler.h \
-  qpid/cluster/WriteEstimate.h \
-  qpid/cluster/WriteEstimate.cpp \
-  qpid/cluster/OutputInterceptor.h \
-  qpid/cluster/OutputInterceptor.cpp \
-  qpid/cluster/ProxyInputHandler.h \
-  qpid/cluster/Event.h \
-  qpid/cluster/Event.cpp \
-  qpid/cluster/DumpClient.h \
+  qpid/cluster/Cpg.cpp                         \
+  qpid/cluster/Cpg.h                           \
+  qpid/cluster/Dispatchable.h                  \
   qpid/cluster/DumpClient.cpp \
-  qpid/cluster/ClusterMap.h \
-  qpid/cluster/ClusterMap.cpp \
-  qpid/cluster/FailoverExchange.h \
+  qpid/cluster/DumpClient.h                    \
+  qpid/cluster/Event.cpp                       \
+  qpid/cluster/Event.h                         \
   qpid/cluster/FailoverExchange.cpp \
-  qpid/cluster/Multicaster.h \
+  qpid/cluster/FailoverExchange.h              \
   qpid/cluster/Multicaster.cpp \
-  qpid/cluster/ClusterLeaveException.h \
-  qpid/cluster/Quorum.h
+  qpid/cluster/Multicaster.h                   \
+  qpid/cluster/NoOpConnectionOutputHandler.h   \
+  qpid/cluster/OutputInterceptor.cpp           \
+  qpid/cluster/OutputInterceptor.h             \
+  qpid/cluster/PollerDispatch.cpp              \
+  qpid/cluster/PollerDispatch.h                        \
+  qpid/cluster/ThreadDispatch.cpp              \
+  qpid/cluster/ThreadDispatch.h                        \
+  qpid/cluster/ProxyInputHandler.h             \
+  qpid/cluster/Quorum.h                                \
+  qpid/cluster/WriteEstimate.cpp               \
+  qpid/cluster/WriteEstimate.h                 \
+  qpid/cluster/types.h 
 
 cluster_la_LIBADD=  -lcpg $(libcman) libqpidbroker.la libqpidclient.la
 cluster_la_LDFLAGS = $(PLUGINLDFLAGS)

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=735151&r1=735150&r2=735151&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Jan 16 13:34:46 2009
@@ -85,6 +85,7 @@
 
 Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, 
bool quorum_, size_t readMax_, size_t writeEstimate_, size_t mcastMax) :
     broker(b),
+    mgmtObject(0),
     poller(b.getPoller()),
     cpg(*this),
     name(name_),
@@ -92,14 +93,8 @@
     myId(cpg.self()),
     readMax(readMax_),
     writeEstimate(writeEstimate_),
-    cpgDispatchHandle(
-        cpg,
-        boost::bind(&Cluster::dispatch, this, _1), // read
-        0,                                         // write
-        boost::bind(&Cluster::disconnect, this, _1) // disconnect
-    ),
     mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)),
-    mgmtObject(0),
+    dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
     deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
     state(INIT),
     lastSize(0),
@@ -114,7 +109,7 @@
     }
     broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
     failoverExchange.reset(new FailoverExchange(this));
-    cpgDispatchHandle.startWatch(poller);
+    dispatcher.start();
     deliverQueue.start();
     QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << 
myUrl);
     if (quorum_) quorum.init();
@@ -153,14 +148,13 @@
         state = LEFT;
         QPID_LOG(notice, *this << " leaving cluster " << name);
         if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
-        if (!deliverQueue.isStopped()) deliverQueue.stop();
         try { cpg.leave(); }
         catch (const std::exception& e) {
             QPID_LOG(critical, *this << " error leaving process group: " << 
e.what());
         }
         try { broker.shutdown(); }
         catch (const std::exception& e) {
-            QPID_LOG(critical, *this << " error during shutdown: " << 
e.what());
+            QPID_LOG(critical, *this << " error during broker shutdown: " << 
e.what());
         }
     }
 }
@@ -202,7 +196,8 @@
 // Entry point: called when deliverQueue has events to process.
 void Cluster::delivered(PollableEventQueue::Queue& events) {
     try {
-        for_each(events.begin(), events.end(), 
boost::bind(&Cluster::deliveredEvent, this, _1));
+        for (PollableEventQueue::Queue::iterator i = events.begin(); i != 
events.end(); ++i)
+            deliveredEvent(*i, i->getData());
         events.clear();
     } catch (const std::exception& e) {
         QPID_LOG(critical, *this << " error in cluster delivery: " << 
e.what());
@@ -210,8 +205,8 @@
     }
 }
 
-void Cluster::deliveredEvent(const Event& e) {
-    Buffer buf(e);
+void Cluster::deliveredEvent(const EventHeader& e, const char* data) {
+    Buffer buf(const_cast<char*>(data), e.getSize());
     AMQFrame frame;
     if (e.isCluster())  {
         while (frame.decode(buf)) {
@@ -270,27 +265,6 @@
     return o << a.suffix;
 }
 
-// Entry point: called by IO to dispatch CPG events.
-void Cluster::dispatch(sys::DispatchHandle& h) {
-    try {
-        cpg.dispatchAll();
-        h.rewatch();
-    } catch (const std::exception& e) {
-        QPID_LOG(critical, *this << " error in cluster dispatch: " << 
e.what());
-        leave();
-    }
-}
-
-// Entry point: called if disconnected from  CPG.
-void Cluster::disconnect(sys::DispatchHandle& ) {
-    QPID_LOG(critical, *this << " error disconnected from cluster");
-    try {
-        broker.shutdown();
-    } catch (const std::exception& e) {
-        QPID_LOG(error, *this << " error in shutdown: " << e.what());
-    }
-}
-
 void Cluster::configChange ( 
     cpg_handle_t /*handle*/,
     cpg_name */*group*/,
@@ -358,7 +332,7 @@
     if (state != LEFT) {
         try { cpg.shutdown(); }
         catch (const std::exception& e) {
-            QPID_LOG(error, *this << " during shutdown: " << e.what());
+            QPID_LOG(error, *this << " shutting down CPG: " << e.what());
         }
     }
     delete this;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=735151&r1=735150&r2=735151&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Jan 16 13:34:46 2009
@@ -19,19 +19,19 @@
  *
  */
 
-#include "Cpg.h"
-#include "Event.h"
-#include "NoOpConnectionOutputHandler.h"
 #include "ClusterMap.h"
 #include "ConnectionMap.h"
+#include "Cpg.h"
+#include "Event.h"
 #include "FailoverExchange.h"
-#include "Quorum.h"
 #include "Multicaster.h"
+#include "NoOpConnectionOutputHandler.h"
+#include "PollerDispatch.h"
+#include "Quorum.h"
 
 #include "qpid/broker/Broker.h"
 #include "qpid/sys/PollableQueue.h"
 #include "qpid/sys/Monitor.h"
-#include "qpid/sys/LockPtr.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/Url.h"
 #include "qmf/org/apache/qpid/cluster/Cluster.h"
@@ -99,8 +99,6 @@
     size_t getWriteEstimate() { return writeEstimate; }
     
   private:
-    typedef sys::LockPtr<Cluster,sys::Monitor> LockPtr;
-    typedef sys::LockPtr<const Cluster,sys::Monitor> ConstLockPtr;
     typedef sys::Monitor::ScopedLock Lock;
 
     typedef sys::PollableQueue<Event> PollableEventQueue;
@@ -129,15 +127,11 @@
     void configChange(const MemberId&, const std::string& addresses, Lock& l);
     void shutdown(const MemberId&, Lock&);
     void delivered(PollableEventQueue::Queue&); // deliverQueue callback
-    void deliveredEvent(const Event&); 
+    void deliveredEvent(const EventHeader&, const char*); 
 
     // Helper, called in deliver thread.
     void dumpStart(const MemberId& dumpee, const Url& url, Lock&);
 
-    // CPG callbacks, called in CPG IO thread.
-    void dispatch(sys::DispatchHandle&); // Dispatch CPG events.
-    void disconnect(sys::DispatchHandle&); // PG was disconnected
-
     void deliver( // CPG deliver callback. 
         cpg_handle_t /*handle*/,
         struct cpg_name *group,
@@ -177,6 +171,7 @@
 
     // Immutable members set on construction, never changed.
     broker::Broker& broker;
+    qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns 
lifecycle
     boost::shared_ptr<sys::Poller> poller;
     Cpg cpg;
     const std::string name;
@@ -186,12 +181,10 @@
     const size_t writeEstimate;
     framing::Uuid clusterId;
     NoOpConnectionOutputHandler shadowOut;
-    sys::DispatchHandle cpgDispatchHandle;
-
 
     // Thread safe members
     Multicaster mcast;
-    qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns 
lifecycle
+    PollerDispatch dispatcher;
     PollableEventQueue deliverQueue;
     ConnectionMap connections;
     boost::shared_ptr<FailoverExchange> failoverExchange;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=735151&r1=735150&r2=735151&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Fri Jan 16 13:34:46 2009
@@ -93,7 +93,7 @@
 
 static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
 
-std::ostream& operator << (std::ostream& o, const Event& e) {
+std::ostream& operator << (std::ostream& o, const EventHeader& e) {
     o << "[event " << e.getConnectionId() 
       << " " << EVENT_TYPE_NAMES[e.getType()]
       << " " << e.getSize() << " bytes]";

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=735151&r1=735150&r2=735151&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Fri Jan 16 13:34:46 2009
@@ -91,7 +91,7 @@
     RefCountedBuffer::pointer store;
 };
 
-std::ostream& operator << (std::ostream&, const Event&);
+std::ostream& operator << (std::ostream&, const EventHeader&);
 }} // namespace qpid::cluster
 
 #endif  /*!QPID_CLUSTER_EVENT_H*/


Reply via email to