Author: cctrieloff
Date: Tue Oct 14 08:23:51 2008
New Revision: 704565

URL: http://svn.apache.org/viewvc?rev=704565&view=rev
Log:

Correction of lock scope to make sure ordering from exchange to queue is
preserved when sequencing is used.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?rev=704565&r1=704564&r2=704565&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Tue Oct 14 
08:23:51 2008
@@ -70,7 +70,7 @@
 }
 
 void DirectExchange::route(Deliverable& msg, const string& routingKey, const 
FieldTable* /*args*/){
-    preRoute(msg);
+    PreRoute pr(msg, this);
     Queues::ConstPtr p;
     {
         Mutex::ScopedLock l(lock);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp?rev=704565&r1=704564&r2=704565&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.cpp Tue Oct 14 
08:23:51 2008
@@ -41,6 +41,20 @@
 }
 
 
+Exchange::PreRoute::PreRoute(Deliverable& msg, Exchange* _p):parent(_p) {
+    if (parent && parent->sequence){
+        parent->sequenceLock.lock();
+        parent->sequenceNo++;
+        msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,parent->sequenceNo);
+    }
+}
+
+Exchange::PreRoute::~PreRoute(){
+    if (parent && parent->sequence){
+        parent->sequenceLock.unlock();
+    }
+}
+
 Exchange::Exchange (const string& _name, Manageable* parent) :
     name(_name), durable(false), persistenceId(0), sequence(false), 
        sequenceNo(0), mgmtExchange(0)
@@ -89,13 +103,6 @@
         mgmtExchange->resourceDestroy ();
 }
 
-void Exchange::preRoute(Deliverable& msg){
-       if (sequence){
-        sys::Mutex::ScopedLock lock(sequenceLock);
-               msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,++sequenceNo);
-       }
-}
-
 void Exchange::setPersistenceId(uint64_t id) const
 {
     if (mgmtExchange != 0 && persistenceId == 0)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h?rev=704565&r1=704564&r2=704565&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Exchange.h Tue Oct 14 
08:23:51 2008
@@ -46,14 +46,20 @@
             boost::shared_ptr<Exchange> alternate;
             uint32_t alternateUsers;
             mutable uint64_t persistenceId;
+
+        protected:
             bool sequence;
             mutable qpid::sys::Mutex sequenceLock;
             uint64_t sequenceNo;
-
-        protected:
-               
-            void preRoute(Deliverable& msg);
                
+            class PreRoute{
+            public:
+                PreRoute(Deliverable& msg, Exchange* _p);
+                ~PreRoute();
+            private:
+                Exchange* parent;
+                   };
+           
             struct Binding : public management::Manageable {
                 typedef boost::shared_ptr<Binding>       shared_ptr;
                 typedef std::vector<Binding::shared_ptr> vector;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?rev=704565&r1=704564&r2=704565&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Tue Oct 14 
08:23:51 2008
@@ -69,7 +69,7 @@
 }
 
 void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, 
const FieldTable* /*args*/){
-    preRoute(msg);
+    PreRoute pr(msg, this);
     uint32_t count(0);
 
     BindingsArray::ConstPtr p = bindings.snapshot();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=704565&r1=704564&r2=704565&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Tue Oct 
14 08:23:51 2008
@@ -105,7 +105,7 @@
 
 void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, 
const FieldTable* args){
     if (!args) return;//can't match if there were no headers passed in
-    preRoute(msg);
+    PreRoute pr(msg, this);
 
     uint32_t count(0);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?rev=704565&r1=704564&r2=704565&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Tue Oct 14 
08:23:51 2008
@@ -180,7 +180,7 @@
 
 void TopicExchange::route(Deliverable& msg, const string& routingKey, const 
FieldTable* /*args*/){
     RWlock::ScopedRlock l(lock);
-    preRoute(msg);
+    PreRoute pr(msg, this);
     uint32_t count(0);
     Tokens   tokens(routingKey);
 


Reply via email to