Author: cctrieloff
Date: Tue Jul 17 13:46:45 2007
New Revision: 557052

URL: http://svn.apache.org/viewvc?view=rev&rev=557052
Log:
Updated queue class; it can run dispatch on a separate thread or on the
thread servicing the request. Currently set to use a worker - better
test results.

Controlled by the flag passed to the serializer (serializer(false) in this revision): true - no worker; false - use a worker.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?view=diff&rev=557052&r1=557051&r2=557052
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Tue Jul 17 
13:46:45 2007
@@ -29,8 +29,10 @@
 #include "qpid/sys/Monitor.h"
 #include "qpid/sys/Time.h"
 #include <iostream>
+#include <boost/bind.hpp>
 #include "QueueRegistry.h"
 
+
 using namespace qpid::broker;
 using namespace qpid::sys;
 using namespace qpid::framing;
@@ -44,11 +46,10 @@
     autodelete(_autodelete),
     store(_store),
     owner(_owner), 
-    queueing(false),
-    dispatching(false),
     next(0),
     exclusive(0),
-    persistenceId(0)
+    persistenceId(0),
+    serializer(false)
 {
 }
 
@@ -69,21 +70,28 @@
 }
 
 void Queue::process(Message::shared_ptr& msg){
-    RWlock::ScopedWlock locker(messageLock);
-    if(queueing || !dispatch(msg)){
-        push(msg);
-    }
+ 
+    push(msg);
+    serializer.execute(boost::bind(&Queue::dispatch, this));
+   
 }
 
 void Queue::requeue(Message::shared_ptr& msg){
-    RWlock::ScopedWlock locker(messageLock);
-    if(queueing || !dispatch(msg)){
-        queueing = true;
-        messages.push_front(msg);
+ 
+    {
+       Mutex::ScopedLock locker(messageLock);
+       messages.push_front(msg);
     }
+    serializer.execute(boost::bind(&Queue::dispatch, this));
+   
 }
 
+
 bool Queue::dispatch(Message::shared_ptr& msg){
+
+ 
+    RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide....
+ 
     if(consumers.empty()){
         return false;
     }else if(exclusive){
@@ -96,7 +104,6 @@
         while(c){
             next++;
             if(c->deliver(msg)) return true;            
-
             next = next % consumers.size();
             c = next == start ? 0 : consumers[next];            
         }
@@ -104,28 +111,22 @@
     }
 }
 
-bool Queue::startDispatching(){
-    RWlock::ScopedRlock locker(messageLock);
-    if(queueing && !dispatching){
-        dispatching = true;
-        return true;
-    }else{
-        return false;
-    }
-}
 
 void Queue::dispatch(){
-    bool proceed = startDispatching();
-    while(proceed){
-        RWlock::ScopedWlock locker(messageLock);
-        if(!messages.empty() && dispatch(messages.front())){
+
+     Message::shared_ptr msg;
+     while(true){
+        {
+           Mutex::ScopedLock locker(messageLock);
+           if (messages.empty()) break; 
+           msg = messages.front();
+       }
+        if( dispatch(msg) ){
             pop();
-        }else{
-            dispatching = false;
-            proceed = false;
-            queueing = !messages.empty();
-        }
+        }else break;
+       
     }
+    
 }
 
 void Queue::consume(Consumer* c, bool requestExclusive){
@@ -153,7 +154,7 @@
 }
 
 Message::shared_ptr Queue::dequeue(){
-    RWlock::ScopedWlock locker(messageLock);
+    Mutex::ScopedLock locker(messageLock);
     Message::shared_ptr msg;
     if(!messages.empty()){
         msg = messages.front();
@@ -163,19 +164,20 @@
 }
 
 uint32_t Queue::purge(){
-    RWlock::ScopedWlock locker(messageLock);
+    Mutex::ScopedLock locker(messageLock);
     int count = messages.size();
     while(!messages.empty()) pop();
     return count;
 }
 
 void Queue::pop(){
+    Mutex::ScopedLock locker(messageLock);
     if (policy.get()) policy->dequeued(messages.front()->contentSize());
     messages.pop_front();    
 }
 
 void Queue::push(Message::shared_ptr& msg){
-    queueing = true;
+    Mutex::ScopedLock locker(messageLock);
     messages.push_back(msg);
     if (policy.get()) {
         policy->enqueued(msg->contentSize());
@@ -186,7 +188,7 @@
 }
 
 uint32_t Queue::getMessageCount() const{
-    RWlock::ScopedRlock locker(messageLock);
+    Mutex::ScopedLock locker(messageLock);
     return messages.size();
 }
 
@@ -241,7 +243,7 @@
 void Queue::destroy()
 {
     if (alternateExchange.get()) {
-        RWlock::ScopedWlock locker(messageLock);
+        Mutex::ScopedLock locker(messageLock);
         while(!messages.empty()){
             DeliverableMessage msg(messages.front());
             alternateExchange->route(msg, msg.getMessage().getRoutingKey(),

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h?view=diff&rev=557052&r1=557051&r2=557052
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h Tue Jul 17 
13:46:45 2007
@@ -30,6 +30,7 @@
 #include "Consumer.h"
 #include "BrokerMessage.h"
 #include "qpid/framing/FieldTable.h"
+#include "qpid/sys/Serializer.h"
 #include "qpid/sys/Monitor.h"
 #include "PersistableQueue.h"
 #include "QueuePolicy.h"
@@ -65,21 +66,19 @@
             const ConnectionToken* const owner;
             Consumers consumers;
             Messages messages;
-            bool queueing;
-            bool dispatching;
             int next;
             mutable qpid::sys::RWlock consumerLock;
-            mutable qpid::sys::RWlock messageLock;
+            mutable qpid::sys::Mutex messageLock;
             Consumer* exclusive;
             mutable uint64_t persistenceId;
             framing::FieldTable settings;
             std::auto_ptr<QueuePolicy> policy;            
             QueueBindings bindings;
             boost::shared_ptr<Exchange> alternateExchange;
+           qpid::sys::Serializer serializer;
 
             void pop();
             void push(Message::shared_ptr& msg);
-            bool startDispatching();
             bool dispatch(Message::shared_ptr& msg);
             void setPolicy(std::auto_ptr<QueuePolicy> policy);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?view=diff&rev=557052&r1=557051&r2=557052
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue Jul 17 13:46:45 
2007
@@ -84,12 +84,16 @@
         Message::shared_ptr msg3 = message("e", "C");
 
         queue->deliver(msg1);
-        CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
+       /** if dispatched on diff thread, force dispatch so don't have to wait 
for thread. Only do in text */
+        queue->dispatch();  
+       CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
 
         queue->deliver(msg2);
+        queue->dispatch();
         CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get());
         
         queue->deliver(msg3);
+        queue->dispatch();
         CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get());        
     
         //Test cancellation:


Reply via email to