Author: cctrieloff
Date: Thu Jul  5 09:19:05 2007
New Revision: 553549

URL: http://svn.apache.org/viewvc?view=rev&rev=553549
Log:

- Added RW lock
- Updated all exchanges to us RW lock
- Updated all registries to us RW lock
- Still need to do (client, channel, message and queues)



Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Mutex.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Thu Jul  5 
09:19:05 2007
@@ -30,7 +30,7 @@
 DirectExchange::DirectExchange(const std::string& _name, bool _durable, const 
FieldTable& _args) : Exchange(_name, _durable, _args) {}
 
 bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey, 
const FieldTable*){
-    Mutex::ScopedLock l(lock);
+    RWlock::ScopedWlock l(lock);
     std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
     std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), 
queues.end(), queue);
     if (i == queues.end()) {
@@ -42,7 +42,7 @@
 }
 
 bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, 
const FieldTable* /*args*/){
-    Mutex::ScopedLock l(lock);
+    RWlock::ScopedWlock l(lock);
     std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
 
     std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(), 
queues.end(), queue);
@@ -58,7 +58,7 @@
 }
 
 void DirectExchange::route(Deliverable& msg, const string& routingKey, const 
FieldTable* /*args*/){
-    Mutex::ScopedLock l(lock);
+    RWlock::ScopedRlock l(lock);
     std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
     int count(0);
     for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i != 
queues.end(); i++, count++){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h Thu Jul  5 
09:19:05 2007
@@ -35,7 +35,7 @@
         typedef std::vector<Queue::shared_ptr> Queues;
         typedef std::map<string, Queues > Bindings;
         Bindings bindings;
-        qpid::sys::Mutex lock;
+        qpid::sys::RWlock lock;
 
     public:
         static const std::string typeName;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Thu Jul  
5 09:19:05 2007
@@ -38,7 +38,7 @@
 pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name, 
const string& type, 
                                                            bool durable, const 
FieldTable& args) 
     throw(UnknownExchangeTypeException){
-    Mutex::ScopedLock locker(lock);
+    RWlock::ScopedWlock locker(lock);
     ExchangeMap::iterator i =  exchanges.find(name);
     if (i == exchanges.end()) {
        Exchange::shared_ptr exchange;
@@ -62,7 +62,7 @@
 }
 
 void ExchangeRegistry::destroy(const string& name){
-    Mutex::ScopedLock locker(lock);
+    RWlock::ScopedWlock locker(lock);
     ExchangeMap::iterator i =  exchanges.find(name);
     if (i != exchanges.end()) {
         exchanges.erase(i);
@@ -70,7 +70,7 @@
 }
 
 Exchange::shared_ptr ExchangeRegistry::get(const string& name){
-    Mutex::ScopedLock locker(lock);
+    RWlock::ScopedRlock locker(lock);
     ExchangeMap::iterator i =  exchanges.find(name);
     if (i == exchanges.end()) 
         throw ChannelException(404, "Exchange not found:" + name);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Thu Jul  5 
09:19:05 2007
@@ -35,7 +35,7 @@
     class ExchangeRegistry{
         typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
         ExchangeMap exchanges;
-        qpid::sys::Mutex lock;
+        qpid::sys::RWlock lock;
      public:
         std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, 
const std::string& type)
             throw(UnknownExchangeTypeException);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Thu Jul  5 
09:19:05 2007
@@ -29,7 +29,7 @@
 FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, const 
FieldTable& _args) : Exchange(_name, _durable, _args) {}
 
 bool FanOutExchange::bind(Queue::shared_ptr queue, const string& 
/*routingKey*/, const FieldTable* /*args*/){
-    Mutex::ScopedLock locker(lock);
+    RWlock::ScopedWlock locker(lock);
     // Add if not already present.
     Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), 
queue);
     if (i == bindings.end()) {
@@ -41,7 +41,7 @@
 }
 
 bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& 
/*routingKey*/, const FieldTable* /*args*/){
-    Mutex::ScopedLock locker(lock);
+    RWlock::ScopedWlock locker(lock);
     Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(), 
queue);
     if (i != bindings.end()) {
         bindings.erase(i);
@@ -52,7 +52,7 @@
 }
 
 void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, 
const FieldTable* /*args*/){
-    Mutex::ScopedLock locker(lock);
+    RWlock::ScopedRlock locker(lock);
     for(Queue::vector::iterator i = bindings.begin(); i != bindings.end(); 
++i){
         msg.deliverTo(*i);
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h Thu Jul  5 
09:19:05 2007
@@ -34,7 +34,7 @@
 
 class FanOutExchange : public virtual Exchange {
     std::vector<Queue::shared_ptr> bindings;
-    qpid::sys::Mutex lock;
+    qpid::sys::RWlock lock;
 
   public:
     static const std::string typeName;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Thu Jul  
5 09:19:05 2007
@@ -44,7 +44,7 @@
 HeadersExchange::HeadersExchange(const std::string& _name, bool _durable, 
const FieldTable& _args) : Exchange(_name, _durable, _args) {}
 
 bool HeadersExchange::bind(Queue::shared_ptr queue, const string& 
/*routingKey*/, const FieldTable* args){
-    Mutex::ScopedLock locker(lock);
+    RWlock::ScopedWlock locker(lock);
     std::string what = args->getString("x-match");
     if (what != all && what != any) {
         THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to 
headers exchange.");
@@ -61,7 +61,7 @@
 }
 
 bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& 
/*routingKey*/, const FieldTable* args){
-    Mutex::ScopedLock locker(lock);
+    RWlock::ScopedWlock locker(lock);
     Bindings::iterator i =
         std::find(bindings.begin(),bindings.end(), Binding(*args, queue));
     if (i != bindings.end()) {
@@ -74,7 +74,7 @@
 
 
 void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, 
const FieldTable* args){
-    Mutex::ScopedLock locker(lock);;
+    RWlock::ScopedRlock locker(lock);;
     for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
         if (match(i->first, *args)) msg.deliverTo(i->second);
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Thu Jul  5 
09:19:05 2007
@@ -37,7 +37,7 @@
     typedef std::vector<Binding> Bindings;
 
     Bindings bindings;
-    qpid::sys::Mutex lock;
+    qpid::sys::RWlock lock;
 
   public:
     static const std::string typeName;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Thu Jul  5 
09:19:05 2007
@@ -33,7 +33,7 @@
 QueueRegistry::declare(const string& declareName, bool durable, 
                        bool autoDelete, const ConnectionToken* owner)
 {
-    Mutex::ScopedLock locker(lock);
+    RWlock::ScopedWlock locker(lock);
     string name = declareName.empty() ? generateName() : declareName;
     assert(!name.empty());
     QueueMap::iterator i =  queues.find(name);
@@ -47,12 +47,12 @@
 }
 
 void QueueRegistry::destroy(const string& name){
-    Mutex::ScopedLock locker(lock);
+    RWlock::ScopedWlock locker(lock);
     queues.erase(name);
 }
 
 Queue::shared_ptr QueueRegistry::find(const string& name){
-    Mutex::ScopedLock locker(lock);
+    RWlock::ScopedRlock locker(lock);
     QueueMap::iterator i = queues.find(name);
     if (i == queues.end()) {
        return Queue::shared_ptr();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Thu Jul  5 
09:19:05 2007
@@ -64,7 +64,7 @@
     void destroy(const string& name);
     template <class Test> void destroyIf(const string& name, Test test)
     {
-        qpid::sys::Mutex::ScopedLock locker(lock);
+        qpid::sys::RWlock::ScopedWlock locker(lock);
         if (test()) {
             queues.erase(name);
         }
@@ -88,7 +88,7 @@
 private:
     typedef std::map<string, Queue::shared_ptr> QueueMap;
     QueueMap queues;
-    qpid::sys::Mutex lock;
+    qpid::sys::RWlock lock;
     int counter;
     MessageStore* const store;
 };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Thu Jul  5 
09:19:05 2007
@@ -120,7 +120,7 @@
 
 
 bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey, 
const FieldTable* /*args*/){
-    Monitor::ScopedLock l(lock);
+    RWlock::ScopedWlock l(lock);
     TopicPattern routingPattern(routingKey);
     if (isBound(queue, routingPattern)) {
         return false;
@@ -131,7 +131,7 @@
 }
 
 bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey, 
const FieldTable* /*args*/){
-    Monitor::ScopedLock l(lock);
+    RWlock::ScopedWlock l(lock);
     BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
     Queue::vector& qv(bi->second);
     if (bi == bindings.end()) return false;
@@ -151,7 +151,7 @@
 }
 
 void TopicExchange::route(Deliverable& msg, const string& routingKey, const 
FieldTable* /*args*/){
-    Monitor::ScopedLock l(lock);
+    RWlock::ScopedRlock l(lock);
     for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
         if (i->first.match(routingKey)) {
             Queue::vector& qv(i->second);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Thu Jul  5 
09:19:05 2007
@@ -74,7 +74,7 @@
 class TopicExchange : public virtual Exchange{
     typedef std::map<TopicPattern, Queue::vector> BindingMap;
     BindingMap bindings;
-    qpid::sys::Mutex lock;
+    qpid::sys::RWlock lock;
 
     bool isBound(Queue::shared_ptr queue, TopicPattern& pattern);
   public:

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Mutex.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Mutex.h?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Mutex.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Mutex.h Thu Jul  5 09:19:05 2007
@@ -46,6 +46,26 @@
     L& mutex;
 };
 
+template <class L>
+class ScopedRlock
+{
+  public:
+    ScopedRlock(L& l) : mutex(l) { l.rlock(); }
+    ~ScopedRlock() { mutex.unlock(); }
+  private:
+    L& mutex;
+};
+
+template <class L>
+class ScopedWlock
+{
+  public:
+    ScopedWlock(L& l) : mutex(l) { l.wlock(); }
+    ~ScopedWlock() { mutex.unlock(); }
+  private:
+    L& mutex;
+};
+
 }}
     
 #ifdef USE_APR_PLATFORM

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h Thu Jul  5 
09:19:05 2007
@@ -38,29 +38,58 @@
 public:
     typedef ScopedLock<Mutex> ScopedLock;
     typedef ScopedUnlock<Mutex> ScopedUnlock;
-    
+     
     inline Mutex();
     inline ~Mutex();
-    inline void lock();
+    inline void lock();  
     inline void unlock();
-    inline void trylock();
+    inline void trylock();  
+
 
 protected:
     pthread_mutex_t mutex;
 };
 
 /**
+ * RW lock.
+ */
+class RWlock : private boost::noncopyable {
+    friend class Condition;
+
+public:
+    typedef ScopedRlock<RWlock> ScopedRlock;
+    typedef ScopedWlock<RWlock> ScopedWlock;
+    
+    inline RWlock();
+    inline ~RWlock();
+    inline void wlock();  // will write-lock
+    inline void rlock();  // will read-lock
+    inline void unlock();
+    inline void trywlock();  // will write-try
+    inline void tryrlock();  // will read-try
+
+protected:
+    pthread_rwlock_t rwlock;
+};
+
+
+/**
  * Initialise a recursive mutex attr for use in creating mutexes later
  * (we use pthread_once to make sure it is initialised exactly once)
  */
 namespace {
        pthread_once_t  onceControl = PTHREAD_ONCE_INIT;
+       pthread_rwlockattr_t rwlockattr;
        pthread_mutexattr_t mutexattr;
        
        void initMutexattr()  {
                pthread_mutexattr_init(&mutexattr);
                pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE);
        }
+
+       void initRWlockattr()  {
+               pthread_rwlockattr_init(&rwlockattr);
+       }
        
        struct RecursiveMutexattr {
                RecursiveMutexattr() {
@@ -71,8 +100,21 @@
                        return &mutexattr;
                }
        };
+       struct RecursiveRWlockattr {
+               RecursiveRWlockattr() {
+                       pthread_once(&onceControl, initRWlockattr);
+               }
+               
+               operator const pthread_rwlockattr_t*() const {
+                       return &rwlockattr;
+               }
+       };
        
        RecursiveMutexattr recursiveMutexattr;
+       RecursiveRWlockattr recursiveRWlockattr;
+       
+       
+       
 }
 
 /**
@@ -83,9 +125,9 @@
 {
     typedef ScopedLock<PODMutex> ScopedLock;
 
-    inline void lock();
+    inline void lock();  
     inline void unlock();
-    inline void trylock();
+    inline void trylock();  
 
     // Must be public to be a POD:
     pthread_mutex_t mutex;
@@ -96,6 +138,7 @@
 void PODMutex::lock() {
     QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
 }
+
 void PODMutex::unlock() {
     QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
 }
@@ -115,6 +158,7 @@
 void Mutex::lock() {
     QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
 }
+
 void Mutex::unlock() {
     QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
 }
@@ -122,6 +166,36 @@
 void Mutex::trylock() {
     QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex));
 }
+
+
+RWlock::RWlock() {
+    QPID_POSIX_THROW_IF(pthread_rwlock_init(&rwlock, recursiveRWlockattr));
+}
+
+RWlock::~RWlock(){
+    QPID_POSIX_THROW_IF(pthread_rwlock_destroy(&rwlock));
+}
+
+void RWlock::wlock() {
+    QPID_POSIX_THROW_IF(pthread_rwlock_wrlock(&rwlock));
+}
+
+void RWlock::rlock() {
+    QPID_POSIX_THROW_IF(pthread_rwlock_rdlock(&rwlock));
+}
+
+void RWlock::unlock() {
+    QPID_POSIX_THROW_IF(pthread_rwlock_unlock(&rwlock));
+}
+
+void RWlock::trywlock() {
+    QPID_POSIX_THROW_IF(pthread_rwlock_trywrlock(&rwlock));
+}
+
+void RWlock::tryrlock() {
+    QPID_POSIX_THROW_IF(pthread_rwlock_tryrdlock(&rwlock));
+}
+
 
 }}
 #endif  /*!_sys_posix_Mutex_h*/


Reply via email to