Author: aconway
Date: Fri Nov  7 12:48:38 2008
New Revision: 712258

URL: http://svn.apache.org/viewvc?rev=712258&view=rev
Log:

broker/Message, IncompleteMessageList: drop waitFor(De|En)Complete, replace 
with callbacks.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h

Modified: 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp?rev=712258&r1=712257&r2=712258&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.cpp Fri 
Nov  7 12:48:38 2008
@@ -18,38 +18,55 @@
  * under the License.
  *
  */
-#include "IncompleteMessageList.h"
 
-#include "Message.h"
+#include "IncompleteMessageList.h"
 
 namespace qpid {
 namespace broker {
 
+IncompleteMessageList::IncompleteMessageList() :
+    callback(boost::bind(&IncompleteMessageList::enqueueComplete, this, _1))
+{}
+
 void IncompleteMessageList::add(boost::intrusive_ptr<Message> msg)
 {
+    sys::Mutex::ScopedLock l(lock);
+    msg->setEnqueueCompleteCallback(callback);
     incomplete.push_back(msg);
 }
 
-void IncompleteMessageList::process(const CompletionListener& l, bool sync)
+void IncompleteMessageList::enqueueComplete(const 
boost::intrusive_ptr<Message>& ) {
+    sys::Mutex::ScopedLock l(lock);
+    lock.notify();
+}
+
+void IncompleteMessageList::process(const CompletionListener& listen, bool 
sync)
 {
+    sys::Mutex::ScopedLock l(lock);
     while (!incomplete.empty()) {
         boost::intrusive_ptr<Message>& msg = incomplete.front();
         if (!msg->isEnqueueComplete()) {
             if (sync){
                 msg->flush();
-                msg->waitForEnqueueComplete();
+                while (!msg->isEnqueueComplete())
+                    lock.wait();
             } else {
                 //leave the message as incomplete for now
                 return;
             }            
         }
-        l(msg);
+        listen(msg);
         incomplete.pop_front();
     }
 }
 
-void IncompleteMessageList::each(const CompletionListener& l) {
-    std::for_each(incomplete.begin(), incomplete.end(), l);
+void IncompleteMessageList::each(const CompletionListener& listen) {
+    Messages snapshot; // private copy of the list, so listen() runs without holding lock
+    {
+        sys::Mutex::ScopedLock l(lock);
+        snapshot = incomplete; // copy taken while lock is held
+    }
+    std::for_each(snapshot.begin(), snapshot.end(), listen); // iterate the copy, not the shared list. FIXME aconway 2008-11-07: passed by ref or value?
 }
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h?rev=712258&r1=712257&r2=712258&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncompleteMessageList.h Fri 
Nov  7 12:48:38 2008
@@ -21,23 +21,30 @@
 #ifndef _IncompleteMessageList_
 #define _IncompleteMessageList_
 
-#include <list>
+#include "qpid/sys/Monitor.h"
+#include "qpid/broker/Message.h"
 #include <boost/intrusive_ptr.hpp>
 #include <boost/function.hpp>
+#include <list>
 
 namespace qpid {
 namespace broker {
 
-class Message;
-
 class IncompleteMessageList
 {
     typedef std::list< boost::intrusive_ptr<Message> > Messages;
+
+    void enqueueComplete(const boost::intrusive_ptr<Message>&);
+
+    sys::Monitor lock;
     Messages incomplete;
+    Message::MessageCallback callback;
 
 public:
-    typedef boost::function<void(boost::intrusive_ptr<Message>)> 
CompletionListener;    
+    typedef Message::MessageCallback CompletionListener;    
 
+    IncompleteMessageList();
+    
     void add(boost::intrusive_ptr<Message> msg);
     void process(const CompletionListener& l, bool sync);
     void each(const CompletionListener& l);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=712258&r1=712257&r2=712258&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Nov  7 
12:48:38 2008
@@ -348,7 +348,7 @@
         sys::Mutex::ScopedLock l(lock);
         swap(cb, enqueueCallback);
     }
-    if (cb && *cb) (*cb)(*this);
+    if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
 }
 
 void Message::allDequeuesComplete() {
@@ -357,7 +357,11 @@
         sys::Mutex::ScopedLock l(lock);
         swap(cb, dequeueCallback);
     }
-    if (cb && *cb) (*cb)(*this);
+    if (cb && *cb) (*cb)(intrusive_ptr<Message>(this));
 }
 
+void Message::setEnqueueCompleteCallback(MessageCallback& cb) { 
enqueueCallback = &cb; }
+
+void Message::setDequeueCompleteCallback(MessageCallback& cb) { 
dequeueCallback = &cb; }
+
 }} // namespace qpid::broker

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=712258&r1=712257&r2=712258&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Nov  7 12:48:38 
2008
@@ -48,7 +48,7 @@
 
 class Message : public PersistableMessage {
 public:
-    typedef boost::function<void (Message&)> MessageCallback;
+    typedef boost::function<void (const boost::intrusive_ptr<Message>&)> 
MessageCallback;
     
     Message(const framing::SequenceNumber& id = framing::SequenceNumber());
     ~Message();
@@ -145,10 +145,10 @@
     void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* 
qfor);
 
     /** Call cb when enqueue is complete, may call immediately. Holds cb by 
reference. */
-    void setEnqueueCompleteCallback(const MessageCallback* cb);
+    void setEnqueueCompleteCallback(MessageCallback& cb);
 
     /** Call cb when dequeue is complete, may call immediately. Holds cb by 
reference. */
-    void setDequeueCompleteCallback(const MessageCallback& cb);
+    void setDequeueCompleteCallback(MessageCallback& cb);
 
   private:
     typedef std::map<const Queue*,boost::intrusive_ptr<Message> > Replacement;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=712258&r1=712257&r2=712258&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Fri 
Nov  7 12:48:38 2008
@@ -63,25 +63,17 @@
 
 bool PersistableMessage::isContentReleased()const { return contentReleased; }
        
-void PersistableMessage::waitForEnqueueComplete() {
-    sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
-    while (asyncEnqueueCounter > 0) {
-        asyncEnqueueLock.wait();
-    }
-}
-
 bool PersistableMessage::isEnqueueComplete() {
-    sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+    sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
     return asyncEnqueueCounter == 0;
 }
 
 void PersistableMessage::enqueueComplete() {
     bool notify = false;
     {
-        sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+        sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
         if (asyncEnqueueCounter > 0) {
             if (--asyncEnqueueCounter == 0) {
-                asyncEnqueueLock.notify();
                 notify = true;
             }
         }
@@ -109,36 +101,28 @@
 }
 
 void PersistableMessage::enqueueAsync() { 
-    sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
+    sys::ScopedLock<sys::Mutex> l(asyncEnqueueLock);
     asyncEnqueueCounter++; 
 }
 
 bool PersistableMessage::isDequeueComplete() { 
-    sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
+    sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
     return asyncDequeueCounter == 0;
 }
     
 void PersistableMessage::dequeueComplete() { 
     bool notify = false;
     {
-        sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
+        sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
         if (asyncDequeueCounter > 0) {
             if (--asyncDequeueCounter == 0) {
                 notify = true;
-                asyncDequeueLock.notify();
             }
         }
     }
     if (notify) allDequeuesComplete();
 }
 
-void PersistableMessage::waitForDequeueComplete() {
-    sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
-    while (asyncDequeueCounter > 0) {
-        asyncDequeueLock.wait();
-    }
-}
-
 void PersistableMessage::dequeueAsync(PersistableQueue::shared_ptr queue, 
MessageStore* _store) { 
     if (_store){
         sys::ScopedLock<sys::Mutex> l(storeLock);
@@ -150,7 +134,7 @@
 }
 
 void PersistableMessage::dequeueAsync() { 
-    sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
+    sys::ScopedLock<sys::Mutex> l(asyncDequeueLock);
     asyncDequeueCounter++; 
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=712258&r1=712257&r2=712258&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Fri Nov  
7 12:48:38 2008
@@ -28,7 +28,7 @@
 #include <boost/weak_ptr.hpp>
 #include "Persistable.h"
 #include "qpid/framing/amqp_types.h"
-#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Mutex.h"
 #include "PersistableQueue.h"
 
 namespace qpid {
@@ -42,8 +42,8 @@
 class PersistableMessage : public Persistable
 {
     typedef std::list< boost::weak_ptr<PersistableQueue> > syncList;
-    sys::Monitor asyncEnqueueLock;
-    sys::Monitor asyncDequeueLock;
+    sys::Mutex asyncEnqueueLock;
+    sys::Mutex asyncDequeueLock;
     sys::Mutex storeLock;
        
     /**
@@ -93,8 +93,6 @@
     
     bool isContentReleased() const;
        
-    void waitForEnqueueComplete();
-
     bool isEnqueueComplete();
 
     void enqueueComplete();
@@ -107,8 +105,6 @@
     
     void dequeueComplete();
 
-    void waitForDequeueComplete();
-
     void dequeueAsync(PersistableQueue::shared_ptr queue, MessageStore* 
_store);
 
     void dequeueAsync();


Reply via email to