Author: cctrieloff
Date: Thu Jul 5 09:19:05 2007
New Revision: 553549
URL: http://svn.apache.org/viewvc?view=rev&rev=553549
Log:
- Added RW lock
- Updated all exchanges to use RW lock
- Updated all registries to use RW lock
- Still need to do (client, channel, message and queues)
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Mutex.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.cpp Thu Jul 5
09:19:05 2007
@@ -30,7 +30,7 @@
DirectExchange::DirectExchange(const std::string& _name, bool _durable, const
FieldTable& _args) : Exchange(_name, _durable, _args) {}
bool DirectExchange::bind(Queue::shared_ptr queue, const string& routingKey,
const FieldTable*){
- Mutex::ScopedLock l(lock);
+ RWlock::ScopedWlock l(lock);
std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(),
queues.end(), queue);
if (i == queues.end()) {
@@ -42,7 +42,7 @@
}
bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey,
const FieldTable* /*args*/){
- Mutex::ScopedLock l(lock);
+ RWlock::ScopedWlock l(lock);
std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
std::vector<Queue::shared_ptr>::iterator i = find(queues.begin(),
queues.end(), queue);
@@ -58,7 +58,7 @@
}
void DirectExchange::route(Deliverable& msg, const string& routingKey, const
FieldTable* /*args*/){
- Mutex::ScopedLock l(lock);
+ RWlock::ScopedRlock l(lock);
std::vector<Queue::shared_ptr>& queues(bindings[routingKey]);
int count(0);
for(std::vector<Queue::shared_ptr>::iterator i = queues.begin(); i !=
queues.end(); i++, count++){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DirectExchange.h Thu Jul 5
09:19:05 2007
@@ -35,7 +35,7 @@
typedef std::vector<Queue::shared_ptr> Queues;
typedef std::map<string, Queues > Bindings;
Bindings bindings;
- qpid::sys::Mutex lock;
+ qpid::sys::RWlock lock;
public:
static const std::string typeName;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.cpp Thu Jul
5 09:19:05 2007
@@ -38,7 +38,7 @@
pair<Exchange::shared_ptr, bool> ExchangeRegistry::declare(const string& name,
const string& type,
bool durable, const
FieldTable& args)
throw(UnknownExchangeTypeException){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedWlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
if (i == exchanges.end()) {
Exchange::shared_ptr exchange;
@@ -62,7 +62,7 @@
}
void ExchangeRegistry::destroy(const string& name){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedWlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
if (i != exchanges.end()) {
exchanges.erase(i);
@@ -70,7 +70,7 @@
}
Exchange::shared_ptr ExchangeRegistry::get(const string& name){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedRlock locker(lock);
ExchangeMap::iterator i = exchanges.find(name);
if (i == exchanges.end())
throw ChannelException(404, "Exchange not found:" + name);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ExchangeRegistry.h Thu Jul 5
09:19:05 2007
@@ -35,7 +35,7 @@
class ExchangeRegistry{
typedef std::map<std::string, Exchange::shared_ptr> ExchangeMap;
ExchangeMap exchanges;
- qpid::sys::Mutex lock;
+ qpid::sys::RWlock lock;
public:
std::pair<Exchange::shared_ptr, bool> declare(const std::string& name,
const std::string& type)
throw(UnknownExchangeTypeException);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.cpp Thu Jul 5
09:19:05 2007
@@ -29,7 +29,7 @@
FanOutExchange::FanOutExchange(const std::string& _name, bool _durable, const
FieldTable& _args) : Exchange(_name, _durable, _args) {}
bool FanOutExchange::bind(Queue::shared_ptr queue, const string&
/*routingKey*/, const FieldTable* /*args*/){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedWlock locker(lock);
// Add if not already present.
Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(),
queue);
if (i == bindings.end()) {
@@ -41,7 +41,7 @@
}
bool FanOutExchange::unbind(Queue::shared_ptr queue, const string&
/*routingKey*/, const FieldTable* /*args*/){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedWlock locker(lock);
Queue::vector::iterator i = std::find(bindings.begin(), bindings.end(),
queue);
if (i != bindings.end()) {
bindings.erase(i);
@@ -52,7 +52,7 @@
}
void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/,
const FieldTable* /*args*/){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedRlock locker(lock);
for(Queue::vector::iterator i = bindings.begin(); i != bindings.end();
++i){
msg.deliverTo(*i);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/FanOutExchange.h Thu Jul 5
09:19:05 2007
@@ -34,7 +34,7 @@
class FanOutExchange : public virtual Exchange {
std::vector<Queue::shared_ptr> bindings;
- qpid::sys::Mutex lock;
+ qpid::sys::RWlock lock;
public:
static const std::string typeName;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Thu Jul
5 09:19:05 2007
@@ -44,7 +44,7 @@
HeadersExchange::HeadersExchange(const std::string& _name, bool _durable,
const FieldTable& _args) : Exchange(_name, _durable, _args) {}
bool HeadersExchange::bind(Queue::shared_ptr queue, const string&
/*routingKey*/, const FieldTable* args){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedWlock locker(lock);
std::string what = args->getString("x-match");
if (what != all && what != any) {
THROW_QPID_ERROR(PROTOCOL_ERROR, "Invalid x-match value binding to
headers exchange.");
@@ -61,7 +61,7 @@
}
bool HeadersExchange::unbind(Queue::shared_ptr queue, const string&
/*routingKey*/, const FieldTable* args){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedWlock locker(lock);
Bindings::iterator i =
std::find(bindings.begin(),bindings.end(), Binding(*args, queue));
if (i != bindings.end()) {
@@ -74,7 +74,7 @@
void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/,
const FieldTable* args){
- Mutex::ScopedLock locker(lock);;
+ RWlock::ScopedRlock locker(lock);;
for (Bindings::iterator i = bindings.begin(); i != bindings.end(); ++i) {
if (match(i->first, *args)) msg.deliverTo(i->second);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.h Thu Jul 5
09:19:05 2007
@@ -37,7 +37,7 @@
typedef std::vector<Binding> Bindings;
Bindings bindings;
- qpid::sys::Mutex lock;
+ qpid::sys::RWlock lock;
public:
static const std::string typeName;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Thu Jul 5
09:19:05 2007
@@ -33,7 +33,7 @@
QueueRegistry::declare(const string& declareName, bool durable,
bool autoDelete, const ConnectionToken* owner)
{
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedWlock locker(lock);
string name = declareName.empty() ? generateName() : declareName;
assert(!name.empty());
QueueMap::iterator i = queues.find(name);
@@ -47,12 +47,12 @@
}
void QueueRegistry::destroy(const string& name){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedWlock locker(lock);
queues.erase(name);
}
Queue::shared_ptr QueueRegistry::find(const string& name){
- Mutex::ScopedLock locker(lock);
+ RWlock::ScopedRlock locker(lock);
QueueMap::iterator i = queues.find(name);
if (i == queues.end()) {
return Queue::shared_ptr();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Thu Jul 5
09:19:05 2007
@@ -64,7 +64,7 @@
void destroy(const string& name);
template <class Test> void destroyIf(const string& name, Test test)
{
- qpid::sys::Mutex::ScopedLock locker(lock);
+ qpid::sys::RWlock::ScopedWlock locker(lock);
if (test()) {
queues.erase(name);
}
@@ -88,7 +88,7 @@
private:
typedef std::map<string, Queue::shared_ptr> QueueMap;
QueueMap queues;
- qpid::sys::Mutex lock;
+ qpid::sys::RWlock lock;
int counter;
MessageStore* const store;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.cpp Thu Jul 5
09:19:05 2007
@@ -120,7 +120,7 @@
bool TopicExchange::bind(Queue::shared_ptr queue, const string& routingKey,
const FieldTable* /*args*/){
- Monitor::ScopedLock l(lock);
+ RWlock::ScopedWlock l(lock);
TopicPattern routingPattern(routingKey);
if (isBound(queue, routingPattern)) {
return false;
@@ -131,7 +131,7 @@
}
bool TopicExchange::unbind(Queue::shared_ptr queue, const string& routingKey,
const FieldTable* /*args*/){
- Monitor::ScopedLock l(lock);
+ RWlock::ScopedWlock l(lock);
BindingMap::iterator bi = bindings.find(TopicPattern(routingKey));
Queue::vector& qv(bi->second);
if (bi == bindings.end()) return false;
@@ -151,7 +151,7 @@
}
void TopicExchange::route(Deliverable& msg, const string& routingKey, const
FieldTable* /*args*/){
- Monitor::ScopedLock l(lock);
+ RWlock::ScopedRlock l(lock);
for (BindingMap::iterator i = bindings.begin(); i != bindings.end(); ++i) {
if (i->first.match(routingKey)) {
Queue::vector& qv(i->second);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TopicExchange.h Thu Jul 5
09:19:05 2007
@@ -74,7 +74,7 @@
class TopicExchange : public virtual Exchange{
typedef std::map<TopicPattern, Queue::vector> BindingMap;
BindingMap bindings;
- qpid::sys::Mutex lock;
+ qpid::sys::RWlock lock;
bool isBound(Queue::shared_ptr queue, TopicPattern& pattern);
public:
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Mutex.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Mutex.h?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Mutex.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Mutex.h Thu Jul 5 09:19:05 2007
@@ -46,6 +46,26 @@
L& mutex;
};
+template <class L>
+class ScopedRlock
+{
+ public:
+ ScopedRlock(L& l) : mutex(l) { l.rlock(); }
+ ~ScopedRlock() { mutex.unlock(); }
+ private:
+ L& mutex;
+};
+
+template <class L>
+class ScopedWlock
+{
+ public:
+ ScopedWlock(L& l) : mutex(l) { l.wlock(); }
+ ~ScopedWlock() { mutex.unlock(); }
+ private:
+ L& mutex;
+};
+
}}
#ifdef USE_APR_PLATFORM
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h?view=diff&rev=553549&r1=553548&r2=553549
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/Mutex.h Thu Jul 5
09:19:05 2007
@@ -38,29 +38,58 @@
public:
typedef ScopedLock<Mutex> ScopedLock;
typedef ScopedUnlock<Mutex> ScopedUnlock;
-
+
inline Mutex();
inline ~Mutex();
- inline void lock();
+ inline void lock();
inline void unlock();
- inline void trylock();
+ inline void trylock();
+
protected:
pthread_mutex_t mutex;
};
/**
+ * RW lock.
+ */
+class RWlock : private boost::noncopyable {
+ friend class Condition;
+
+public:
+ typedef ScopedRlock<RWlock> ScopedRlock;
+ typedef ScopedWlock<RWlock> ScopedWlock;
+
+ inline RWlock();
+ inline ~RWlock();
+ inline void wlock(); // will write-lock
+ inline void rlock(); // will read-lock
+ inline void unlock();
+ inline void trywlock(); // will write-try
+ inline void tryrlock(); // will read-try
+
+protected:
+ pthread_rwlock_t rwlock;
+};
+
+
+/**
* Initialise a recursive mutex attr for use in creating mutexes later
* (we use pthread_once to make sure it is initialised exactly once)
*/
namespace {
pthread_once_t onceControl = PTHREAD_ONCE_INIT;
+ pthread_rwlockattr_t rwlockattr;
pthread_mutexattr_t mutexattr;
void initMutexattr() {
pthread_mutexattr_init(&mutexattr);
pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_RECURSIVE);
}
+
+ void initRWlockattr() {
+ pthread_rwlockattr_init(&rwlockattr);
+ }
struct RecursiveMutexattr {
RecursiveMutexattr() {
@@ -71,8 +100,21 @@
return &mutexattr;
}
};
+ struct RecursiveRWlockattr {
+ RecursiveRWlockattr() {
+ pthread_once(&onceControl, initRWlockattr);
+ }
+
+ operator const pthread_rwlockattr_t*() const {
+ return &rwlockattr;
+ }
+ };
RecursiveMutexattr recursiveMutexattr;
+ RecursiveRWlockattr recursiveRWlockattr;
+
+
+
}
/**
@@ -83,9 +125,9 @@
{
typedef ScopedLock<PODMutex> ScopedLock;
- inline void lock();
+ inline void lock();
inline void unlock();
- inline void trylock();
+ inline void trylock();
// Must be public to be a POD:
pthread_mutex_t mutex;
@@ -96,6 +138,7 @@
void PODMutex::lock() {
QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
}
+
void PODMutex::unlock() {
QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
}
@@ -115,6 +158,7 @@
void Mutex::lock() {
QPID_POSIX_THROW_IF(pthread_mutex_lock(&mutex));
}
+
void Mutex::unlock() {
QPID_POSIX_THROW_IF(pthread_mutex_unlock(&mutex));
}
@@ -122,6 +166,36 @@
void Mutex::trylock() {
QPID_POSIX_THROW_IF(pthread_mutex_trylock(&mutex));
}
+
+
+RWlock::RWlock() {
+ QPID_POSIX_THROW_IF(pthread_rwlock_init(&rwlock, recursiveRWlockattr));
+}
+
+RWlock::~RWlock(){
+ QPID_POSIX_THROW_IF(pthread_rwlock_destroy(&rwlock));
+}
+
+void RWlock::wlock() {
+ QPID_POSIX_THROW_IF(pthread_rwlock_wrlock(&rwlock));
+}
+
+void RWlock::rlock() {
+ QPID_POSIX_THROW_IF(pthread_rwlock_rdlock(&rwlock));
+}
+
+void RWlock::unlock() {
+ QPID_POSIX_THROW_IF(pthread_rwlock_unlock(&rwlock));
+}
+
+void RWlock::trywlock() {
+ QPID_POSIX_THROW_IF(pthread_rwlock_trywrlock(&rwlock));
+}
+
+void RWlock::tryrlock() {
+ QPID_POSIX_THROW_IF(pthread_rwlock_tryrdlock(&rwlock));
+}
+
}}
#endif /*!_sys_posix_Mutex_h*/