Author: gsim
Date: Sat Nov 29 08:01:54 2008
New Revision: 721685
URL: http://svn.apache.org/viewvc?rev=721685&view=rev
Log:
QPID-1280: fixed performance regression for multiple subscribers on shared queue
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=721685&r1=721684&r2=721685&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Sat Nov 29 08:01:54 2008
@@ -347,6 +347,7 @@
qpid/broker/Exchange.cpp \
qpid/broker/Queue.cpp \
qpid/broker/QueueCleaner.cpp \
+ qpid/broker/QueueListeners.cpp \
qpid/broker/PersistableMessage.cpp \
qpid/broker/Bridge.cpp \
qpid/broker/Connection.cpp \
@@ -473,6 +474,7 @@
qpid/broker/SessionAdapter.h \
qpid/broker/Exchange.h \
qpid/broker/Queue.h \
+ qpid/broker/QueueListeners.h \
qpid/broker/QueueCleaner.h \
qpid/broker/BrokerSingleton.h \
qpid/broker/Bridge.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=721685&r1=721684&r2=721685&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Sat Nov 29 08:01:54 2008
@@ -109,12 +109,12 @@
void Queue::notifyDurableIOComplete()
{
- Listeners copy;
+ QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
- listeners.swap(copy);
+ listeners.populate(copy);
}
- for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
+ copy.notify();
}
bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg)
@@ -187,14 +187,14 @@
void Queue::requeue(const QueuedMessage& msg){
if (policy.get() && !policy->isEnqueued(msg)) return;
- Listeners copy;
+ QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
msg.payload->enqueueComplete(); // mark the message as enqueued
messages.push_front(msg);
- listeners.swap(copy);
+ listeners.populate(copy);
}
- for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
+ copy.notify();
}
void Queue::clearLVQIndex(const QueuedMessage& msg){
@@ -240,7 +240,7 @@
if (messages.empty()) {
//no message available, register consumer for notification
//when this changes
- addListener(c);
+ listeners.addListener(c);
return false;
} else {
QueuedMessage msg = getFront();
@@ -248,7 +248,7 @@
//though a message is on the queue, it has not yet been
//enqueued and so is not available for consumption yet,
//register consumer for notification when this changes
- addListener(c);
+ listeners.addListener(c);
return false;
} else {
//check that consumer has sufficient credit for the
@@ -266,7 +266,7 @@
Mutex::ScopedLock locker(messageLock);
if (messages.empty()) {
QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
- addListener(c);
+ listeners.addListener(c);
return false;
} else {
QueuedMessage msg = getFront();
@@ -323,15 +323,15 @@
void Queue::removeListener(Consumer::shared_ptr c)
{
- Mutex::ScopedLock locker(messageLock);
- Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
- if (i != listeners.end()) listeners.erase(i);
-}
-
-void Queue::addListener(Consumer::shared_ptr c)
-{
- Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
- if (i == listeners.end()) listeners.push_back(c);
+ QueueListeners::NotificationSet set;
+ {
+ Mutex::ScopedLock locker(messageLock);
+ listeners.removeListener(c);
+ if (messages.size()) {
+ listeners.populate(set);
+ }
+ }
+ set.notify();
}
bool Queue::dispatch(Consumer::shared_ptr c)
@@ -361,7 +361,7 @@
return true;
}
}
- addListener(c);
+ listeners.addListener(c);
return false;
}
@@ -491,7 +491,7 @@
}
void Queue::push(boost::intrusive_ptr<Message>& msg){
- Listeners copy;
+ QueueListeners::NotificationSet copy;
{
Mutex::ScopedLock locker(messageLock);
QueuedMessage qm(this, msg, ++sequence);
@@ -505,17 +505,17 @@
i = lvq.find(key);
if (i == lvq.end()){
messages.push_back(qm);
- listeners.swap(copy);
+ listeners.populate(copy);
lvq[key] = msg;
}else {
i->second->setReplacementMessage(msg,this);
}
}else {
messages.push_back(qm);
- listeners.swap(copy);
+ listeners.populate(copy);
}
}
- for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
+ copy.notify();
}
QueuedMessage Queue::getFront()
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=721685&r1=721684&r2=721685&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Sat Nov 29 08:01:54 2008
@@ -27,6 +27,7 @@
#include "PersistableQueue.h"
#include "QueuePolicy.h"
#include "QueueBindings.h"
+#include "QueueListeners.h"
#include "RateTracker.h"
#include "qpid/framing/FieldTable.h"
@@ -64,7 +65,6 @@
class Queue : public boost::enable_shared_from_this<Queue>,
public PersistableQueue, public management::Manageable {
- typedef std::list<Consumer::shared_ptr> Listeners;
typedef std::deque<QueuedMessage> Messages;
typedef std::map<string,boost::intrusive_ptr<Message> > LVQ;
@@ -80,7 +80,7 @@
bool inLastNodeFailure;
std::string traceId;
std::vector<std::string> traceExclude;
- Listeners listeners;
+ QueueListeners listeners;
Messages messages;
LVQ lvq;
mutable qpid::sys::Mutex consumerLock;
@@ -94,7 +94,7 @@
boost::shared_ptr<Exchange> alternateExchange;
framing::SequenceNumber sequence;
qmf::org::apache::qpid::broker::Queue* mgmtObject;
-RateTracker dequeueTracker;
+ RateTracker dequeueTracker;
void push(boost::intrusive_ptr<Message>& msg);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -104,7 +104,6 @@
bool browseNextMessage(QueuedMessage& msg, Consumer::shared_ptr c);
void removeListener(Consumer::shared_ptr);
- void addListener(Consumer::shared_ptr);
bool isExcluded(boost::intrusive_ptr<Message>& msg);
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp?rev=721685&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp Sat Nov 29 08:01:54 2008
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "QueueListeners.h"
+#include <boost/bind.hpp>
+
+namespace qpid {
+namespace broker {
+
+void QueueListeners::addListener(Consumer::shared_ptr c)
+{
+ if (c->preAcquires()) {
+ add(consumers, c);
+ } else {
+ add(browsers, c);
+ }
+}
+
+void QueueListeners::removeListener(Consumer::shared_ptr c)
+{
+ if (c->preAcquires()) {
+ remove(consumers, c);
+ } else {
+ remove(browsers, c);
+ }
+}
+
+void QueueListeners::populate(NotificationSet& set)
+{
+ if (consumers.size()) {
+ set.consumer = consumers.front();
+ consumers.pop_front();
+ } else {
+ browsers.swap(set.browsers);
+ }
+}
+
+void QueueListeners::add(Listeners& listeners, Consumer::shared_ptr c)
+{
+ Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
+ if (i == listeners.end()) listeners.push_back(c);
+}
+
+void QueueListeners::remove(Listeners& listeners, Consumer::shared_ptr c)
+{
+ Listeners::iterator i = std::find(listeners.begin(), listeners.end(), c);
+ if (i != listeners.end()) listeners.erase(i);
+}
+
+void QueueListeners::NotificationSet::notify()
+{
+ if (consumer) consumer->notify();
+ else for_each(browsers.begin(), browsers.end(), boost::mem_fn(&Consumer::notify));
+}
+
+}} // namespace qpid::broker
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h?rev=721685&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h Sat Nov 29 08:01:54 2008
@@ -0,0 +1,68 @@
+#ifndef QPID_BROKER_QUEUELISTENERS_H
+#define QPID_BROKER_QUEUELISTENERS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Consumer.h"
+#include <list>
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Track and notify components that wish to be notified of messages
+ * that become available on a queue.
+ *
+ * None of the methods defined here are protected by locking. However
+ * the populate method allows a 'snapshot' to be taken of the
+ * listeners to be notified. NotificationSet::notify() may then be
+ * called outside of any lock that protects the QueueListeners
+ * instance from concurrent access.
+ */
+class QueueListeners
+{
+ public:
+ typedef std::list<Consumer::shared_ptr> Listeners;
+
+ class NotificationSet
+ {
+ public:
+ void notify();
+ private:
+ Listeners browsers;
+ Consumer::shared_ptr consumer;
+ friend class QueueListeners;
+ };
+
+ void addListener(Consumer::shared_ptr);
+ void removeListener(Consumer::shared_ptr);
+ void populate(NotificationSet&);
+ private:
+ Listeners consumers;
+ Listeners browsers;
+
+ void add(Listeners&, Consumer::shared_ptr);
+ void remove(Listeners&, Consumer::shared_ptr);
+
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_QUEUELISTENERS_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueListeners.h
------------------------------------------------------------------------------
svn:keywords = Rev Date