Author: cctrieloff
Date: Thu Oct 18 18:33:24 2007
New Revision: 586207

URL: http://svn.apache.org/viewvc?rev=586207&view=rev
Log:
- added init for dir and async options for store
- added flush for queue for async processing


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=586207&r1=586206&r2=586207&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Oct 18 
18:33:24 2007
@@ -103,7 +103,8 @@
     exchanges.declare(amq_match, HeadersExchange::typeName);
 
     if(store.get()) {
-        RecoveryManagerImpl recoverer(queues, exchanges, dtxManager, 
+               store->init(conf.storeDir, conf.storeAsync);
+               RecoveryManagerImpl recoverer(queues, exchanges, dtxManager, 
                                       conf.stagingThreshold);
         store->recover(recoverer);
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?rev=586207&r1=586206&r2=586207&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Thu Oct 18 
18:33:24 2007
@@ -37,6 +37,16 @@
  */
 class MessageStore : public TransactionalStore, public Recoverable {
 public:
+
+    /**
+     * Initialize the store. Call this before any other call; if it is
+     * not called, the store is free to pick any defaults.
+     * 
+     * @param dir the directory in which to create logs/db's
+     * @param async true to enable async, false to enable sync
+        */
+       virtual void init(const std::string& dir, const bool async) = 0;
+
     /**
      * Record the existence of a durable queue
      */
@@ -133,6 +143,15 @@
      */
     virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, 
const PersistableQueue& queue) = 0;
 
+    /**
+     * Flushes all async messages to disk for the specified queue
+     *
+     * Note: this is async, so the return of the function does
+     * not mean the operation is complete.
+     * 
+     * @param queue the queue whose async messages are to be flushed
+     */
+    virtual void flush(const qpid::broker::PersistableQueue& queue)=0;
 
    /**
      * Returns the number of outstanding AIO's for a given queue

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?rev=586207&r1=586206&r2=586207&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Thu 
Oct 18 18:33:24 2007
@@ -28,6 +28,11 @@
 {
 }
 
+void MessageStoreModule::init(const std::string& dir, const bool async)
+{
+       store->init(dir, async);
+}
+
 void MessageStoreModule::create(PersistableQueue& queue)
 {
     store->create(queue);
@@ -93,6 +98,11 @@
 void MessageStoreModule::dequeue(TransactionContext* ctxt, PersistableMessage& 
msg, const PersistableQueue& queue)
 {
     store->dequeue(ctxt, msg, queue);
+}
+
+void MessageStoreModule::flush(const qpid::broker::PersistableQueue& queue)
+{
+    store->flush(queue);
 }
 
 u_int32_t MessageStoreModule::outstandingQueueAIO(const PersistableQueue& 
queue)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?rev=586207&r1=586206&r2=586207&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Thu Oct 
18 18:33:24 2007
@@ -38,6 +38,7 @@
 public:
     MessageStoreModule(const std::string& name);
 
+       void init(const std::string& dir, const bool async);
     std::auto_ptr<TransactionContext> begin();
     std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
     void prepare(TPCTransactionContext& txn);
@@ -62,7 +63,8 @@
     void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const 
PersistableQueue& queue);
     void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const 
PersistableQueue& queue);
     u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
-
+    void flush(const qpid::broker::PersistableQueue& queue);
+        
     ~MessageStoreModule(){}
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=586207&r1=586206&r2=586207&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Thu Oct 
18 18:33:24 2007
@@ -49,6 +49,11 @@
 
 NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
 
+void NullMessageStore::init(const std::string& /*dir*/, const bool /*async*/)
+{
+    QPID_LOG(info, "Can't init, store not enabled");
+}
+
 void NullMessageStore::create(PersistableQueue& queue)
 {
     QPID_LOG(info, "Can't create durable queue '" << queue.getName() << "'. 
Persistence not enabled.");
@@ -107,6 +112,11 @@
 {
     msg.dequeueComplete();
     QPID_LOG(info, "Can't dequeue message from '" << queue.getName() << "'. 
Persistence not enabled.");
+}
+
+void NullMessageStore::flush(const qpid::broker::PersistableQueue& queue)
+{
+    QPID_LOG(info, "Can't flush. Persistence not enabled queue-" << 
queue.getName());
 }
 
 u_int32_t NullMessageStore::outstandingQueueAIO(const PersistableQueue& )

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?rev=586207&r1=586206&r2=586207&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Thu Oct 18 
18:33:24 2007
@@ -38,6 +38,7 @@
 public:
     NullMessageStore(bool warn = false);
 
+       virtual void init(const std::string& dir, const bool async);
     virtual std::auto_ptr<TransactionContext> begin();
     virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
     virtual void prepare(TPCTransactionContext& txn);
@@ -62,6 +63,7 @@
     virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, 
const PersistableQueue& queue);
     virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, 
const PersistableQueue& queue);
     virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
+       virtual void flush(const qpid::broker::PersistableQueue& queue);
     ~NullMessageStore(){}
 };
 


Reply via email to