Author: cctrieloff
Date: Tue Jul 17 13:46:45 2007
New Revision: 557052
URL: http://svn.apache.org/viewvc?view=rev&rev=557052
Log:
Updated queue class, can run dispatch on seperate thread or on
thread servicing the request. current set to use a worker - better
test results.
controlled by setting serilizable true - no worker, false, use a worker
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?view=diff&rev=557052&r1=557051&r2=557052
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Tue Jul 17
13:46:45 2007
@@ -29,8 +29,10 @@
#include "qpid/sys/Monitor.h"
#include "qpid/sys/Time.h"
#include <iostream>
+#include <boost/bind.hpp>
#include "QueueRegistry.h"
+
using namespace qpid::broker;
using namespace qpid::sys;
using namespace qpid::framing;
@@ -44,11 +46,10 @@
autodelete(_autodelete),
store(_store),
owner(_owner),
- queueing(false),
- dispatching(false),
next(0),
exclusive(0),
- persistenceId(0)
+ persistenceId(0),
+ serializer(false)
{
}
@@ -69,21 +70,28 @@
}
void Queue::process(Message::shared_ptr& msg){
- RWlock::ScopedWlock locker(messageLock);
- if(queueing || !dispatch(msg)){
- push(msg);
- }
+
+ push(msg);
+ serializer.execute(boost::bind(&Queue::dispatch, this));
+
}
void Queue::requeue(Message::shared_ptr& msg){
- RWlock::ScopedWlock locker(messageLock);
- if(queueing || !dispatch(msg)){
- queueing = true;
- messages.push_front(msg);
+
+ {
+ Mutex::ScopedLock locker(messageLock);
+ messages.push_front(msg);
}
+ serializer.execute(boost::bind(&Queue::dispatch, this));
+
}
+
bool Queue::dispatch(Message::shared_ptr& msg){
+
+
+ RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide....
+
if(consumers.empty()){
return false;
}else if(exclusive){
@@ -96,7 +104,6 @@
while(c){
next++;
if(c->deliver(msg)) return true;
-
next = next % consumers.size();
c = next == start ? 0 : consumers[next];
}
@@ -104,28 +111,22 @@
}
}
-bool Queue::startDispatching(){
- RWlock::ScopedRlock locker(messageLock);
- if(queueing && !dispatching){
- dispatching = true;
- return true;
- }else{
- return false;
- }
-}
void Queue::dispatch(){
- bool proceed = startDispatching();
- while(proceed){
- RWlock::ScopedWlock locker(messageLock);
- if(!messages.empty() && dispatch(messages.front())){
+
+ Message::shared_ptr msg;
+ while(true){
+ {
+ Mutex::ScopedLock locker(messageLock);
+ if (messages.empty()) break;
+ msg = messages.front();
+ }
+ if( dispatch(msg) ){
pop();
- }else{
- dispatching = false;
- proceed = false;
- queueing = !messages.empty();
- }
+ }else break;
+
}
+
}
void Queue::consume(Consumer* c, bool requestExclusive){
@@ -153,7 +154,7 @@
}
Message::shared_ptr Queue::dequeue(){
- RWlock::ScopedWlock locker(messageLock);
+ Mutex::ScopedLock locker(messageLock);
Message::shared_ptr msg;
if(!messages.empty()){
msg = messages.front();
@@ -163,19 +164,20 @@
}
uint32_t Queue::purge(){
- RWlock::ScopedWlock locker(messageLock);
+ Mutex::ScopedLock locker(messageLock);
int count = messages.size();
while(!messages.empty()) pop();
return count;
}
void Queue::pop(){
+ Mutex::ScopedLock locker(messageLock);
if (policy.get()) policy->dequeued(messages.front()->contentSize());
messages.pop_front();
}
void Queue::push(Message::shared_ptr& msg){
- queueing = true;
+ Mutex::ScopedLock locker(messageLock);
messages.push_back(msg);
if (policy.get()) {
policy->enqueued(msg->contentSize());
@@ -186,7 +188,7 @@
}
uint32_t Queue::getMessageCount() const{
- RWlock::ScopedRlock locker(messageLock);
+ Mutex::ScopedLock locker(messageLock);
return messages.size();
}
@@ -241,7 +243,7 @@
void Queue::destroy()
{
if (alternateExchange.get()) {
- RWlock::ScopedWlock locker(messageLock);
+ Mutex::ScopedLock locker(messageLock);
while(!messages.empty()){
DeliverableMessage msg(messages.front());
alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h?view=diff&rev=557052&r1=557051&r2=557052
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h Tue Jul 17
13:46:45 2007
@@ -30,6 +30,7 @@
#include "Consumer.h"
#include "BrokerMessage.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/Serializer.h"
#include "qpid/sys/Monitor.h"
#include "PersistableQueue.h"
#include "QueuePolicy.h"
@@ -65,21 +66,19 @@
const ConnectionToken* const owner;
Consumers consumers;
Messages messages;
- bool queueing;
- bool dispatching;
int next;
mutable qpid::sys::RWlock consumerLock;
- mutable qpid::sys::RWlock messageLock;
+ mutable qpid::sys::Mutex messageLock;
Consumer* exclusive;
mutable uint64_t persistenceId;
framing::FieldTable settings;
std::auto_ptr<QueuePolicy> policy;
QueueBindings bindings;
boost::shared_ptr<Exchange> alternateExchange;
+ qpid::sys::Serializer serializer;
void pop();
void push(Message::shared_ptr& msg);
- bool startDispatching();
bool dispatch(Message::shared_ptr& msg);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?view=diff&rev=557052&r1=557051&r2=557052
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue Jul 17 13:46:45
2007
@@ -84,12 +84,16 @@
Message::shared_ptr msg3 = message("e", "C");
queue->deliver(msg1);
- CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
+ /** if dispatched on diff thread, force dispatch so don't have to wait
for thread. Only do in text */
+ queue->dispatch();
+ CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
queue->deliver(msg2);
+ queue->dispatch();
CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get());
queue->deliver(msg3);
+ queue->dispatch();
CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get());
//Test cancellation: