Author: gsim
Date: Wed Apr  9 10:59:38 2008
New Revision: 646452

URL: http://svn.apache.org/viewvc?rev=646452&view=rev
Log:
Handle the set-redelivered flag on the final version of the message.release 
command.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=646452&r1=646451&r2=646452&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Wed Apr  9 
10:59:38 2008
@@ -111,9 +111,10 @@
     }
 }
 
-void DeliveryRecord::release() 
+void DeliveryRecord::release(bool setRedelivered) 
 {
     if (acquired && !ended) {
+        if (setRedelivered) msg.payload->redeliver();
         queue->requeue(msg);
         acquired = false;
         setEnded();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=646452&r1=646451&r2=646452&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Wed Apr  9 
10:59:38 2008
@@ -67,7 +67,7 @@
 
     void dequeue(TransactionContext* ctxt = 0) const;
     void requeue() const;
-    void release();
+    void release(bool setRedelivered);
     void reject();
     void cancel(const std::string& tag);
     void redeliver(SemanticState* const);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?rev=646452&r1=646451&r2=646452&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Wed 
Apr  9 10:59:38 2008
@@ -39,7 +39,7 @@
 
 MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : 
     HandlerImpl(s),
-    releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)),
+    releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
     rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2))
  {}
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=646452&r1=646451&r2=646452&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Apr  9 
10:59:38 2008
@@ -625,14 +625,14 @@
     for_each(range.start, range.end, AcquireFunctor(acquired));
 }
 
-void SemanticState::release(DeliveryId first, DeliveryId last)
+void SemanticState::release(DeliveryId first, DeliveryId last, bool 
setRedelivered)
 {
     AckRange range = findRange(first, last);
     //release results in the message being added to the head so want
     //to release in reverse order to keep the original transfer order
     DeliveryRecords::reverse_iterator start(range.end);
     DeliveryRecords::reverse_iterator end(range.start);
-    for_each(start, end, mem_fun_ref(&DeliveryRecord::release));
+    for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::release), 
setRedelivered));
 }
 
 void SemanticState::reject(DeliveryId first, DeliveryId last)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=646452&r1=646451&r2=646452&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Wed Apr  9 
10:59:38 2008
@@ -178,7 +178,7 @@
     void flow(bool active);
     DeliveryId redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token); 
           
     void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
-    void release(DeliveryId first, DeliveryId last);
+    void release(DeliveryId first, DeliveryId last, bool setRedelivered);
     void reject(DeliveryId first, DeliveryId last);
     void handle(boost::intrusive_ptr<Message> msg);
     bool doOutput() { return outputTasks.doOutput(); }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=646452&r1=646451&r2=646452&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Wed Apr  9 
10:59:38 2008
@@ -298,7 +298,8 @@
 
 SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : 
     HandlerHelper(s),
-    releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)),
+    releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, 
true)),
+    releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
     rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)),
     acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2))
  {}
@@ -314,9 +315,9 @@
     //not yet used (content containing assemblies treated differently at 
present
 }
 
-    void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& 
transfers, bool /*setRedelivered*/)
+void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers, 
bool setRedelivered)
 {
-    transfers.for_each(releaseOp);
+    transfers.for_each(setRedelivered ? releaseRedeliveredOp : releaseOp);
 }
 
 void

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=646452&r1=646451&r2=646452&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h Wed Apr  9 
10:59:38 2008
@@ -149,6 +149,7 @@
         public HandlerHelper
     {
         typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; 
   
+        RangedOperation releaseRedeliveredOp;
         RangedOperation releaseOp;
         RangedOperation rejectOp;
         RangedOperation acceptOp;


Reply via email to