Author: aconway
Date: Tue Jun 16 21:21:09 2009
New Revision: 785408

URL: http://svn.apache.org/viewvc?rev=785408&view=rev
Log:
Performance improvements in AggregateOutput and SemanticState.

Replaced AggregateOutput hierarchy with a flat list per connection
holding only the OutputTasks that are potentially active.  Tasks are
dropped from the list as soon as they return false, and added back when
they may have output.

Inlined frequently-used SequenceNumber functions.

Replaced std::list in QueueListeners with std::vector.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
    qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
    qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h
    qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Jun 16 21:21:09 2009
@@ -134,6 +134,9 @@
     /** Called by cluster to mark shadow connections */
     void setShadow() { shadow = true; }
 
+    // Used by cluster to update connection status
+    sys::AggregateOutput& getOutputTasks() { return outputTasks; }
+
   private:
     typedef boost::ptr_map<framing::ChannelId, SessionHandler> ChannelMap;
     typedef std::vector<Queue::shared_ptr>::iterator queue_iterator;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Jun 16 21:21:09 2009
@@ -1025,3 +1025,5 @@
 {
     return !policy.get() || policy->isEnqueued(msg);
 }
+
+QueueListeners& Queue::getListeners() { return listeners; }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Jun 16 21:21:09 2009
@@ -325,6 +325,9 @@
              * Notify queue that recovery has completed.
              */
             void recoveryComplete();
+
+            // For cluster update
+            QueueListeners& getListeners();
         };
     }
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp Tue Jun 16 21:21:09 
2009
@@ -46,9 +46,11 @@
 {
     if (consumers.size()) {
         set.consumer = consumers.front();
-        consumers.pop_front();
+        consumers.erase(consumers.begin());
     } else {
-        browsers.swap(set.browsers);
+        // Don't swap the vectors, hang on to the memory allocated.
+        set.browsers = browsers;
+        browsers.clear();
     }
 }
 
@@ -70,4 +72,10 @@
     else for_each(browsers.begin(), browsers.end(), 
boost::mem_fn(&Consumer::notify));
 }
 
+bool QueueListeners::contains(Consumer::shared_ptr c) const {
+    return
+        find(browsers.begin(), browsers.end(), c) != browsers.end() ||
+        find(consumers.begin(), consumers.end(), c) != consumers.end();
+}
+
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h Tue Jun 16 21:21:09 
2009
@@ -22,7 +22,7 @@
  *
  */
 #include "Consumer.h"
-#include <list>
+#include <vector>
 
 namespace qpid {
 namespace broker {
@@ -40,7 +40,7 @@
 class QueueListeners
 {
   public:
-    typedef std::list<Consumer::shared_ptr> Listeners;
+    typedef std::vector<Consumer::shared_ptr> Listeners;
 
     class NotificationSet
     {
@@ -55,6 +55,8 @@
     void addListener(Consumer::shared_ptr);    
     void removeListener(Consumer::shared_ptr);    
     void populate(NotificationSet&);
+    bool contains(Consumer::shared_ptr c) const;
+
   private:
     Listeners consumers;
     Listeners browsers;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Jun 16 21:21:09 
2009
@@ -61,7 +61,6 @@
       deliveryAdapter(da),
       tagGenerator("sgen"),
       dtxSelected(false),
-      outputTasks(ss),
       authMsg(getSession().getBroker().getOptions().auth && 
!getSession().getConnection().isFederationLink()),
       
userID(getSession().getConnection().getUserId().substr(0,getSession().getConnection().getUserId().find('@')))
 {
@@ -90,7 +89,6 @@
 {
     ConsumerImpl::shared_ptr c(new ConsumerImpl(this, tag, queue, ackRequired, 
acquire, exclusive, resumeId, resumeTtl, arguments));
     queue->consume(c, exclusive);//may throw exception
-    outputTasks.addOutputTask(c.get());
     consumers[tag] = c;
 }
 
@@ -98,7 +96,7 @@
     ConsumerImplMap::iterator i = consumers.find(tag);
     if (i != consumers.end()) {
         cancel(i->second);
-        consumers.erase(i); 
+        consumers.erase(i);
         //should cancel all unacked messages for this consumer so that
         //they are not redelivered on recovery
         for_each(unacked.begin(), unacked.end(), 
boost::bind(&DeliveryRecord::cancel, _1, tag));
@@ -257,9 +255,9 @@
     msgCredit(0), 
     byteCredit(0),
     notifyEnabled(true),
-    queueHasMessages(1),
     syncFrequency(_arguments.getAsInt("qpid.sync_frequency")),
-    deliveryCount(0) {}
+    deliveryCount(0)
+{}
 
 OwnershipToken* SemanticState::ConsumerImpl::getSession()
 {
@@ -290,6 +288,11 @@
 
 bool SemanticState::ConsumerImpl::accept(intrusive_ptr<Message> msg)
 {
+    // FIXME aconway 2009-06-08: if we have byte & message credit but
+    // checkCredit fails because the message is to big, we should
+    // remain on queue's listener list for possible smaller messages
+    // in future.
+    // 
     blocked = !(filter(msg) && checkCredit(msg));
     return !blocked;
 }
@@ -328,7 +331,8 @@
 void SemanticState::cancel(ConsumerImpl::shared_ptr c)
 {
     c->disableNotify();
-    outputTasks.removeOutputTask(c.get());
+    if (session.isAttached())
+        session.getConnection().outputTasks.removeOutputTask(c.get());
     Queue::shared_ptr queue = c->getQueue();
     if(queue) {
         queue->cancel(c);
@@ -397,16 +401,18 @@
 }
 
 void SemanticState::requestDispatch()
-{    
-    for (ConsumerImplMap::iterator i = consumers.begin(); i != 
consumers.end(); i++) {
-        requestDispatch(*(i->second));
-    }
+{
+    for (ConsumerImplMap::iterator i = consumers.begin(); i != 
consumers.end(); i++)
+        i->second->requestDispatch();
 }
 
-void SemanticState::requestDispatch(ConsumerImpl& c)
-{    
-    if(c.isBlocked())
-        outputTasks.activateOutput();
+void SemanticState::ConsumerImpl::requestDispatch()
+{
+    if (blocked) {
+        parent->session.getConnection().outputTasks.addOutputTask(this);
+        parent->session.getConnection().outputTasks.activateOutput();
+        blocked = false;
+    }
 }
 
 bool SemanticState::complete(DeliveryRecord& delivery)
@@ -475,7 +481,7 @@
 {
     ConsumerImpl& c = find(destination);
     c.addByteCredit(value);
-    requestDispatch(c);
+    c.requestDispatch();
 }
 
 
@@ -483,7 +489,7 @@
 {
     ConsumerImpl& c = find(destination);
     c.addMessageCredit(value);
-    requestDispatch(c);
+    c.requestDispatch();
 }
 
 void SemanticState::flush(const std::string& destination)
@@ -593,11 +599,7 @@
 
 bool SemanticState::ConsumerImpl::doOutput()
 {
-    if (!haveCredit() || !queueHasMessages.boolCompareAndSwap(1, 0))
-        return false;
-    if (queue->dispatch(shared_from_this()))
-        queueHasMessages.boolCompareAndSwap(0, 1);
-    return queueHasMessages.get();
+    return haveCredit() && queue->dispatch(shared_from_this());
 }
 
 void SemanticState::ConsumerImpl::enableNotify()
@@ -619,14 +621,11 @@
 
 void SemanticState::ConsumerImpl::notify()
 {
-    queueHasMessages.boolCompareAndSwap(0, 1);
-
-    //TODO: alter this, don't want to hold locks across external
-    //calls; for now its is required to protect the notify() from
-    //having part of the object chain of the invocation being
-    //concurrently deleted
     Mutex::ScopedLock l(lock);
-    if (notifyEnabled) parent->outputTasks.activateOutput();
+    if (notifyEnabled) {
+        parent->session.getConnection().outputTasks.addOutputTask(this);
+        parent->session.getConnection().outputTasks.activateOutput();
+    }
 }
 
 
@@ -670,13 +669,16 @@
 {
     for (ConsumerImplMap::iterator i = consumers.begin(); i != 
consumers.end(); i++) {
         i->second->enableNotify();
+        session.getConnection().outputTasks.addOutputTask(i->second.get());
     }
+    session.getConnection().outputTasks.activateOutput();
 }
 
 void SemanticState::detached()
 {
     for (ConsumerImplMap::iterator i = consumers.begin(); i != 
consumers.end(); i++) {
         i->second->disableNotify();
+        session.getConnection().outputTasks.removeOutputTask(i->second.get());
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Tue Jun 16 21:21:09 2009
@@ -55,9 +55,7 @@
  * SemanticState holds the L3 and L4 state of an open session, whether
  * attached to a channel or suspended. 
  */
-class SemanticState : public sys::OutputTask,
-                      private boost::noncopyable
-{
+class SemanticState : private boost::noncopyable {
   public:
     class ConsumerImpl : public Consumer, public sys::OutputTask,
                          public boost::enable_shared_from_this<ConsumerImpl>
@@ -77,9 +75,6 @@
         uint32_t msgCredit;
         uint32_t byteCredit;
         bool notifyEnabled;
-        // queueHasMessages is boolean but valgrind has trouble with
-        // AtomicValue<bool> so use an int with 1 or 0.
-        sys:: AtomicValue<int> queueHasMessages; 
         const int syncFrequency;
         int deliveryCount;
 
@@ -105,6 +100,8 @@
         void notify();
         bool isNotifyEnabled() const;
 
+        void requestDispatch();
+
         void setWindowMode();
         void setCreditMode();
         void addByteCredit(uint32_t value);
@@ -130,6 +127,8 @@
         std::string getResumeId() const { return resumeId; };
         uint64_t getResumeTtl() const { return resumeTtl; }
         const framing::FieldTable& getArguments() const { return arguments; }
+
+        SemanticState& getParent() { return *parent; }
     };
 
   private:
@@ -147,7 +146,6 @@
     DtxBufferMap suspendedXids;
     framing::SequenceSet accumulatedAck;
     boost::shared_ptr<Exchange> cacheExchange;
-    sys::AggregateOutput outputTasks;
     AclModule* acl;
     const bool authMsg;
     const string userID;
@@ -158,7 +156,6 @@
     bool complete(DeliveryRecord&);
     AckRange findRange(DeliveryId first, DeliveryId last);
     void requestDispatch();
-    void requestDispatch(ConsumerImpl&);
     void cancel(ConsumerImpl::shared_ptr);
 
   public:
@@ -208,8 +205,6 @@
     void release(DeliveryId first, DeliveryId last, bool setRedelivered);
     void reject(DeliveryId first, DeliveryId last);
     void handle(boost::intrusive_ptr<Message> msg);
-    bool hasOutput() { return outputTasks.hasOutput(); }
-    bool doOutput() { return outputTasks.doOutput(); }
 
     //final 0-10 spec (completed and accepted are distinct):
     void completed(DeliveryId deliveryTag, DeliveryId endTag);
@@ -218,10 +213,11 @@
     void attached();
     void detached();
 
-    // Used by cluster to re-create replica sessions
-    static ConsumerImpl* castToConsumerImpl(OutputTask* p) { return 
boost::polymorphic_downcast<ConsumerImpl*>(p); }
-
-    template <class F> void eachConsumer(F f) { 
outputTasks.eachOutput(boost::bind(f, boost::bind(castToConsumerImpl, _1))); }
+    // Used by cluster to re-create sessions
+    template <class F> void eachConsumer(F f) {
+        for(ConsumerImplMap::iterator i = consumers.begin(); i != 
consumers.end(); ++i)
+            f(i->second);
+    }
     DeliveryRecords& getUnacked() { return unacked; }
     framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; }
     TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h Tue Jun 16 21:21:09 
2009
@@ -40,9 +40,11 @@
   public:
     virtual ~SessionContext(){}
     virtual bool isLocal(const ConnectionToken* t) const = 0;
+    virtual bool isAttached() const = 0;
     virtual ConnectionState& getConnection() = 0;
     virtual framing::AMQP_ClientProxy& getProxy() = 0;
     virtual Broker& getBroker() = 0;
+    virtual uint16_t getChannel() const = 0;
 };
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Jun 16 21:21:09 
2009
@@ -99,6 +99,11 @@
     return handler->getProxy();
 }
 
+uint16_t SessionState::getChannel() const {
+    assert(isAttached());
+    return handler->getChannel();
+}
+
 ConnectionState& SessionState::getConnection() {
     assert(isAttached());
     return handler->getConnection();
@@ -119,8 +124,7 @@
 
 void SessionState::disableOutput()
 {
-    semanticState.detached();//prevents further activateOutput calls until 
reattached
-    getConnection().outputTasks.removeOutputTask(&semanticState);
+    semanticState.detached(); //prevents further activateOutput calls until 
reattached
 }
 
 void SessionState::attach(SessionHandler& h) {
@@ -362,10 +366,6 @@
     QPID_LOG(debug, getId() << ": ready to send, activating output.");
     assert(handler);
     semanticState.attached();
-    sys::AggregateOutput& tasks = handler->getConnection().outputTasks;
-    tasks.addOutputTask(&semanticState);
-    tasks.activateOutput();
-
     if (rateFlowcontrol) {
         qpid::sys::ScopedLock<Mutex> l(rateLock);
         // Issue initial credit - use a heuristic here issue min of 300 
messages or 1 secs worth

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Jun 16 21:21:09 2009
@@ -81,6 +81,9 @@
     framing::AMQP_ClientProxy& getProxy();
 
     /** @pre isAttached() */
+    uint16_t getChannel() const;
+
+    /** @pre isAttached() */
     ConnectionState& getConnection();
     bool isLocal(const ConnectionToken* t) const;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Jun 16 21:21:09 2009
@@ -755,13 +755,16 @@
     expiryPolicy->deliverExpire(id);
 }
 
-void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, 
Lock&) {
+void Cluster::errorCheck(const MemberId& m, uint8_t type, uint64_t frameSeq, 
Lock&) {
     // If we receive an errorCheck here, it's because we  have processed past 
the point
     // of the error so respond with ERROR_TYPE_NONE
     assert(map.getFrameSeq() >= frameSeq);
-    if (type != framing::cluster::ERROR_TYPE_NONE) // Don't respond if its 
already NONE.
+    if (type != framing::cluster::ERROR_TYPE_NONE) { // Don't respond to NONE.
+        QPID_LOG(debug, "Error " << frameSeq << " on " << m << " did not occur 
locally");
         mcast.mcastControl(
-            ClusterErrorCheckBody(ProtocolVersion(), 
framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
+            ClusterErrorCheckBody(ProtocolVersion(),
+                                  framing::cluster::ERROR_TYPE_NONE, 
frameSeq), self);
+    }
 }
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Jun 16 21:21:09 2009
@@ -113,7 +113,7 @@
     Decoder& getDecoder() { return decoder; }
 
     ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; }
-    
+
   private:
     typedef sys::Monitor::ScopedLock Lock;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Jun 16 21:21:09 2009
@@ -245,10 +245,13 @@
     return sessionState().getSemanticState();
 }
 
-void Connection::consumerState(const string& name, bool blocked, bool 
notifyEnabled) {
+void Connection::consumerState(
+    const string& name, bool blocked, bool notifyEnabled, bool isInListener)
+{
     broker::SemanticState::ConsumerImpl& c = semanticState().find(name);
     c.setBlocked(blocked);
     if (notifyEnabled) c.enableNotify(); else c.disableNotify();
+    if (isInListener) 
c.getQueue()->getListeners().addListener(c.shared_from_this());
 }
 
 void Connection::sessionState(
@@ -270,6 +273,17 @@
         unknownCompleted,
         receivedIncomplete);
     QPID_LOG(debug, cluster << " received session state update for " << 
sessionState().getId());
+    // The output tasks will be added later in the update process. 
+    connection.getOutputTasks().removeAll();
+}
+
+void Connection::outputTask(uint16_t channel, const std::string& name) {
+    broker::SessionState* session = 
connection.getChannel(channel).getSession();
+    if (!session)
+        throw Exception(QPID_MSG(cluster << " channel not attached " << *this
+                                 << "[" <<  channel << "] "));
+    OutputTask* task = &session->getSemanticState().find(name);
+    connection.getOutputTasks().addOutputTask(task);
 }
     
 void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const 
string& username, const string& fragment, uint32_t sendMax) {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Jun 16 21:21:09 2009
@@ -103,7 +103,7 @@
     // Called for data delivered from the cluster.
     void deliveredFrame(const EventFrame&);
 
-    void consumerState(const std::string& name, bool blocked, bool 
notifyEnabled);
+    void consumerState(const std::string& name, bool blocked, bool 
notifyEnabled, bool isInListener);
     
     // ==== Used in catch-up mode to build initial state.
     // 
@@ -115,6 +115,8 @@
                       const framing::SequenceNumber& received,
                       const framing::SequenceSet& unknownCompleted, const 
SequenceSet& receivedIncomplete);
     
+    void outputTask(uint16_t channel, const std::string& name);
+    
     void shadowReady(uint64_t memberId, uint64_t connectionId, const 
std::string& username, const std::string& fragment, uint32_t sendMax);
 
     void membership(const framing::FieldTable&, const framing::FieldTable&, 
uint64_t frameSeq);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Tue Jun 16 
21:21:09 2009
@@ -48,8 +48,6 @@
     LATENCY_TRACK(doOutputTracker.finish(f.getBody()));
     parent.getCluster().checkQuorum();
     {
-        // FIXME aconway 2009-04-28: locking around next-> may be redundant
-        // with the fixes to read-credit in the IO layer. Review.
         sys::Mutex::ScopedLock l(lock);
         next->send(f);
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue Jun 16 21:21:09 
2009
@@ -54,6 +54,7 @@
 #include "qpid/log/Statement.h"
 #include "qpid/Url.h"
 #include <boost/bind.hpp>
+#include <boost/cast.hpp>
 #include <algorithm>
 
 namespace qpid {
@@ -64,6 +65,8 @@
 using broker::Queue;
 using broker::QueueBinding;
 using broker::Message;
+using broker::SemanticState;
+
 using namespace framing;
 namespace arg=client::arg;
 using client::SessionBase_0_10Access;
@@ -125,7 +128,8 @@
     Broker& b = updaterBroker;
     b.getExchanges().eachExchange(boost::bind(&UpdateClient::updateExchange, 
this, _1));
     
b.getQueues().eachQueue(boost::bind(&UpdateClient::updateNonExclusiveQueue, 
this, _1));
-    // Update queue is used to transfer acquired messages that are no longer 
on their original queue.
+    // Update queue is used to transfer acquired messages that are no
+    // longer on their original queue.
     session.queueDeclare(arg::queue=UPDATE, arg::autoDelete=true);
     session.sync();
 
@@ -256,6 +260,16 @@
     s.exchangeBind(queue, binding.exchange, binding.key, binding.args);
 }
 
+void UpdateClient::updateOutputTask(const sys::OutputTask* task) {
+    const SemanticState::ConsumerImpl* cci =
+        boost::polymorphic_downcast<const SemanticState::ConsumerImpl*> (task);
+    SemanticState::ConsumerImpl* ci = 
const_cast<SemanticState::ConsumerImpl*>(cci);
+    uint16_t channel =  ci->getParent().getSession().getChannel();
+    ClusterConnectionProxy(shadowConnection).outputTask(channel,  
ci->getName());
+    QPID_LOG(debug, updaterId << " updating output task " << ci->getName()
+             << " channel=" << channel);
+}
+
 void UpdateClient::updateConnection(const boost::intrusive_ptr<Connection>& 
updateConnection) {
     QPID_LOG(debug, updaterId << " updating connection " << *updateConnection);
     shadowConnection = catchUpConnection();
@@ -266,6 +280,8 @@
     bc.eachSessionHandler(boost::bind(&UpdateClient::updateSession, this, _1));
     // Safe to use decoder here because we are stalled for update.
     std::pair<const char*, size_t> fragment = 
decoder.get(updateConnection->getId()).getFragment();
+    bc.getOutputTasks().eachOutput(
+        boost::bind(&UpdateClient::updateOutputTask, this, _1));
     ClusterConnectionProxy(shadowConnection).shadowReady(
         updateConnection->getId().getMember(),
         updateConnection->getId().getNumber(),
@@ -294,9 +310,9 @@
     QPID_LOG(debug, updaterId << " updating exclusive queues.");
     
ss->getSessionAdapter().eachExclusiveQueue(boost::bind(&UpdateClient::updateExclusiveQueue,
 this, _1));
 
-    // Update consumers. For reasons unknown, boost::bind does not work here 
with boost 1.33.
     QPID_LOG(debug, updaterId << " updating consumers.");
-    
ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&UpdateClient::updateConsumer),this));
+    ss->getSemanticState().eachConsumer(
+        boost::bind(&UpdateClient::updateConsumer, this, _1));
 
     QPID_LOG(debug, updaterId << " updating unacknowledged messages.");
     broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
@@ -304,7 +320,7 @@
 
     updateTxState(ss->getSemanticState());           // Tx transaction state.
 
-    //  Adjust for command counter for message in progress, will be sent after 
state update.
+    // Adjust command counter for message in progress, will be sent after 
state update.
     boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
     SequenceNumber received = ss->receiverGetReceived().command;
     if (inProgress)  
@@ -328,8 +344,11 @@
     QPID_LOG(debug, updaterId << " updated session " << 
sh.getSession()->getId());
 }
 
-void UpdateClient::updateConsumer(const broker::SemanticState::ConsumerImpl* 
ci) {
-    QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " 
on " << shadowSession.getId());
+void UpdateClient::updateConsumer(
+    const broker::SemanticState::ConsumerImpl::shared_ptr& ci)
+{
+    QPID_LOG(debug, updaterId << " updating consumer " << ci->getName() << " 
on "
+             << shadowSession.getId());
     using namespace message;
     shadowSession.messageSubscribe(
         arg::queue       = ci->getQueue()->getName(),
@@ -344,13 +363,12 @@
     shadowSession.messageSetFlowMode(ci->getName(), ci->isWindowing() ? 
FLOW_MODE_WINDOW : FLOW_MODE_CREDIT);
     shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_MESSAGE, 
ci->getMsgCredit());
     shadowSession.messageFlow(ci->getName(), CREDIT_UNIT_BYTE, 
ci->getByteCredit());
-    ClusterConnectionConsumerStateBody state(
-        ProtocolVersion(),
+    ClusterConnectionProxy(shadowSession).consumerState(
         ci->getName(),
         ci->isBlocked(),
-        ci->isNotifyEnabled()
+        ci->isNotifyEnabled(),
+        ci->getQueue()->getListeners().contains(ci)
     );
-    client::SessionBase_0_10Access(shadowSession).get()->send(state);
     QPID_LOG(debug, updaterId << " updated consumer " << ci->getName() << " on 
" << shadowSession.getId());
 }
     

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Tue Jun 16 21:21:09 2009
@@ -91,7 +91,8 @@
     void updateConnection(const boost::intrusive_ptr<Connection>& connection);
     void updateSession(broker::SessionHandler& s);
     void updateTxState(broker::SemanticState& s);
-    void updateConsumer(const broker::SemanticState::ConsumerImpl*);
+    void updateOutputTask(const sys::OutputTask* task);
+    void updateConsumer(const 
broker::SemanticState::ConsumerImpl::shared_ptr&);
 
     MemberId updaterId;
     MemberId updateeId;

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.cpp Tue Jun 16 21:21:09 
2009
@@ -26,60 +26,6 @@
 using qpid::framing::SequenceNumber;
 using qpid::framing::Buffer;
 
-SequenceNumber::SequenceNumber() : value(0) {}
-
-SequenceNumber::SequenceNumber(uint32_t v) : value((int32_t) v) {}
-
-bool SequenceNumber::operator==(const SequenceNumber& other) const
-{
-    return value == other.value;
-}
-
-bool SequenceNumber::operator!=(const SequenceNumber& other) const
-{
-    return !(value == other.value);
-}
-
-
-SequenceNumber& SequenceNumber::operator++()
-{
-    value = value + 1;
-    return *this;
-}
-
-const SequenceNumber SequenceNumber::operator++(int)
-{
-    SequenceNumber old(value);
-    value = value + 1;
-    return old;
-}
-
-SequenceNumber& SequenceNumber::operator--()
-{
-    value = value - 1;
-    return *this;
-}
-
-bool SequenceNumber::operator<(const SequenceNumber& other) const
-{
-    return (value - other.value) < 0;
-}
-
-bool SequenceNumber::operator>(const SequenceNumber& other) const
-{
-    return other < *this;
-}
-
-bool SequenceNumber::operator<=(const SequenceNumber& other) const
-{
-    return *this == other || *this < other; 
-}
-
-bool SequenceNumber::operator>=(const SequenceNumber& other) const
-{
-    return *this == other || *this > other; 
-}
-
 void SequenceNumber::encode(Buffer& buffer) const
 {
     buffer.putLong(value);
@@ -97,12 +43,6 @@
 namespace qpid {
 namespace framing {
 
-int32_t operator-(const SequenceNumber& a, const SequenceNumber& b)
-{
-    int32_t result = a.value - b.value;    
-    return result;
-}
-
 std::ostream& operator<<(std::ostream& o, const SequenceNumber& n) {
     return o << n.getValue();
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/framing/SequenceNumber.h Tue Jun 16 21:21:09 
2009
@@ -22,6 +22,7 @@
 #define _framing_SequenceNumber_h
 
 #include "amqp_types.h"
+#include <boost/operators.hpp>
 #include <iosfwd>
 #include "qpid/CommonImportExport.h"
 
@@ -33,35 +34,37 @@
 /**
  * 4-byte sequence number that 'wraps around'.
  */
-class SequenceNumber
+class SequenceNumber : public
+boost::equality_comparable<
+    SequenceNumber, boost::less_than_comparable<
+        SequenceNumber, boost::incrementable<
+        SequenceNumber, boost::decrementable<SequenceNumber> > > >
 {
     int32_t value;
 
- public:
-    QPID_COMMON_EXTERN SequenceNumber();
-    QPID_COMMON_EXTERN SequenceNumber(uint32_t v);
-
-    QPID_COMMON_EXTERN SequenceNumber& operator++();//prefix ++
-    QPID_COMMON_EXTERN const SequenceNumber operator++(int);//postfix ++
-    QPID_COMMON_EXTERN SequenceNumber& operator--();//prefix ++
-    QPID_COMMON_EXTERN bool operator==(const SequenceNumber& other) const;
-    QPID_COMMON_EXTERN bool operator!=(const SequenceNumber& other) const;
-    QPID_COMMON_EXTERN bool operator<(const SequenceNumber& other) const;
-    QPID_COMMON_EXTERN bool operator>(const SequenceNumber& other) const;
-    QPID_COMMON_EXTERN bool operator<=(const SequenceNumber& other) const;
-    QPID_COMMON_EXTERN bool operator>=(const SequenceNumber& other) const;
-    uint32_t getValue() const { return (uint32_t) value; }
-    operator uint32_t() const { return (uint32_t) value; }
-
-    QPID_COMMON_EXTERN friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b);
+  public:
+    SequenceNumber(uint32_t v=0) : value(v) {}
+  
+    SequenceNumber& operator++() { ++value; return *this; }
+    SequenceNumber& operator--() { --value; return *this; }
+    bool operator==(const SequenceNumber& other) const { return value == other.value; }
+    bool operator<(const SequenceNumber& other) const { return (value - other.value) < 0; }
+    uint32_t getValue() const { return uint32_t(value); }
+    operator uint32_t() const { return uint32_t(value); }
 
     void encode(Buffer& buffer) const;
     void decode(Buffer& buffer);
     uint32_t encodedSize() const;   
 
     template <class S> void serialize(S& s) { s(value); }
+
+  friend inline int32_t operator-(const SequenceNumber& a, const SequenceNumber& b);
 };    
 
+inline int32_t operator-(const SequenceNumber& a, const SequenceNumber& b) {
+    return int32_t(a.value - b.value);
+}
+
 struct Window 
 {
     SequenceNumber hwm;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.cpp Tue Jun 16 21:21:09 2009
@@ -26,50 +26,66 @@
 namespace qpid {
 namespace sys {
 
+AggregateOutput::AggregateOutput(OutputControl& c) : busy(false), control(c) {}
+
 void AggregateOutput::abort() { control.abort(); }
 
 void AggregateOutput::activateOutput() { control.activateOutput(); }
 
 void AggregateOutput::giveReadCredit(int32_t credit) { control.giveReadCredit(credit); }
 
-bool AggregateOutput::hasOutput() {
-    for (TaskList::const_iterator i = tasks.begin(); i != tasks.end(); ++i)
-        if ((*i)->hasOutput()) return true;
-    return false;
+bool AggregateOutput::AggregateOutput::hasOutput() {
+    Mutex::ScopedLock l(lock);
+    return !tasks.empty();
 }
 
-bool AggregateOutput::doOutput()
-{
-    bool result = false;
-    if (!tasks.empty()) {
-        if (next >= tasks.size()) next = next % tasks.size();
-
-        size_t start = next;
-        //loop until a task generated some output
-        while (!result) {
-            result = tasks[next++]->doOutput();
-           if (tasks.empty()) break;
-            if (next >= tasks.size()) next = next % tasks.size();
-            if (start == next) break;
+// Clear the busy flag and notify waiting threads in destructor.
+struct ScopedBusy {
+    bool& flag;
+    Monitor& monitor;
+    ScopedBusy(bool& f, Monitor& m) : flag(f), monitor(m) { f = true; }
+    ~ScopedBusy() { flag = false; monitor.notifyAll(); }
+};
+  
+bool AggregateOutput::doOutput() {
+    Mutex::ScopedLock l(lock);
+    ScopedBusy sb(busy, lock);
+
+    while (!tasks.empty()) {
+        OutputTask* t=tasks.front();
+        tasks.pop_front();
+        bool didOutput;
+        {
+            // Allow concurrent call to addOutputTask.
+            // removeOutputTask will wait till !busy before removing a task.
+            Mutex::ScopedUnlock u(lock);
+            didOutput = t->doOutput();
+        }
+        if (didOutput) {
+            tasks.push_back(t);
+            return true;
         }
     }
-    return result;
+    return false;
 }
-
-void AggregateOutput::addOutputTask(OutputTask* t)
-{
-    tasks.push_back(t);
+  
+void AggregateOutput::addOutputTask(OutputTask* task) {
+    Mutex::ScopedLock l(lock);
+    tasks.push_back(task);
 }
 
-void AggregateOutput::removeOutputTask(OutputTask* t)
-{
-    TaskList::iterator i = std::find(tasks.begin(), tasks.end(), t);
-    if (i != tasks.end()) tasks.erase(i);
+void AggregateOutput::removeOutputTask(OutputTask* task) {
+    Mutex::ScopedLock l(lock);
+    while (busy) lock.wait();
+    tasks.erase(std::remove(tasks.begin(), tasks.end(), task), tasks.end());
 }
-
+  
 void AggregateOutput::removeAll()
 {
+    Mutex::ScopedLock l(lock);
+    while (busy) lock.wait();
     tasks.clear();
 }
+  
 
 }} // namespace qpid::sys

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/AggregateOutput.h Tue Jun 16 21:21:09 2009
@@ -21,47 +21,58 @@
 #ifndef _AggregateOutput_
 #define _AggregateOutput_
 
-#include "Mutex.h"
+#include "Monitor.h"
 #include "OutputControl.h"
 #include "OutputTask.h"
 #include "qpid/CommonImportExport.h"
 
 #include <algorithm>
-#include <vector>
+#include <deque>
 
 namespace qpid {
 namespace sys {
 
-    class AggregateOutput : public OutputTask, public OutputControl
-    {
-        typedef std::vector<OutputTask*> TaskList;
-
-        TaskList tasks;
-        size_t next;
-        OutputControl& control;
-
-    public:
-        AggregateOutput(OutputControl& c) : next(0), control(c) {};
-        //this may be called on any thread
-        QPID_COMMON_EXTERN void abort();
-        QPID_COMMON_EXTERN void activateOutput();
-        QPID_COMMON_EXTERN void giveReadCredit(int32_t);
-
-        //all the following will be called on the same thread
-        QPID_COMMON_EXTERN bool doOutput();
-        QPID_COMMON_EXTERN bool hasOutput();
-        QPID_COMMON_EXTERN void addOutputTask(OutputTask* t);
-        QPID_COMMON_EXTERN void removeOutputTask(OutputTask* t);
-        QPID_COMMON_EXTERN void removeAll();
-
-        /** Apply f to each OutputTask* in the tasks list */
-        template <class F> void eachOutput(F f) {
-            std::for_each(tasks.begin(), tasks.end(), f);
-        }
-    };
+/**
+ * Holds a collection of output tasks, doOutput picks the next one to execute.
+ * 
+ * Tasks are automatically removed if their doOutput() or hasOutput() returns false.
+ * 
+ * Thread safe. addOutputTask may be called in one connection thread while
+ * doOutput is called in another.
+ */
+
+class AggregateOutput : public OutputTask, public OutputControl
+{
+    typedef std::deque<OutputTask*> TaskList;
+
+    Monitor lock;
+    TaskList tasks;
+    bool busy;
+    OutputControl& control;
+
+  public:
+    QPID_COMMON_EXTERN AggregateOutput(OutputControl& c);
+
+    // These may be called concurrently with any function.
+    QPID_COMMON_EXTERN void abort();
+    QPID_COMMON_EXTERN void activateOutput();
+    QPID_COMMON_EXTERN void giveReadCredit(int32_t);
+    QPID_COMMON_EXTERN void addOutputTask(OutputTask* t);
+
+    // These functions must not be called concurrently with each other.
+    QPID_COMMON_EXTERN bool doOutput();
+    QPID_COMMON_EXTERN bool hasOutput();
+    QPID_COMMON_EXTERN void removeOutputTask(OutputTask* t);
+    QPID_COMMON_EXTERN void removeAll();
+
+    /** Apply f to each OutputTask* in the tasks list */
+    template <class F> void eachOutput(F f) {
+        Mutex::ScopedLock l(lock);
+        std::for_each(tasks.begin(), tasks.end(), f);
+    }
+};
 
-}
-}
+}} // namespace qpid::sys
 
 
 #endif

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=785408&r1=785407&r2=785408&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue Jun 16 21:21:09 2009
@@ -65,8 +65,6 @@
 
   </class>
 
-  <!-- TODO aconway 2008-09-10: support for un-attached connections. -->
-  
   <!-- Controls associated with a specific connection. -->
 
   <class name="cluster-connection" code="0x81" label="Qpid clustering extensions.">
@@ -91,6 +89,8 @@
       <field name="name" type="str8"/>
       <field name="blocked" type="bit"/>
       <field name="notifyEnabled" type="bit"/>
+      <!-- Flag set if the consumer is in its queue's listener set. -->
+      <field name="is-in-listener" type="bit"/>
     </control>
 
     <!-- Delivery-record for outgoing messages sent but not yet accepted. -->
@@ -121,8 +121,14 @@
     <control name="tx-end" code="0x17"/>
     <control name="accumulated-ack" code="0x18"> <field name="commands" type="sequence-set"/> </control>
     
+    <!-- Consumers in the connection's output task -->
+    <control name="output-task" code="0x19">
+      <field name="channel" type="uint16"/>
+      <field name="name" type="str8"/>
+    </control>
+
     <!-- Complete a session state update. -->
-    <control name="session-state" code="0x1F" label="Set session state during a brain update.">
+    <control name="session-state" code="0x1F">
       <!-- Target session deduced from channel number.  -->
       <field name="replay-start" type="sequence-no"/>         <!-- Replay frames will start from this point.-->
       <field name="command-point" type="sequence-no"/>        <!-- Id of next command sent -->



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to