Author: cctrieloff
Date: Wed Aug 15 11:13:02 2007
New Revision: 566289
URL: http://svn.apache.org/viewvc?view=rev&rev=566289
Log:
async IO for broker store
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Wed Aug 15
11:13:02 2007
@@ -56,6 +56,14 @@
Queue::~Queue(){}
+void Queue::notifyDurableIOComplete()
+{
+ // signal SemanticHandler to ack completed dequeues
+ // then dispatch to ack...
+ serializer.execute(dispatchCallback);
+}
+
+
void Queue::deliver(Message::shared_ptr& msg){
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
@@ -63,11 +71,20 @@
alternateExchange->route(deliverable, msg->getRoutingKey(),
&(msg->getApplicationHeaders()));
}
} else {
- enqueue(0, msg);
- process(msg);
+
+
+ // if no store then mark as enqueued
+ if (!enqueue(0, msg)){
+ push(msg);
+ msg->enqueueComplete();
+ }else {
+ push(msg);
+ }
+ serializer.execute(dispatchCallback);
}
}
+
void Queue::recover(Message::shared_ptr& msg){
push(msg);
if (store && msg->expectedContentSize() != msg->encodedContentSize()) {
@@ -127,6 +144,7 @@
void Queue::dispatch(){
+
Message::shared_ptr msg;
while(true){
{
@@ -134,7 +152,7 @@
if (messages.empty()) break;
msg = messages.front();
}
- if( dispatch(msg) ){
+ if( msg->isEnqueueComplete() && dispatch(msg) ){
pop();
}else break;
@@ -215,19 +233,26 @@
return autodelete && consumers.size() == 0;
}
-void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg)
+// returns true if the message was handed to the store (persistent message and a store exists)
+bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg)
{
if (msg->isPersistent() && store) {
store->enqueue(ctxt, *msg.get(), *this);
+ return true;
}
+ return false;
}
-void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg)
+// returns true if the dequeue was handed to the store (persistent message and a store exists)
+bool Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg)
{
if (msg->isPersistent() && store) {
store->dequeue(ctxt, *msg.get(), *this);
+ return true;
}
+ return false;
}
+
namespace
{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h Wed Aug 15
11:13:02 2007
@@ -87,6 +87,12 @@
*/
void dispatch();
+ protected:
+ /**
+ * Call back from store
+ */
+ virtual void notifyDurableIOComplete();
+
public:
typedef boost::shared_ptr<Queue> shared_ptr;
@@ -143,11 +149,11 @@
bool canAutoDelete() const;
- void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg);
+ bool enqueue(TransactionContext* ctxt, Message::shared_ptr& msg);
/**
* dequeue from store (only done once messages is acknowledged)
*/
- void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg);
+ bool dequeue(TransactionContext* ctxt, Message::shared_ptr& msg);
/**
* dequeues from memory only
*/
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Wed Aug 15
11:13:02 2007
@@ -104,7 +104,10 @@
/**
* Enqueues a message, storing the message if it has not
* been previously stored and recording that the given
- * message is on the given queue.
+ * message is on the given queue.
+ *
+ * Note: this is async, so the return of the function does
+ * not mean the operation is complete.
*
* @param msg the message to enqueue
* @param queue the name of the queue onto which it is to be enqueued
@@ -113,18 +116,34 @@
* place or null for 'local' transactions
*/
virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg,
const PersistableQueue& queue) = 0;
+
/**
* Dequeues a message, recording that the given message is
* no longer on the given queue and deleting the message
* if it is no longer on any other queue.
+ *
+ * Note: this is async, so the return of the function does
+ * not mean the operation is complete.
*
* @param msg the message to dequeue
- * @param queue the name of th queue from which it is to be dequeued
+ * @param queue the name of the queue from which it is to be dequeued
* @param xid (a pointer to) an identifier of the
* distributed transaction in which the operation takes
* place or null for 'local' transactions
*/
virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg,
const PersistableQueue& queue) = 0;
+
+
+ /**
+ * Returns the number of outstanding AIOs for a given queue
+ *
+ * If 0, then all the enqueues / dequeues have been stored
+ * to disk
+ *
+ * @param queue the name of the queue to check for outstanding AIO
+ */
+ virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue) = 0;
+
virtual ~MessageStore(){}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Wed
Aug 15 11:13:02 2007
@@ -95,6 +95,11 @@
store->dequeue(ctxt, msg, queue);
}
+u_int32_t MessageStoreModule::outstandingQueueAIO(const PersistableQueue&
queue)
+{
+ return store->outstandingQueueAIO(queue);
+}
+
std::auto_ptr<TransactionContext> MessageStoreModule::begin()
{
return store->begin();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Wed Aug
15 11:13:02 2007
@@ -59,8 +59,10 @@
void destroy(PersistableMessage& msg);
void appendContent(PersistableMessage& msg, const std::string& data);
void loadContent(PersistableMessage& msg, std::string& data, uint64_t
offset, uint32_t length);
+
void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const
PersistableQueue& queue);
void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const
PersistableQueue& queue);
+ u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
~MessageStoreModule(){}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Wed Aug
15 11:13:02 2007
@@ -97,14 +97,21 @@
QPID_LOG(info, "Can't load content. Persistence not enabled.");
}
-void NullMessageStore::enqueue(TransactionContext*, PersistableMessage&, const
PersistableQueue& queue)
+void NullMessageStore::enqueue(TransactionContext*, PersistableMessage& msg,
const PersistableQueue& queue)
{
+ msg.enqueueComplete();
QPID_LOG(info, "Can't enqueue message onto '" << queue.getName() << "'.
Persistence not enabled.");
}
-void NullMessageStore::dequeue(TransactionContext*, PersistableMessage&, const
PersistableQueue& queue)
+void NullMessageStore::dequeue(TransactionContext*, PersistableMessage& msg,
const PersistableQueue& queue)
{
+ msg.dequeueComplete();
QPID_LOG(info, "Can't dequeue message from '" << queue.getName() << "'.
Persistence not enabled.");
+}
+
+u_int32_t NullMessageStore::outstandingQueueAIO(const PersistableQueue& )
+{
+ return 0;
}
std::auto_ptr<TransactionContext> NullMessageStore::begin()
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Wed Aug 15
11:13:02 2007
@@ -62,6 +62,7 @@
virtual void loadContent(PersistableMessage& msg, std::string& data,
uint64_t offset, uint32_t length);
virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg,
const PersistableQueue& queue);
virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg,
const PersistableQueue& queue);
+ virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
~NullMessageStore(){}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Wed Aug
15 11:13:02 2007
@@ -36,6 +36,23 @@
*/
class PersistableMessage : public Persistable
{
+
+
+ /**
+ * Needs to be set false on Message construction, then
+ * set once the broker has taken responsibility for the
+ * message. For transient, once enqueued, for durable, once
+ * stored.
+ */
+ bool enqueueCompleted;
+ /**
+ * Needs to be set false on Message construction, then
+ * set once the dequeue is complete. It gets set:
+ * for transient, once dequeued; for durable, once the
+ * dequeue record has been stored.
+ */
+ bool dequeueCompleted;
+
public:
typedef boost::shared_ptr<PersistableMessage> shared_ptr;
@@ -45,6 +62,15 @@
virtual uint32_t encodedHeaderSize() const = 0;
virtual ~PersistableMessage() {};
+ PersistableMessage():
+ enqueueCompleted(false),
+ dequeueCompleted(false){};
+
+ inline bool isEnqueueComplete() {return enqueueCompleted;};
+ inline void enqueueComplete() {enqueueCompleted = true;};
+ inline bool isDequeueComplete() {return dequeueCompleted;};
+ inline void dequeueComplete() {dequeueCompleted = true;};
+
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableQueue.h Wed Aug 15
11:13:02 2007
@@ -35,8 +35,20 @@
class PersistableQueue : public Persistable
{
public:
+
virtual const std::string& getName() const = 0;
virtual ~PersistableQueue() {};
+
+protected:
+ /**
+ * call back for the store to signal AIO writes have
+ * completed (enqueue/dequeue etc)
+ *
+ * Note: DO NOT do work on this callback; if you block
+ * this callback you will block the store.
+ */
+ virtual void notifyDurableIOComplete() = 0;
+
};
}}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp?view=diff&rev=566289&r1=566288&r2=566289
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp Wed Aug 15
11:13:02 2007
@@ -51,7 +51,14 @@
: ctxt(_ctxt), msg(_msg){}
void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){
- queue->enqueue(ctxt, msg);
+ if (!queue->enqueue(ctxt, msg)){
+ /**
+ * if no store then mark message for ack and delivery once
+ * commit happens, as async IO will never set it when no store
+ * exists
+ */
+ msg->enqueueComplete();
+ }
}
TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){}