Author: gsim
Date: Wed Apr 9 10:59:38 2008
New Revision: 646452
URL: http://svn.apache.org/viewvc?rev=646452&view=rev
Log:
Handle the set-redelivered flag on the final version of the message.release
command.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=646452&r1=646451&r2=646452&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Wed Apr 9
10:59:38 2008
@@ -111,9 +111,10 @@
}
}
-void DeliveryRecord::release()
+void DeliveryRecord::release(bool setRedelivered)
{
if (acquired && !ended) {
+ if (setRedelivered) msg.payload->redeliver();
queue->requeue(msg);
acquired = false;
setEnded();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=646452&r1=646451&r2=646452&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Wed Apr 9
10:59:38 2008
@@ -67,7 +67,7 @@
void dequeue(TransactionContext* ctxt = 0) const;
void requeue() const;
- void release();
+ void release(bool setRedelivered);
void reject();
void cancel(const std::string& tag);
void redeliver(SemanticState* const);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?rev=646452&r1=646451&r2=646452&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Wed
Apr 9 10:59:38 2008
@@ -39,7 +39,7 @@
MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
HandlerImpl(s),
- releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)),
+ releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2))
{}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=646452&r1=646451&r2=646452&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Apr 9
10:59:38 2008
@@ -625,14 +625,14 @@
for_each(range.start, range.end, AcquireFunctor(acquired));
}
-void SemanticState::release(DeliveryId first, DeliveryId last)
+void SemanticState::release(DeliveryId first, DeliveryId last, bool
setRedelivered)
{
AckRange range = findRange(first, last);
//release results in the message being added to the head so want
//to release in reverse order to keep the original transfer order
DeliveryRecords::reverse_iterator start(range.end);
DeliveryRecords::reverse_iterator end(range.start);
- for_each(start, end, mem_fun_ref(&DeliveryRecord::release));
+ for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::release),
setRedelivered));
}
void SemanticState::reject(DeliveryId first, DeliveryId last)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=646452&r1=646451&r2=646452&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Wed Apr 9
10:59:38 2008
@@ -178,7 +178,7 @@
void flow(bool active);
DeliveryId redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
- void release(DeliveryId first, DeliveryId last);
+ void release(DeliveryId first, DeliveryId last, bool setRedelivered);
void reject(DeliveryId first, DeliveryId last);
void handle(boost::intrusive_ptr<Message> msg);
bool doOutput() { return outputTasks.doOutput(); }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=646452&r1=646451&r2=646452&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Wed Apr 9
10:59:38 2008
@@ -298,7 +298,8 @@
SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
HandlerHelper(s),
- releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)),
+ releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2,
true)),
+ releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)),
acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2))
{}
@@ -314,9 +315,9 @@
//not yet used (content containing assemblies treated differently at
present
}
- void SessionAdapter::MessageHandlerImpl::release(const SequenceSet&
transfers, bool /*setRedelivered*/)
+void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers,
bool setRedelivered)
{
- transfers.for_each(releaseOp);
+ transfers.for_each(setRedelivered ? releaseRedeliveredOp : releaseOp);
}
void
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=646452&r1=646451&r2=646452&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h Wed Apr 9
10:59:38 2008
@@ -149,6 +149,7 @@
public HandlerHelper
{
typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
+ RangedOperation releaseRedeliveredOp;
RangedOperation releaseOp;
RangedOperation rejectOp;
RangedOperation acceptOp;