Author: gsim
Date: Fri Oct 5 10:28:48 2007
New Revision: 582353
URL: http://svn.apache.org/viewvc?rev=582353&view=rev
Log:
Don't recover messages for cancelled subscriptions.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=582353&r1=582352&r2=582353&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Oct 5 10:28:48 2007
@@ -39,18 +39,21 @@
id(_id),
acquired(_acquired),
confirmed(_confirmed),
- pull(false)
+ pull(false),
+ cancelled(false)
{
}
DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg,
Queue::shared_ptr _queue,
const DeliveryId _id) : msg(_msg),
- queue(_queue),
- id(_id),
- acquired(true),
- confirmed(false),
- pull(true){}
+ queue(_queue),
+ id(_id),
+ acquired(true),
+ confirmed(false),
+ pull(true),
+ cancelled(false)
+{}
void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
@@ -76,7 +79,7 @@
}
void DeliveryRecord::redeliver(SemanticState* const session) {
- if (!confirmed) {
+ if (!confirmed && !cancelled) {
if(pull){
//if message was originally sent as response to get, we must requeue it
requeue();
@@ -147,6 +150,12 @@
} else {
QPID_LOG(info, "Message already acquired " << id.getValue());
}
+}
+
+void DeliveryRecord::cancel(const std::string& cancelledTag)
+{
+ if (tag == cancelledTag)
+ cancelled = true;
}
namespace qpid {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=582353&r1=582352&r2=582353&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Fri Oct 5 10:28:48 2007
@@ -49,6 +49,7 @@
bool acquired;
const bool confirmed;
const bool pull;
+ bool cancelled;
public:
DeliveryRecord(const QueuedMessage& msg, Queue::shared_ptr queue, const std::string tag, DeliveryToken::shared_ptr token,
@@ -63,6 +64,7 @@
void requeue() const;
void release();
void reject();
+ void cancel(const std::string& tag);
void redeliver(SemanticState* const);
void updateByteCredit(uint32_t& credit) const;
void addTo(Prefetch&) const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=582353&r1=582352&r2=582353&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Fri Oct 5 10:28:48 2007
@@ -95,8 +95,14 @@
// consumers is a ptr_map so erase will delete the consumer
// which will call cancel.
ConsumerImplMap::iterator i = consumers.find(tag);
- if (i != consumers.end())
+ if (i != consumers.end()) {
consumers.erase(i);
+ //should cancel all unacked messages for this consumer so that
+ //they are not redelivered on recovery
+ Mutex::ScopedLock locker(deliveryLock);
+ for_each(unacked.begin(), unacked.end(), boost::bind(mem_fun_ref(&DeliveryRecord::cancel), _1, tag));
+
+ }
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h?rev=582353&r1=582352&r2=582353&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Message.h Fri Oct 5 10:28:48 2007
@@ -75,6 +75,11 @@
return method;
}
+ const framing::SequenceNumber& getId() const
+ {
+ return id;
+ }
+
private:
//method and id are only set for received messages:
const framing::MessageTransferBody method;