Author: cctrieloff
Date: Mon Oct 22 17:29:32 2007
New Revision: 587332
URL: http://svn.apache.org/viewvc?rev=587332&view=rev
Log:
- flush async IO if present on sync for 0-10
- notify, for ack from sync for 0-10
- use of raw pointer, to avoid recursive fre
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=587332&r1=587331&r2=587332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Mon Oct 22 17:29:32 2007
@@ -144,6 +144,7 @@
qpid/broker/BrokerSingleton.cpp \
qpid/broker/Exchange.cpp \
qpid/broker/Queue.cpp \
+ qpid/broker/PersistableMessage.cpp \
qpid/broker/Connection.cpp \
qpid/broker/ConnectionHandler.cpp \
qpid/broker/ConnectionFactory.cpp \
@@ -199,6 +200,7 @@
qpid/client/Connection.cpp \
qpid/client/Channel.cpp \
qpid/client/Exchange.cpp \
+ qpid/broker/PersistableMessage.cpp \
qpid/client/Queue.cpp \
qpid/client/ConnectionImpl.cpp \
qpid/client/Connector.cpp \
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp?rev=587332&r1=587331&r2=587332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/IncomingExecutionContext.cpp
Mon Oct 22 17:29:32 2007
@@ -112,6 +112,10 @@
void IncomingExecutionContext::wait()
{
check();
+ // for IO flush on the store
+ for (Messages::iterator i = incomplete.begin(); i != incomplete.end();
i++) {
+ (*i)->flush();
+ }
incomplete.front()->waitForEnqueueComplete();
flush();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=587332&r1=587331&r2=587332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Mon Oct
22 17:29:32 2007
@@ -23,14 +23,18 @@
*/
#include <string>
+#include <list>
#include <boost/shared_ptr.hpp>
#include "Persistable.h"
#include "qpid/framing/amqp_types.h"
#include "qpid/sys/Monitor.h"
+#include "PersistableQueue.h"
namespace qpid {
namespace broker {
+class MessageStore;
+
/**
* The interface messages must expose to the MessageStore in order to
* be persistable.
@@ -39,7 +43,8 @@
{
sys::Monitor asyncEnqueueLock;
sys::Monitor asyncDequeueLock;
-
+ sys::Mutex storeLock;
+
/**
* Tracks the number of outstanding asynchronous enqueue
* operations. When the message is enqueued asynchronously the
@@ -57,7 +62,10 @@
* dequeues.
*/
int asyncDequeueCounter;
-
+protected:
+ typedef std::list<PersistableQueue*> syncList;
+ syncList synclist;
+ MessageStore* store;
public:
typedef boost::shared_ptr<PersistableMessage> shared_ptr;
@@ -70,8 +78,11 @@
PersistableMessage():
asyncEnqueueCounter(0),
- asyncDequeueCounter(0)
+ asyncDequeueCounter(0),
+ store(0)
{}
+
+ void flush();
inline void waitForEnqueueComplete() {
sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
@@ -94,6 +105,15 @@
}
}
+ inline void enqueueAsync(PersistableQueue* queue, MessageStore* _store) {
+ if (_store){
+ sys::ScopedLock<sys::Mutex> l(storeLock);
+ store = _store;
+ synclist.push_back(queue);
+ }
+ enqueueAsync();
+ }
+
inline void enqueueAsync() {
sys::ScopedLock<sys::Monitor> l(asyncEnqueueLock);
asyncEnqueueCounter++;
@@ -105,6 +125,7 @@
}
inline void dequeueComplete() {
+
sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
if (asyncDequeueCounter > 0) {
if (--asyncDequeueCounter == 0) {
@@ -119,6 +140,15 @@
asyncDequeueLock.wait();
}
}
+
+ inline void dequeueAsync(PersistableQueue* queue, MessageStore* _store) {
+ if (_store){
+ sys::ScopedLock<sys::Mutex> l(storeLock);
+ store = _store;
+ synclist.push_back(queue);
+ }
+ dequeueAsync();
+ }
inline void dequeueAsync() {
sys::ScopedLock<sys::Monitor> l(asyncDequeueLock);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h?rev=587332&r1=587331&r2=587332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h Mon Oct 22
17:29:32 2007
@@ -24,6 +24,7 @@
#include <string>
#include "Persistable.h"
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
@@ -49,16 +50,17 @@
class PersistableQueue : public Persistable
{
public:
+ typedef boost::shared_ptr<PersistableQueue> shared_ptr;
virtual const std::string& getName() const = 0;
virtual ~PersistableQueue() {
if (externalQueueStore)
- delete externalQueueStore;
+ delete externalQueueStore;
};
inline void setExternalQueueStore(ExternalQueueStore* inst){
if (externalQueueStore!=inst && externalQueueStore)
- delete externalQueueStore;
+ delete externalQueueStore;
externalQueueStore = inst;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=587332&r1=587331&r2=587332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Oct 22 17:29:32
2007
@@ -347,7 +347,8 @@
bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr msg)
{
if (msg->isPersistent() && store) {
- msg->enqueueAsync(); //increment to async counter -- for message sent
to more than one queue
+std::cout << "-------------- enqueue ------------" << std::endl << std::flush;
+ msg->enqueueAsync(this, store); //increment to async counter -- for
message sent to more than one queue
store->enqueue(ctxt, *msg.get(), *this);
return true;
}
@@ -358,7 +359,7 @@
bool Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr msg)
{
if (msg->isPersistent() && store) {
- msg->dequeueAsync(); //increment to async counter -- for message sent
to more than one queue
+ msg->dequeueAsync(this, store); //increment to async counter -- for
message sent to more than one queue
store->dequeue(ctxt, *msg.get(), *this);
return true;
}