Author: gsim
Date: Fri Oct 12 05:08:40 2007
New Revision: 584144
URL: http://svn.apache.org/viewvc?rev=584144&view=rev
Log:
Some fixes to locking within the queue (preventing locks being held during
delivery to a consumer)
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=584144&r1=584143&r2=584144&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Oct 12 05:08:40 2007
@@ -139,32 +139,32 @@
}
}
-bool Queue::dispatch(QueuedMessage& msg){
-
-
- RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide....
+Consumer* Queue::allocate()
+{
+ RWlock::ScopedWlock locker(consumerLock);
if(acquirers.empty()){
- return false;
+ return 0;
}else if(exclusive){
- return exclusive->deliver(msg);
+ return exclusive;
}else{
- //deliver to next consumer
next = next % acquirers.size();
- Consumer* c = acquirers[next];
- int start = next;
- while(c){
- next++;
- if(c->deliver(msg)) {
- return true;
- }
- next = next % acquirers.size();
- c = next == start ? 0 : acquirers[next];
- }
- return false;
+ return acquirers[next++];
}
}
+bool Queue::dispatch(QueuedMessage& msg)
+{
+ Consumer* c = allocate();
+ int start = next;
+ while(c){
+ if(c->deliver(msg)) {
+ return true;
+ }
+ c = next == start ? 0 : allocate();
+ }
+ return false;
+}
void Queue::dispatch(){
QueuedMessage msg;
@@ -188,27 +188,22 @@
void Queue::serviceBrowser(Consumer* browser)
{
- //This is a poorly performing implementation:
- //
- // * bad concurrency where browsers exist
- // * inefficient for largish queues
- //
- //The queue needs to be based on a current data structure that
- //does not invalidate iterators when modified. Subscribers could
- //then use an iterator to continue from where they left off
+ QueuedMessage msg;
+ while (seek(msg, browser->position) && browser->deliver(msg)) {
+ browser->position = msg.position;
+ }
+}
+bool Queue::seek(QueuedMessage& msg, const framing::SequenceNumber& position) {
Mutex::ScopedLock locker(messageLock);
- if (!messages.empty() && messages.back().position > browser->position) {
- for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
- if (i->position > browser->position) {
- if (browser->deliver(*i)) {
- browser->position = i->position;
- } else {
- break;
- }
- }
- }
+ if (!messages.empty() && messages.back().position > position) {
+ uint index = (position - messages.front().position) + 1;
+ if (index < messages.size()) {
+ msg = messages[index];
+ return true;
+ }
}
+ return false;
}
void Queue::consume(Consumer* c, bool requestExclusive){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=584144&r1=584143&r2=584144&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Fri Oct 12 05:08:40 2007
@@ -94,6 +94,8 @@
void dispatch();
void cancel(Consumer* c, Consumers& set);
void serviceBrowser(Consumer* c);
+ Consumer* allocate();
+ bool seek(QueuedMessage& msg, const framing::SequenceNumber& position);
protected:
/**