Author: gsim
Date: Tue Jul  3 07:54:19 2007
New Revision: 552862

URL: http://svn.apache.org/viewvc?view=rev&rev=552862
Log:
Fix (and test) for QPID-407. Messages are requeued at the head rather than the 
tail.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=552862&r1=552861&r2=552862
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Tue Jul  3 
07:54:19 2007
@@ -330,9 +330,9 @@
         Mutex::ScopedLock locker(deliveryLock);//need to synchronize with 
possible concurrent delivery
     
         ack_iterator i = find_if(unacked.begin(), unacked.end(), 
bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag));
-               ack_iterator j = (firstTag == 0) ?
-                       unacked.begin() :
-               find_if(unacked.begin(), unacked.end(), 
bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag));
+        ack_iterator j = (firstTag == 0) ?
+            unacked.begin() :
+            find_if(unacked.begin(), unacked.end(), 
bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag));
                
         if(i == unacked.end()){
             throw ConnectionException(530, "Received ack for unrecognised 
delivery tag");
@@ -364,7 +364,7 @@
         outstanding.reset();
         std::list<DeliveryRecord> copy = unacked;
         unacked.clear();
-        for_each(copy.begin(), copy.end(), 
mem_fun_ref(&DeliveryRecord::requeue));
+        for_each(copy.rbegin(), copy.rend(), 
mem_fun_ref(&DeliveryRecord::requeue));
     }else{
         for_each(unacked.begin(), unacked.end(), 
bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));        
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?view=diff&rev=552862&r1=552861&r2=552862
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Tue Jul  3 
07:54:19 2007
@@ -75,6 +75,14 @@
     }
 }
 
+void Queue::requeue(Message::shared_ptr& msg){
+    Mutex::ScopedLock locker(lock);
+    if(queueing || !dispatch(msg)){
+        queueing = true;
+        messages.push_front(msg);
+    }
+}
+
 bool Queue::dispatch(Message::shared_ptr& msg){
     if(consumers.empty()){
         return false;
@@ -163,12 +171,12 @@
 
 void Queue::pop(){
     if (policy.get()) policy->dequeued(messages.front()->contentSize());
-    messages.pop();    
+    messages.pop_front();    
 }
 
 void Queue::push(Message::shared_ptr& msg){
     queueing = true;
-    messages.push(msg);
+    messages.push_back(msg);
     if (policy.get()) {
         policy->enqueued(msg->contentSize());
         if (policy->limitExceeded()) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h?view=diff&rev=552862&r1=552861&r2=552862
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h Tue Jul  3 
07:54:19 2007
@@ -23,7 +23,7 @@
  */
 #include <vector>
 #include <memory>
-#include <queue>
+#include <deque>
 #include <boost/shared_ptr.hpp>
 #include "qpid/framing/amqp_types.h"
 #include "ConnectionToken.h"
@@ -57,7 +57,7 @@
          */
         class Queue : public PersistableQueue{
             typedef std::vector<Consumer*> Consumers;
-            typedef std::queue<Message::shared_ptr> Messages;
+            typedef std::deque<Message::shared_ptr> Messages;
             
             const string name;
             const bool autodelete;
@@ -108,6 +108,13 @@
              * one is available or stores it for later if not.
              */
             void process(Message::shared_ptr& msg);
+            /**
+             * Returns a message to the in-memory queue (due to lack
+             * of acknowledegement from a receiver). If a consumer is
+             * available it will be dispatched immediately, else it
+             * will be returned to the front of the queue.
+             */
+            void requeue(Message::shared_ptr& msg);
             /**
              * Used during recovery to add stored messages back to the queue
              */

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?view=diff&rev=552862&r1=552861&r2=552862
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Tue Jul  3 
07:54:19 2007
@@ -65,7 +65,7 @@
 
 void DeliveryRecord::requeue() const{
     msg->redeliver();
-    queue->process(msg);
+    queue->requeue(msg);
 }
 
 void DeliveryRecord::addTo(Prefetch* const prefetch) const{


Reply via email to