Author: cctrieloff
Date: Tue Oct 14 08:23:51 2008
New Revision: 704565
URL: http://svn.apache.org/viewvc?rev=704565&view=rev
Log:
Correction of lock scope to make sure ordering from exchange to queue is
preserved when sequencing is used.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=704565&r1=704564&r2=704565&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Tue Oct 14
08:23:51 2008
@@ -70,7 +70,7 @@
}
void DirectExchange::route(Deliverable& msg, const string& routingKey, const
FieldTable* /*args*/){
- preRoute(msg);
+ PreRoute pr(msg, this);
Queues::ConstPtr p;
{
Mutex::ScopedLock l(lock);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=704565&r1=704564&r2=704565&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Tue Oct 14
08:23:51 2008
@@ -41,6 +41,20 @@
}
+Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) {
+ if (parent && parent->sequence){
+ parent->sequenceLock.lock();
+ parent->sequenceNo++;
+
msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
+ }
+}
+
+Exchange::PreRoute::~PreRoute(){
+ if (parent && parent->sequence){
+ parent->sequenceLock.unlock();
+ }
+}
+
Exchange::Exchange (const string& _name, Manageable* parent) :
name(_name), durable(false), persistenceId(0), sequence(false),
sequenceNo(0), mgmtExchange(0)
@@ -89,13 +103,6 @@
mgmtExchange->resourceDestroy ();
}
-void Exchange::preRoute(Deliverable& msg){
- if (sequence){
- sys::Mutex::ScopedLock lock(sequenceLock);
-
msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,++sequenceNo);
- }
-}
-
void Exchange::setPersistenceId(uint64_t id) const
{
if (mgmtExchange != 0 && persistenceId == 0)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=704565&r1=704564&r2=704565&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Tue Oct 14
08:23:51 2008
@@ -46,14 +46,20 @@
boost::shared_ptr<Exchange> alternate;
uint32_t alternateUsers;
mutable uint64_t persistenceId;
+
+ protected:
bool sequence;
mutable qpid::sys::Mutex sequenceLock;
uint64_t sequenceNo;
-
- protected:
-
- void preRoute(Deliverable& msg);
+ class PreRoute{
+ public:
+ PreRoute(Deliverable& msg, Exchange* _p);
+ ~PreRoute();
+ private:
+ Exchange* parent;
+ };
+
struct Binding : public management::Manageable {
typedef boost::shared_ptr<Binding> shared_ptr;
typedef std::vector<Binding::shared_ptr> vector;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=704565&r1=704564&r2=704565&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Tue Oct 14
08:23:51 2008
@@ -69,7 +69,7 @@
}
void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/,
const FieldTable* /*args*/){
- preRoute(msg);
+ PreRoute pr(msg, this);
uint32_t count(0);
BindingsArray::ConstPtr p = bindings.snapshot();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=704565&r1=704564&r2=704565&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Tue Oct
14 08:23:51 2008
@@ -105,7 +105,7 @@
void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/,
const FieldTable* args){
if (!args) return;//can't match if there were no headers passed in
- preRoute(msg);
+ PreRoute pr(msg, this);
uint32_t count(0);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=704565&r1=704564&r2=704565&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Tue Oct 14
08:23:51 2008
@@ -180,7 +180,7 @@
void TopicExchange::route(Deliverable& msg, const string& routingKey, const
FieldTable* /*args*/){
RWlock::ScopedRlock l(lock);
- preRoute(msg);
+ PreRoute pr(msg, this);
uint32_t count(0);
Tokens tokens(routingKey);