Author: aconway Date: Wed Jan 28 20:48:23 2009 New Revision: 738618 URL: http://svn.apache.org/viewvc?rev=738618&view=rev Log: (empty)
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=738618&r1=738617&r2=738618&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jan 28 20:48:23 2009 @@ -86,7 +86,7 @@ bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); } }; -Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_, size_t mcastMax) : +Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_) : broker(b), mgmtObject(0), poller(b.getPoller()), @@ -96,7 +96,7 @@ myId(cpg.self()), readMax(readMax_), writeEstimate(writeEstimate_), - mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)), + mcast(cpg, poller, boost::bind(&Cluster::leave, this)), dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)), deliverEventQueue(ClusterQueueHandler<Event>(this, boost::bind(&Cluster::deliveredEvent, this, _1), "event queue"), poller), deliverFrameQueue(ClusterQueueHandler<EventFrame>(this, boost::bind(&Cluster::deliveredFrame, this, _1), "frame queue"), poller), Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=738618&r1=738617&r2=738618&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Jan 28 20:48:23 2009 @@ -71,7 +71,7 @@ * Join a cluster. */ Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum, - size_t readMax, size_t writeEstimate, size_t mcastMax); + size_t readMax, size_t writeEstimate); virtual ~Cluster(); Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=738618&r1=738617&r2=738618&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Wed Jan 28 20:48:23 2009 @@ -52,9 +52,9 @@ string name; string url; bool quorum; - size_t readMax, writeEstimate, mcastMax; + size_t readMax, writeEstimate; - ClusterValues() : quorum(false), readMax(10), writeEstimate(64), mcastMax(0) {} + ClusterValues() : quorum(false), readMax(10), writeEstimate(64) {} Url getUrl(uint16_t port) const { if (url.empty()) return Url::getIpAddressesUrl(port); @@ -79,11 +79,9 @@ ("cluster-cman", optValue(values.quorum), "Integrate with Cluster Manager (CMAN) cluster.") #endif ("cluster-read-max", optValue(values.readMax,"N"), - "Experimental: Max unreplicated reads per connetion connection. 0=no limit.") - ("cluster-mcast-max", optValue(values.mcastMax,"N"), - "Experimental: Max outstanding multicasts per broker. 0=no limit.") + "Experimental: Limit per-client-connection queue of read buffers. 0=no limit.") ("cluster-write-estimate", optValue(values.writeEstimate, "Kb"), - "Experimental: initial estimate for connection writes rate per multicast cycle"); + "Experimental: initial estimate for connection write rate per multicast cycle"); } }; @@ -147,7 +145,7 @@ values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker, values.quorum, - values.readMax, values.writeEstimate*1024, values.mcastMax + values.readMax, values.writeEstimate*1024 ); broker->setConnectionFactory( boost::shared_ptr<sys::ConnectionCodec::Factory>( Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=738618&r1=738617&r2=738618&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Wed Jan 28 20:48:23 2009 @@ -28,14 +28,12 @@ namespace qpid { namespace cluster { -Multicaster::Multicaster(Cpg& cpg_, size_t mcastMax_, +Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& poller, boost::function<void()> onError_) : onError(onError_), cpg(cpg_), queue(boost::bind(&Multicaster::sendMcast, this, _1), poller), - holding(true), - mcastMax(mcastMax_), - pending(0) + holding(true) { queue.start(); } @@ -69,22 +67,10 @@ try { PollableEventQueue::Queue::iterator i = values.begin(); while( i != values.end()) { - if (mcastMax) { - sys::Mutex::ScopedLock l(lock); - if (pending == mcastMax) { - queue.stop(); - break ; - } - ++pending; - } QPID_LATENCY_RECORD("mcast send queue", *i); iovec iov = i->toIovec(); if (!cpg.mcast(&iov, 1)) { // cpg didn't send because of CPG flow control. - if (mcastMax) { - sys::Mutex::ScopedLock l(lock); - --pending; - } break; } ++i; @@ -108,13 +94,6 @@ void Multicaster::selfDeliver(const Event& e) { sys::Mutex::ScopedLock l(lock); QPID_LATENCY_RECORD("cpg self deliver", e); - if (mcastMax) { - assert(pending > 0); - assert(pending <= mcastMax); - if (pending == mcastMax) - queue.start(); - --pending; - } } }} // namespace qpid::cluster Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=738618&r1=738617&r2=738618&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Wed Jan 28 20:48:23 2009 @@ -46,7 +46,6 @@ public: /** Starts in holding mode: connection data events are held, other events are mcast */ Multicaster(Cpg& cpg_, - size_t mcastMax, const boost::shared_ptr<sys::Poller>&, boost::function<void()> onError ); @@ -71,7 +70,6 @@ bool holding; PlainEventQueue holdingQueue; std::vector<struct ::iovec> ioVector; - size_t mcastMax, pending; }; }} // namespace qpid::cluster --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org