Author: aconway
Date: Fri Jan 16 13:34:46 2009
New Revision: 735151
URL: http://svn.apache.org/viewvc?rev=735151&view=rev
Log: cluster refactor: separate out dispatch strategy, implement poller and thread dispatch.
Modified: qpid/trunk/qpid/cpp/src/cluster.mk qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Modified: qpid/trunk/qpid/cpp/src/cluster.mk URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=735151&r1=735150&r2=735151&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/cluster.mk (original) +++ qpid/trunk/qpid/cpp/src/cluster.mk Fri Jan 16 13:34:46 2009 @@ -38,36 +38,40 @@ cluster_la_SOURCES = \ $(CMAN_SOURCES) \ - qpid/cluster/types.h \ qpid/cluster/Cluster.cpp \ qpid/cluster/Cluster.h \ - qpid/cluster/Cpg.cpp \ - qpid/cluster/Cpg.h \ - qpid/cluster/Dispatchable.h \ + qpid/cluster/ClusterLeaveException.h \ + qpid/cluster/ClusterMap.cpp \ + qpid/cluster/ClusterMap.h \ qpid/cluster/ClusterPlugin.cpp \ - qpid/cluster/ConnectionCodec.h \ - qpid/cluster/ConnectionCodec.cpp \ - qpid/cluster/Connection.h \ qpid/cluster/Connection.cpp \ + qpid/cluster/Connection.h \ + qpid/cluster/ConnectionCodec.cpp \ + qpid/cluster/ConnectionCodec.h \ qpid/cluster/ConnectionMap.h \ - qpid/cluster/NoOpConnectionOutputHandler.h \ - qpid/cluster/WriteEstimate.h \ - qpid/cluster/WriteEstimate.cpp \ - qpid/cluster/OutputInterceptor.h \ - qpid/cluster/OutputInterceptor.cpp \ - qpid/cluster/ProxyInputHandler.h \ - qpid/cluster/Event.h \ - qpid/cluster/Event.cpp \ - qpid/cluster/DumpClient.h \ + qpid/cluster/Cpg.cpp \ + qpid/cluster/Cpg.h \ + qpid/cluster/Dispatchable.h \ qpid/cluster/DumpClient.cpp \ - qpid/cluster/ClusterMap.h \ - qpid/cluster/ClusterMap.cpp \ - qpid/cluster/FailoverExchange.h \ + qpid/cluster/DumpClient.h \ + qpid/cluster/Event.cpp \ + qpid/cluster/Event.h \ qpid/cluster/FailoverExchange.cpp \ - qpid/cluster/Multicaster.h \ + qpid/cluster/FailoverExchange.h \ qpid/cluster/Multicaster.cpp \ - qpid/cluster/ClusterLeaveException.h \ - qpid/cluster/Quorum.h 
+ qpid/cluster/Multicaster.h \ + qpid/cluster/NoOpConnectionOutputHandler.h \ + qpid/cluster/OutputInterceptor.cpp \ + qpid/cluster/OutputInterceptor.h \ + qpid/cluster/PollerDispatch.cpp \ + qpid/cluster/PollerDispatch.h \ + qpid/cluster/ThreadDispatch.cpp \ + qpid/cluster/ThreadDispatch.h \ + qpid/cluster/ProxyInputHandler.h \ + qpid/cluster/Quorum.h \ + qpid/cluster/WriteEstimate.cpp \ + qpid/cluster/WriteEstimate.h \ + qpid/cluster/types.h cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la cluster_la_LDFLAGS = $(PLUGINLDFLAGS) Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=735151&r1=735150&r2=735151&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Jan 16 13:34:46 2009 @@ -85,6 +85,7 @@ Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_, size_t mcastMax) : broker(b), + mgmtObject(0), poller(b.getPoller()), cpg(*this), name(name_), @@ -92,14 +93,8 @@ myId(cpg.self()), readMax(readMax_), writeEstimate(writeEstimate_), - cpgDispatchHandle( - cpg, - boost::bind(&Cluster::dispatch, this, _1), // read - 0, // write - boost::bind(&Cluster::disconnect, this, _1) // disconnect - ), mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)), - mgmtObject(0), + dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller), state(INIT), lastSize(0), @@ -114,7 +109,7 @@ } broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this); failoverExchange.reset(new FailoverExchange(this)); - cpgDispatchHandle.startWatch(poller); + dispatcher.start(); deliverQueue.start(); QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl); if 
(quorum_) quorum.init(); @@ -153,14 +148,13 @@ state = LEFT; QPID_LOG(notice, *this << " leaving cluster " << name); if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN"); - if (!deliverQueue.isStopped()) deliverQueue.stop(); try { cpg.leave(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error leaving process group: " << e.what()); } try { broker.shutdown(); } catch (const std::exception& e) { - QPID_LOG(critical, *this << " error during shutdown: " << e.what()); + QPID_LOG(critical, *this << " error during broker shutdown: " << e.what()); } } } @@ -202,7 +196,8 @@ // Entry point: called when deliverQueue has events to process. void Cluster::delivered(PollableEventQueue::Queue& events) { try { - for_each(events.begin(), events.end(), boost::bind(&Cluster::deliveredEvent, this, _1)); + for (PollableEventQueue::Queue::iterator i = events.begin(); i != events.end(); ++i) + deliveredEvent(*i, i->getData()); events.clear(); } catch (const std::exception& e) { QPID_LOG(critical, *this << " error in cluster delivery: " << e.what()); @@ -210,8 +205,8 @@ } } -void Cluster::deliveredEvent(const Event& e) { - Buffer buf(e); +void Cluster::deliveredEvent(const EventHeader& e, const char* data) { + Buffer buf(const_cast<char*>(data), e.getSize()); AMQFrame frame; if (e.isCluster()) { while (frame.decode(buf)) { @@ -270,27 +265,6 @@ return o << a.suffix; } -// Entry point: called by IO to dispatch CPG events. -void Cluster::dispatch(sys::DispatchHandle& h) { - try { - cpg.dispatchAll(); - h.rewatch(); - } catch (const std::exception& e) { - QPID_LOG(critical, *this << " error in cluster dispatch: " << e.what()); - leave(); - } -} - -// Entry point: called if disconnected from CPG. 
-void Cluster::disconnect(sys::DispatchHandle& ) { - QPID_LOG(critical, *this << " error disconnected from cluster"); - try { - broker.shutdown(); - } catch (const std::exception& e) { - QPID_LOG(error, *this << " error in shutdown: " << e.what()); - } -} - void Cluster::configChange ( cpg_handle_t /*handle*/, cpg_name */*group*/, @@ -358,7 +332,7 @@ if (state != LEFT) { try { cpg.shutdown(); } catch (const std::exception& e) { - QPID_LOG(error, *this << " during shutdown: " << e.what()); + QPID_LOG(error, *this << " shutting down CPG: " << e.what()); } } delete this; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=735151&r1=735150&r2=735151&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Jan 16 13:34:46 2009 @@ -19,19 +19,19 @@ * */ -#include "Cpg.h" -#include "Event.h" -#include "NoOpConnectionOutputHandler.h" #include "ClusterMap.h" #include "ConnectionMap.h" +#include "Cpg.h" +#include "Event.h" #include "FailoverExchange.h" -#include "Quorum.h" #include "Multicaster.h" +#include "NoOpConnectionOutputHandler.h" +#include "PollerDispatch.h" +#include "Quorum.h" #include "qpid/broker/Broker.h" #include "qpid/sys/PollableQueue.h" #include "qpid/sys/Monitor.h" -#include "qpid/sys/LockPtr.h" #include "qpid/management/Manageable.h" #include "qpid/Url.h" #include "qmf/org/apache/qpid/cluster/Cluster.h" @@ -99,8 +99,6 @@ size_t getWriteEstimate() { return writeEstimate; } private: - typedef sys::LockPtr<Cluster,sys::Monitor> LockPtr; - typedef sys::LockPtr<const Cluster,sys::Monitor> ConstLockPtr; typedef sys::Monitor::ScopedLock Lock; typedef sys::PollableQueue<Event> PollableEventQueue; @@ -129,15 +127,11 @@ void configChange(const MemberId&, const std::string& addresses, Lock& l); void shutdown(const MemberId&, Lock&); 
void delivered(PollableEventQueue::Queue&); // deliverQueue callback - void deliveredEvent(const Event&); + void deliveredEvent(const EventHeader&, const char*); // Helper, called in deliver thread. void dumpStart(const MemberId& dumpee, const Url& url, Lock&); - // CPG callbacks, called in CPG IO thread. - void dispatch(sys::DispatchHandle&); // Dispatch CPG events. - void disconnect(sys::DispatchHandle&); // PG was disconnected - void deliver( // CPG deliver callback. cpg_handle_t /*handle*/, struct cpg_name *group, @@ -177,6 +171,7 @@ // Immutable members set on construction, never changed. broker::Broker& broker; + qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle boost::shared_ptr<sys::Poller> poller; Cpg cpg; const std::string name; @@ -186,12 +181,10 @@ const size_t writeEstimate; framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; - sys::DispatchHandle cpgDispatchHandle; - // Thread safe members Multicaster mcast; - qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle + PollerDispatch dispatcher; PollableEventQueue deliverQueue; ConnectionMap connections; boost::shared_ptr<FailoverExchange> failoverExchange; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=735151&r1=735150&r2=735151&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Fri Jan 16 13:34:46 2009 @@ -93,7 +93,7 @@ static const char* EVENT_TYPE_NAMES[] = { "data", "control" }; -std::ostream& operator << (std::ostream& o, const Event& e) { +std::ostream& operator << (std::ostream& o, const EventHeader& e) { o << "[event " << e.getConnectionId() << " " << EVENT_TYPE_NAMES[e.getType()] << " " << e.getSize() << " bytes]"; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=735151&r1=735150&r2=735151&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Fri Jan 16 13:34:46 2009 @@ -91,7 +91,7 @@ RefCountedBuffer::pointer store; }; -std::ostream& operator << (std::ostream&, const Event&); +std::ostream& operator << (std::ostream&, const EventHeader&); }} // namespace qpid::cluster #endif /*!QPID_CLUSTER_EVENT_H*/