Author: cctrieloff
Date: Thu Oct 18 18:33:24 2007
New Revision: 586207
URL: http://svn.apache.org/viewvc?rev=586207&view=rev
Log:
- added init for dir and async options for store
- added flush for queue for async processing
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=586207&r1=586206&r2=586207&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Oct 18
18:33:24 2007
@@ -103,7 +103,8 @@
exchanges.declare(amq_match, HeadersExchange::typeName);
if(store.get()) {
- RecoveryManagerImpl recoverer(queues, exchanges, dtxManager,
+ store->init(conf.storeDir, conf.storeAsync);
+ RecoveryManagerImpl recoverer(queues, exchanges, dtxManager,
conf.stagingThreshold);
store->recover(recoverer);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?rev=586207&r1=586206&r2=586207&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Thu Oct 18
18:33:24 2007
@@ -37,6 +37,16 @@
*/
class MessageStore : public TransactionalStore, public Recoverable {
public:
+
+ /**
+ * Initialise the store. Must be called before any other call; if it
+ * is not called, the store is free to pick any defaults.
+ *
+ * @param dir the directory in which to create logs/db's
+ * @param async true to enable async mode, false to enable sync mode
+ */
+ virtual void init(const std::string& dir, const bool async) = 0;
+
/**
* Record the existence of a durable queue
*/
@@ -133,6 +143,15 @@
*/
virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg,
const PersistableQueue& queue) = 0;
+ /**
+ * Flushes all async messages to disk for the specified queue
+ *
+ * Note: this call is async, so the return of the function does
+ * not mean the operation is complete.
+ *
+ * @param queue the queue whose async messages are to be flushed to disk
+ */
+ virtual void flush(const qpid::broker::PersistableQueue& queue)=0;
/**
* Returns the number of outstanding AIO's for a given queue
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?rev=586207&r1=586206&r2=586207&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Thu
Oct 18 18:33:24 2007
@@ -28,6 +28,11 @@
{
}
+void MessageStoreModule::init(const std::string& dir, const bool async)
+{
+ store->init(dir, async);
+}
+
void MessageStoreModule::create(PersistableQueue& queue)
{
store->create(queue);
@@ -93,6 +98,11 @@
void MessageStoreModule::dequeue(TransactionContext* ctxt, PersistableMessage&
msg, const PersistableQueue& queue)
{
store->dequeue(ctxt, msg, queue);
+}
+
+void MessageStoreModule::flush(const qpid::broker::PersistableQueue& queue)
+{
+ store->flush(queue);
}
u_int32_t MessageStoreModule::outstandingQueueAIO(const PersistableQueue&
queue)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?rev=586207&r1=586206&r2=586207&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Thu Oct
18 18:33:24 2007
@@ -38,6 +38,7 @@
public:
MessageStoreModule(const std::string& name);
+ void init(const std::string& dir, const bool async);
std::auto_ptr<TransactionContext> begin();
std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
void prepare(TPCTransactionContext& txn);
@@ -62,7 +63,8 @@
void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const
PersistableQueue& queue);
void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const
PersistableQueue& queue);
u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
-
+ void flush(const qpid::broker::PersistableQueue& queue);
+
~MessageStoreModule(){}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=586207&r1=586206&r2=586207&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Thu Oct
18 18:33:24 2007
@@ -49,6 +49,11 @@
NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
+void NullMessageStore::init(const std::string& /*dir*/, const bool /*async*/)
+{
+ QPID_LOG(info, "Can't init, store not enabled");
+}
+
void NullMessageStore::create(PersistableQueue& queue)
{
QPID_LOG(info, "Can't create durable queue '" << queue.getName() << "'.
Persistence not enabled.");
@@ -107,6 +112,11 @@
{
msg.dequeueComplete();
QPID_LOG(info, "Can't dequeue message from '" << queue.getName() << "'.
Persistence not enabled.");
+}
+
+void NullMessageStore::flush(const qpid::broker::PersistableQueue& queue)
+{
+ QPID_LOG(info, "Can't flush. Persistence not enabled queue-" <<
queue.getName());
}
u_int32_t NullMessageStore::outstandingQueueAIO(const PersistableQueue& )
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?rev=586207&r1=586206&r2=586207&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Thu Oct 18
18:33:24 2007
@@ -38,6 +38,7 @@
public:
NullMessageStore(bool warn = false);
+ virtual void init(const std::string& dir, const bool async);
virtual std::auto_ptr<TransactionContext> begin();
virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
virtual void prepare(TPCTransactionContext& txn);
@@ -62,6 +63,7 @@
virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg,
const PersistableQueue& queue);
virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg,
const PersistableQueue& queue);
virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
+ virtual void flush(const qpid::broker::PersistableQueue& queue);
~NullMessageStore(){}
};