Author: gsim
Date: Tue Jul 3 07:54:19 2007
New Revision: 552862
URL: http://svn.apache.org/viewvc?view=rev&rev=552862
Log:
Fix (and test) for QPID-407. Messages are requeued at the head rather than the
tail.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=552862&r1=552861&r2=552862
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Tue Jul 3
07:54:19 2007
@@ -330,9 +330,9 @@
Mutex::ScopedLock locker(deliveryLock);//need to synchronize with
possible concurrent delivery
ack_iterator i = find_if(unacked.begin(), unacked.end(),
bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag));
- ack_iterator j = (firstTag == 0) ?
- unacked.begin() :
- find_if(unacked.begin(), unacked.end(),
bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag));
+ ack_iterator j = (firstTag == 0) ?
+ unacked.begin() :
+ find_if(unacked.begin(), unacked.end(),
bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag));
if(i == unacked.end()){
throw ConnectionException(530, "Received ack for unrecognised
delivery tag");
@@ -364,7 +364,7 @@
outstanding.reset();
std::list<DeliveryRecord> copy = unacked;
unacked.clear();
- for_each(copy.begin(), copy.end(),
mem_fun_ref(&DeliveryRecord::requeue));
+ for_each(copy.rbegin(), copy.rend(),
mem_fun_ref(&DeliveryRecord::requeue));
}else{
for_each(unacked.begin(), unacked.end(),
bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?view=diff&rev=552862&r1=552861&r2=552862
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Tue Jul 3
07:54:19 2007
@@ -75,6 +75,14 @@
}
}
+void Queue::requeue(Message::shared_ptr& msg){
+ Mutex::ScopedLock locker(lock);
+ if(queueing || !dispatch(msg)){
+ queueing = true;
+ messages.push_front(msg);
+ }
+}
+
bool Queue::dispatch(Message::shared_ptr& msg){
if(consumers.empty()){
return false;
@@ -163,12 +171,12 @@
void Queue::pop(){
if (policy.get()) policy->dequeued(messages.front()->contentSize());
- messages.pop();
+ messages.pop_front();
}
void Queue::push(Message::shared_ptr& msg){
queueing = true;
- messages.push(msg);
+ messages.push_back(msg);
if (policy.get()) {
policy->enqueued(msg->contentSize());
if (policy->limitExceeded()) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h?view=diff&rev=552862&r1=552861&r2=552862
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h Tue Jul 3
07:54:19 2007
@@ -23,7 +23,7 @@
*/
#include <vector>
#include <memory>
-#include <queue>
+#include <deque>
#include <boost/shared_ptr.hpp>
#include "qpid/framing/amqp_types.h"
#include "ConnectionToken.h"
@@ -57,7 +57,7 @@
*/
class Queue : public PersistableQueue{
typedef std::vector<Consumer*> Consumers;
- typedef std::queue<Message::shared_ptr> Messages;
+ typedef std::deque<Message::shared_ptr> Messages;
const string name;
const bool autodelete;
@@ -108,6 +108,13 @@
* one is available or stores it for later if not.
*/
void process(Message::shared_ptr& msg);
+ /**
+ * Returns a message to the in-memory queue (due to lack
+ * of acknowledgement from a receiver). If a consumer is
+ * available it will be dispatched immediately, else it
+ * will be returned to the front of the queue.
+ */
+ void requeue(Message::shared_ptr& msg);
/**
* Used during recovery to add stored messages back to the queue
*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?view=diff&rev=552862&r1=552861&r2=552862
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Tue Jul 3
07:54:19 2007
@@ -65,7 +65,7 @@
void DeliveryRecord::requeue() const{
msg->redeliver();
- queue->process(msg);
+ queue->requeue(msg);
}
void DeliveryRecord::addTo(Prefetch* const prefetch) const{