Author: aconway
Date: Fri Nov 7 12:48:38 2008
New Revision: 712258
URL: http://svn.apache.org/viewvc?rev=712258&view=rev
Log:
broker/Message, IncompleteMessageList: drop waitFor(De|En)Complete, replace
with callbacks.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp?rev=712258&r1=712257&r2=712258&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp Fri
Nov 7 12:48:38 2008
@@ -18,38 +18,55 @@
* under the License.
*
*/
-#include "IncompleteMessageList.h"
-#include "Message.h"
+#include "IncompleteMessageList.h"
namespace qpid {
namespace broker {
+IncompleteMessageList::IncompleteMessageList() :
+ callback(boost::bind(&IncompleteMessageList::enqueueComplete, this, _1))
+{}
+
void IncompleteMessageList::add(boost::intrusive_ptr<Message> msg)
{
+ sys::Mutex::ScopedLock l(lock);
+ msg->setEnqueueCompleteCallback(callback);
incomplete.push_back(msg);
}
-void IncompleteMessageList::process(const CompletionListener& l, bool sync)
+void IncompleteMessageList::enqueueComplete(const
boost::intrusive_ptr<Message>& ) {
+ sys::Mutex::ScopedLock l(lock);
+ lock.notify();
+}
+
+void IncompleteMessageList::process(const CompletionListener& listen, bool
sync)
{
+ sys::Mutex::ScopedLock l(lock);
while (!incomplete.empty()) {
boost::intrusive_ptr<Message>& msg = incomplete.front();
if (!msg->isEnqueueComplete()) {
if (sync){
msg->flush();
- msg->waitForEnqueueComplete();
+ while (!msg->isEnqueueComplete())
+ lock.wait();
} else {
//leave the message as incomplete for now
return;
}
}
- l(msg);
+ listen(msg);
incomplete.pop_front();
}
}
-void IncompleteMessageList::each(const CompletionListener& l) {
- std::for_each(incomplete.begin(), incomplete.end(), l);
+void IncompleteMessageList::each(const CompletionListener& listen) {
+ Messages snapshot;
+ {
+ sys::Mutex::ScopedLock l(lock);
+ snapshot = incomplete;
+ }
+ std::for_each(incomplete.begin(), incomplete.end(), listen); // FIXME
aconway 2008-11-07: passed by ref or value?
}
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h?rev=712258&r1=712257&r2=712258&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h Fri
Nov 7 12:48:38 2008
@@ -21,23 +21,30 @@
#ifndef _IncompleteMessageList_
#define _IncompleteMessageList_
-#include <list>
+#include "qpid/sys/Monitor.h"
+#include "qpid/broker/Message.h"
#include <boost/intrusive_ptr.hpp>
#include <boost/function.hpp>
+#include <list>
namespace qpid {
namespace broker {
-class Message;
-
class IncompleteMessageList
{
typedef std::list< boost::intrusive_ptr<Message> > Messages;
+
+ void enqueueComplete(const boost::intrusive_ptr<Message>&);
+
+ sys::Monitor lock;
Messages incomplete;
+ Message::MessageCallback callback;
public:
- typedef boost::function<void(boost::intrusive_ptr<Message>)>
CompletionListener;
+ typedef Message::MessageCallback CompletionListener;
+ IncompleteMessageList();
+
void add(boost::intrusive_ptr<Message> msg);
void process(const CompletionListener& l, bool sync);
void each(const CompletionListener& l);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=712258&r1=712257&r2=712258&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Nov 7
12:48:38 2008
@@ -348,7 +348,7 @@
sys::Mutex::ScopedLock l(lock);
swap(cb, enqueueCallback);
}
- if (cb && *cb) (*cb)(*this);
+ if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
void Message::allDequeuesComplete() {
@@ -357,7 +357,11 @@
sys::Mutex::ScopedLock l(lock);
swap(cb, dequeueCallback);
}
- if (cb && *cb) (*cb)(*this);
+ if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
}
+void Message::setEnqueueCompleteCallback(MessageCallback& cb) {
enqueueCallback = &cb; }
+
+void Message::setDequeueCompleteCallback(MessageCallback& cb) {
dequeueCallback = &cb; }
+
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=712258&r1=712257&r2=712258&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Nov 7 12:48:38
2008
@@ -48,7 +48,7 @@
class Message : public PersistableMessage {
public:
- typedef boost::function<void (Message&)> MessageCallback;
+ typedef boost::function<void (const boost::intrusive_ptr<Message>&)>
MessageCallback;
Message(const framing::SequenceNumber& id = framing::SequenceNumber());
~Message();
@@ -145,10 +145,10 @@
void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue*
qfor);
/** Call cb when enqueue is complete, may call immediately. Holds cb by
reference. */
- void setEnqueueCompleteCallback(const MessageCallback* cb);
+ void setEnqueueCompleteCallback(MessageCallback& cb);
/** Call cb when dequeue is complete, may call immediately. Holds cb by
reference. */
- void setDequeueCompleteCallback(const MessageCallback& cb);
+ void setDequeueCompleteCallback(MessageCallback& cb);
private:
typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=712258&r1=712257&r2=712258&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Fri
Nov 7 12:48:38 2008
@@ -63,25 +63,17 @@
bool PersistableMessage::isContentReleased()const { return contentReleased; }
-void PersistableMessage::waitForEnqueueComplete() {
- sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
- while (asyncEnqueueCounter > 0) {
- asyncEnqueueLock.wait();
- }
-}
-
bool PersistableMessage::isEnqueueComplete() {
- sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+ sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
return asyncEnqueueCounter == 0;
}
void PersistableMessage::enqueueComplete() {
bool notify = false;
{
- sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+ sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
if (asyncEnqueueCounter > 0) {
if (--asyncEnqueueCounter == 0) {
- asyncEnqueueLock.notify();
notify = true;
}
}
@@ -109,36 +101,28 @@
}
void PersistableMessage::enqueueAsync() {
- sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+ sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
asyncEnqueueCounter++;
}
bool PersistableMessage::isDequeueComplete() {
- sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
+ sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
return asyncDequeueCounter == 0;
}
void PersistableMessage::dequeueComplete() {
bool notify = false;
{
- sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
+ sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
if (asyncDequeueCounter > 0) {
if (--asyncDequeueCounter == 0) {
notify = true;
- asyncDequeueLock.notify();
}
}
}
if (notify) allDequeuesComplete();
}
-void PersistableMessage::waitForDequeueComplete() {
- sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
- while (asyncDequeueCounter > 0) {
- asyncDequeueLock.wait();
- }
-}
-
void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue,
MessageStore* _store) {
if (_store){
sys::ScopedLock<sys::Mutex> l(storeLock);
@@ -150,7 +134,7 @@
}
void PersistableMessage::dequeueAsync() {
- sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
+ sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
asyncDequeueCounter++;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=712258&r1=712257&r2=712258&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Fri Nov
7 12:48:38 2008
@@ -28,7 +28,7 @@
#include <boost/weak_ptr.hpp>
#include "Persistable.h"
#include "qpid/framing/amqp_types.h"
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Mutex.h"
#include "PersistableQueue.h"
namespace qpid {
@@ -42,8 +42,8 @@
class PersistableMessage : public Persistable
{
typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
- sys::Monitor asyncEnqueueLock;
- sys::Monitor asyncDequeueLock;
+ sys::Mutex asyncEnqueueLock;
+ sys::Mutex asyncDequeueLock;
sys::Mutex storeLock;
/**
@@ -93,8 +93,6 @@
bool isContentReleased() const;
- void waitForEnqueueComplete();
-
bool isEnqueueComplete();
void enqueueComplete();
@@ -107,8 +105,6 @@
void dequeueComplete();
- void waitForDequeueComplete();
-
void dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore*
_store);
void dequeueAsync();