Author: gsim
Date: Sat Nov 29 08:01:54 2008
New Revision: 721685

URL: http://svn.apache.org/viewvc?rev=721685&view=rev
Log:
QPID-1280: fixed performance regression for multiple subscribers on shared queue


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=721685&r1=721684&r2=721685&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Sat Nov 29 08:01:54 2008
@@ -347,6 +347,7 @@
   qpid/broker/Exchange.cpp \
   qpid/broker/Queue.cpp \
   qpid/broker/QueueCleaner.cpp \
+  qpid/broker/QueueListeners.cpp \
   qpid/broker/PersistableMessage.cpp \
   qpid/broker/Bridge.cpp \
   qpid/broker/Connection.cpp \
@@ -473,6 +474,7 @@
   qpid/broker/SessionAdapter.h \
   qpid/broker/Exchange.h \
   qpid/broker/Queue.h \
+  qpid/broker/QueueListeners.h \
   qpid/broker/QueueCleaner.h \
   qpid/broker/BrokerSingleton.h \
   qpid/broker/Bridge.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=721685&r1=721684&r2=721685&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Sat Nov 29 08:01:54 2008
@@ -109,12 +109,12 @@
 
 void Queue::notifyDurableIOComplete()
 {
-    Listeners copy;
+    QueueListeners::NotificationSet copy;
     {
         Mutex::ScopedLock locker(messageLock);
-        listeners.swap(copy);
+        listeners.populate(copy);
     }
-    for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
+    copy.notify();
 }
 
 bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg)
@@ -187,14 +187,14 @@
 void Queue::requeue(const QueuedMessage& msg){
     if (policy.get() && !policy->isEnqueued(msg)) return;
 
-    Listeners copy;
+    QueueListeners::NotificationSet copy;
     {    
         Mutex::ScopedLock locker(messageLock);
         msg.payload->enqueueComplete(); // mark the message as enqueued
         messages.push_front(msg);
-        listeners.swap(copy);
+        listeners.populate(copy);
     }
-    for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
+    copy.notify();
 }
 
 void Queue::clearLVQIndex(const QueuedMessage& msg){
@@ -240,7 +240,7 @@
     if (messages.empty()) {
         //no message available, register consumer for notification
         //when this changes
-        addListener(c);
+        listeners.addListener(c);
         return false;
     } else {
         QueuedMessage msg = getFront();
@@ -248,7 +248,7 @@
             //though a message is on the queue, it has not yet been
             //enqueued and so is not available for consumption yet,
             //register consumer for notification when this changes
-            addListener(c);
+            listeners.addListener(c);
             return false;            
         } else {
             //check that consumer has sufficient credit for the
@@ -266,7 +266,7 @@
         Mutex::ScopedLock locker(messageLock);
         if (messages.empty()) { 
             QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
-            addListener(c);
+            listeners.addListener(c);
             return false;
         } else {
             QueuedMessage msg = getFront();
@@ -323,15 +323,15 @@
 
 void Queue::removeListener(Consumer::shared_ptr c)
 {
-    Mutex::ScopedLock locker(messageLock);
-    Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
-    if (i != listeners.end()) listeners.erase(i);
-}
-
-void Queue::addListener(Consumer::shared_ptr c)
-{
-    Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
-    if (i == listeners.end()) listeners.push_back(c);
+    QueueListeners::NotificationSet set;
+    {
+        Mutex::ScopedLock locker(messageLock);
+        listeners.removeListener(c);
+        if (messages.size()) {
+            listeners.populate(set);
+        }
+    }
+    set.notify();
 }
 
 bool Queue::dispatch(Consumer::shared_ptr c)
@@ -361,7 +361,7 @@
             return true;
         }
     }
-    addListener(c);
+    listeners.addListener(c);
     return false;
 }
 
@@ -491,7 +491,7 @@
 }
 
 void Queue::push(boost::intrusive_ptr<Message>& msg){
-    Listeners copy;
+    QueueListeners::NotificationSet copy;
     {
         Mutex::ScopedLock locker(messageLock);   
         QueuedMessage qm(this, msg, ++sequence);
@@ -505,17 +505,17 @@
             i = lvq.find(key);
             if (i == lvq.end()){
                 messages.push_back(qm);
-                listeners.swap(copy);
+                listeners.populate(copy);
                 lvq[key] = msg;
             }else {
                 i->second->setReplacementMessage(msg,this);
             }           
         }else {
             messages.push_back(qm);
-            listeners.swap(copy);
+            listeners.populate(copy);
         }
     }
-    for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
+    copy.notify();
 }
 
 QueuedMessage Queue::getFront()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=721685&r1=721684&r2=721685&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Sat Nov 29 08:01:54 2008
@@ -27,6 +27,7 @@
 #include "PersistableQueue.h"
 #include "QueuePolicy.h"
 #include "QueueBindings.h"
+#include "QueueListeners.h"
 #include "RateTracker.h"
 
 #include "qpid/framing/FieldTable.h"
@@ -64,7 +65,6 @@
         class Queue : public boost::enable_shared_from_this<Queue>,
             public PersistableQueue, public management::Manageable {
 
-            typedef std::list<Consumer::shared_ptr> Listeners;
             typedef std::deque<QueuedMessage> Messages;
             typedef std::map<string,boost::intrusive_ptr<Message> > LVQ;
 
@@ -80,7 +80,7 @@
             bool inLastNodeFailure;
             std::string traceId;
             std::vector<std::string> traceExclude;
-            Listeners listeners;
+            QueueListeners listeners;
             Messages messages;
             LVQ lvq;
             mutable qpid::sys::Mutex consumerLock;
@@ -94,7 +94,7 @@
             boost::shared_ptr<Exchange> alternateExchange;
             framing::SequenceNumber sequence;
             qmf::org::apache::qpid::broker::Queue* mgmtObject;
-RateTracker dequeueTracker;
+            RateTracker dequeueTracker;
 
             void push(boost::intrusive_ptr<Message>& msg);
             void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -104,7 +104,6 @@
             bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
 
             void removeListener(Consumer::shared_ptr);
-            void addListener(Consumer::shared_ptr);
 
             bool isExcluded(boost::intrusive_ptr<Message>& msg);
 

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp?rev=721685&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp Sat Nov 29 08:01:54 2008
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "QueueListeners.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace broker {
+
+void QueueListeners::addListener(Consumer::shared_ptr c)
+{
+    if (c->preAcquires()) {
+        add(consumers, c);
+    } else {
+        add(browsers, c);
+    }
+}
+
+void QueueListeners::removeListener(Consumer::shared_ptr c)
+{
+    if (c->preAcquires()) {
+        remove(consumers, c);
+    } else {
+        remove(browsers, c);
+    }
+}
+
+void QueueListeners::populate(NotificationSet& set)
+{
+    if (consumers.size()) {
+        set.consumer = consumers.front();
+        consumers.pop_front();
+    } else {
+        browsers.swap(set.browsers);
+    }
+}
+
+void QueueListeners::add(Listeners& listeners, Consumer::shared_ptr c)
+{
+    Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
+    if (i == listeners.end()) listeners.push_back(c);
+}
+
+void QueueListeners::remove(Listeners& listeners, Consumer::shared_ptr c)
+{
+    Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
+    if (i != listeners.end()) listeners.erase(i);
+}
+
+void QueueListeners::NotificationSet::notify()
+{
+    if (consumer) consumer->notify();
+    else for_each(browsers.begin(), browsers.end(), boost::mem_fn(&Consumer::notify));
+}
+
+}} // namespace qpid::broker

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h?rev=721685&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h Sat Nov 29 08:01:54 2008
@@ -0,0 +1,68 @@
+#ifndef QPID_BROKER_QUEUELISTENERS_H
+#define QPID_BROKER_QUEUELISTENERS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Consumer.h"
+#include <list>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Track and notify components that wish to be notified of messages
+ * that become available on a queue.
+ * 
+ * None of the methods defined here are protected by locking. However
+ * the populate method allows a 'snapshot' to be taken of the
+ * listeners to be notified. NotificationSet::notify() may then be
+ * called outside of any lock that protects the QueueListeners
+ * instance from concurrent access.
+ */
+class QueueListeners
+{
+  public:
+    typedef std::list<Consumer::shared_ptr> Listeners;
+
+    class NotificationSet
+    {
+      public:
+        void notify();
+      private:
+        Listeners browsers;
+        Consumer::shared_ptr consumer;
+      friend class QueueListeners;
+    };
+
+    void addListener(Consumer::shared_ptr);    
+    void removeListener(Consumer::shared_ptr);    
+    void populate(NotificationSet&);
+  private:
+    Listeners consumers;
+    Listeners browsers;
+
+    void add(Listeners&, Consumer::shared_ptr);
+    void remove(Listeners&, Consumer::shared_ptr);
+
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_QUEUELISTENERS_H*/

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to