Author: gsim
Date: Thu Nov 30 06:48:24 2006
New Revision: 480946

URL: http://svn.apache.org/viewvc?view=rev&rev=480946
Log:
Some further tweaks to MessageStore interface.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
    
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp Thu Nov 
30 06:48:24 2006
@@ -23,12 +23,12 @@
 using namespace qpid::broker;
 using namespace qpid::framing;
 
-LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, u_int64_t 
_msgId, u_int64_t _expectedSize) : 
-    store(_store), msgId(_msgId), expectedSize(_expectedSize) {}
+LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* 
const _msg, u_int64_t _expectedSize) : 
+    store(_store), msg(_msg), expectedSize(_expectedSize) {}
 
 void LazyLoadedContent::add(AMQContentBody::shared_ptr data)
 {
-    store->appendContent(msgId, data->getData());
+    store->appendContent(msg, data->getData());
 }
 
 u_int32_t LazyLoadedContent::size()
@@ -42,12 +42,12 @@
         for (u_int64_t offset = 0; offset < expectedSize; offset += framesize) 
{            
             u_int64_t remaining = expectedSize - offset;
             string data;
-            store->loadContent(msgId, data, offset, remaining > framesize ? 
framesize : remaining);              
+            store->loadContent(msg, data, offset, remaining > framesize ? 
framesize : remaining);              
             out->send(new AMQFrame(channel, new AMQContentBody(data)));
         }
     } else {
         string data;
-        store->loadContent(msgId, data, 0, expectedSize);  
+        store->loadContent(msg, data, 0, expectedSize);  
         out->send(new AMQFrame(channel, new AMQContentBody(data)));
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h Thu Nov 
30 06:48:24 2006
@@ -28,10 +28,10 @@
     namespace broker {
         class LazyLoadedContent : public Content{
             MessageStore* const store;
-            const u_int64_t msgId;
+            Message* const msg;
             const u_int64_t expectedSize;
         public:
-            LazyLoadedContent(MessageStore* const store, u_int64_t msgId, 
u_int64_t expectedSize);
+            LazyLoadedContent(MessageStore* const store, Message* const msg, 
u_int64_t expectedSize);
             void add(qpid::framing::AMQContentBody::shared_ptr data);
             u_int32_t size();
             void send(qpid::framing::OutputHandler* out, int channel, 
u_int32_t framesize);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Thu Nov 30 
06:48:24 2006
@@ -201,7 +201,11 @@
 {
     if (!content.get() || content->size() > 0) {
         //set content to lazy loading mode (but only if there is stored 
content):
-        content = std::auto_ptr<Content>(new LazyLoadedContent(store, 
getPersistenceId(), expectedContentSize()));
+
+        //Note: the LazyLoadedContent instance contains a raw pointer to the 
message, however it is
+        //      then set as a member of that message so its lifetime is 
guaranteed to be no longer than
+        //      that of the message itself
+        content = std::auto_ptr<Content>(new LazyLoadedContent(store, this, 
expectedContentSize()));
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Thu Nov 30 
06:48:24 2006
@@ -54,8 +54,7 @@
     message->setHeader(header);
     if (stagingThreshold && header->getContentSize() >= stagingThreshold) {
         store->stage(message);
-        auto_ptr<Content> content(new LazyLoadedContent(store, 
message->getPersistenceId(), message->expectedContentSize()));
-        message->setContent(content);
+        message->releaseContent(store);
     } else {
         auto_ptr<Content> content(new InMemoryContent());
         message->setContent(content);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Thu Nov 30 
06:48:24 2006
@@ -27,6 +27,16 @@
 
 namespace qpid {
     namespace broker {
+        struct MessageStoreSettings
+        {
+            /**
+             * Messages whose content length is larger than this value
+             * will be staged (i.e. will have their data written to
+             * disk as it arrives) and will load their data lazily. On
+             * recovery therefore, only the headers should be loaded.
+             */
+            u_int64_t stagingThreshold;
+        };
         /**
          * An abstraction of the persistent storage for messages.
          */
@@ -44,7 +54,7 @@
             /**
              * Request recovery of queue and message state from store
              */
-            virtual void recover(RecoveryManager& queues) = 0;
+            virtual void recover(RecoveryManager& queues, const 
MessageStoreSettings* const settings = 0) = 0;
 
             /**
              * Stores a messages before it has been enqueued
@@ -68,17 +78,17 @@
             /**
              * Appends content to a previously staged message
              */
-            virtual void appendContent(u_int64_t msgId, const std::string& 
data) = 0;
+            virtual void appendContent(Message* const msg, const std::string& 
data) = 0;
 
             /**
              * Loads (a section) of content data for the specified
-             * message id (previously set on the message through a
-             * call to stage or enqueue) into data. The offset refers
-             * to the content only (i.e. an offset of 0 implies that
-             * the start of the content should be loaded, not the
-             * headers or related meta-data).
+             * message (previously stored through a call to stage or
+             * enqueue) into data. The offset refers to the content
+             * only (i.e. an offset of 0 implies that the start of the
+             * content should be loaded, not the headers or related
+             * meta-data).
              */
-            virtual void loadContent(u_int64_t msgId, std::string& data, 
u_int64_t offset, u_int32_t length) = 0;
+            virtual void loadContent(Message* const msg, std::string& data, 
u_int64_t offset, u_int32_t length) = 0;
 
             /**
              * Enqueues a message, storing the message if it has not

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Thu 
Nov 30 06:48:24 2006
@@ -38,9 +38,9 @@
     store->destroy(queue);
 }
 
-void MessageStoreModule::recover(RecoveryManager& registry)
+void MessageStoreModule::recover(RecoveryManager& registry, const 
MessageStoreSettings* const settings)
 {
-    store->recover(registry);
+    store->recover(registry, settings);
 }
 
 void MessageStoreModule::stage(Message::shared_ptr& msg)
@@ -53,14 +53,14 @@
     store->destroy(msg);
 }
 
-void MessageStoreModule::appendContent(u_int64_t msgId, const std::string& 
data)
+void MessageStoreModule::appendContent(Message* const msg, const std::string& 
data)
 {
-    store->appendContent(msgId, data);
+    store->appendContent(msg, data);
 }
 
-void MessageStoreModule::loadContent(u_int64_t msgId, string& data, u_int64_t 
offset, u_int32_t length)
+void MessageStoreModule::loadContent(Message* const msg, string& data, 
u_int64_t offset, u_int32_t length)
 {
-    store->loadContent(msgId, data, offset, length);
+    store->loadContent(msg, data, offset, length);
 }
 
 void MessageStoreModule::enqueue(TransactionContext* ctxt, 
Message::shared_ptr& msg, const Queue& queue, const string * const xid)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Thu Nov 
30 06:48:24 2006
@@ -38,11 +38,11 @@
             MessageStoreModule(const std::string& name);
             void create(const Queue& queue);
             void destroy(const Queue& queue);
-            void recover(RecoveryManager& queues);
+            void recover(RecoveryManager& queues, const MessageStoreSettings* 
const settings = 0);
             void stage(Message::shared_ptr& msg);
             void destroy(Message::shared_ptr& msg);
-            void appendContent(u_int64_t msgId, const std::string& data);
-            void loadContent(u_int64_t msgId, std::string& data, u_int64_t 
offset, u_int32_t length);
+            void appendContent(Message* const msg, const std::string& data);
+            void loadContent(Message* const msg, std::string& data, u_int64_t 
offset, u_int32_t length);
             void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, 
const Queue& queue, const string * const xid);
             void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, 
const Queue& queue, const string * const xid);
             void committed(const string * const xid);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Thu Nov 
30 06:48:24 2006
@@ -40,7 +40,7 @@
     if (warn) std::cout << "WARNING: Can't destroy durable queue '" << 
queue.getName() << "'. Persistence not enabled." << std::endl;
 }
 
-void NullMessageStore::recover(RecoveryManager&)
+void NullMessageStore::recover(RecoveryManager&, const MessageStoreSettings* 
const)
 {
     if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of 
queues or messages." << std::endl;
 }
@@ -55,12 +55,12 @@
     if (warn) std::cout << "WARNING: No need to destroy staged message. 
Persistence not enabled." << std::endl;
 }
 
-void NullMessageStore::appendContent(u_int64_t, const string&)
+void NullMessageStore::appendContent(Message* const, const string&)
 {
     if (warn) std::cout << "WARNING: Can't append content. Persistence not 
enabled." << std::endl;
 }
 
-void NullMessageStore::loadContent(u_int64_t, string&, u_int64_t, u_int32_t)
+void NullMessageStore::loadContent(Message* const, string&, u_int64_t, 
u_int32_t)
 {
     if (warn) std::cout << "WARNING: Can't load content. Persistence not 
enabled." << std::endl;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Thu Nov 30 
06:48:24 2006
@@ -37,11 +37,11 @@
             NullMessageStore(bool warn = true);
             virtual void create(const Queue& queue);
             virtual void destroy(const Queue& queue);
-            virtual void recover(RecoveryManager& queues);
+            virtual void recover(RecoveryManager& queues, const 
MessageStoreSettings* const settings = 0);
             virtual void stage(Message::shared_ptr& msg);
             virtual void destroy(Message::shared_ptr& msg);
-            virtual void appendContent(u_int64_t msgId, const std::string& 
data);
-            virtual void loadContent(u_int64_t msgId, std::string& data, 
u_int64_t offset, u_int32_t length);
+            virtual void appendContent(Message* const msg, const std::string& 
data);
+            virtual void loadContent(Message* const msg, std::string& data, 
u_int64_t offset, u_int32_t length);
             virtual void enqueue(TransactionContext* ctxt, 
Message::shared_ptr& msg, const Queue& queue, const string * const xid);
             virtual void dequeue(TransactionContext* ctxt, 
Message::shared_ptr& msg, const Queue& queue, const string * const xid);
             virtual void committed(const string * const xid);

Modified: 
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
--- 
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp 
(original)
+++ 
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp 
Thu Nov 30 06:48:24 2006
@@ -55,7 +55,7 @@
     public:
         TestMessageStore(const string& _content) : content(_content) {}
 
-        void loadContent(u_int64_t, string& data, u_int64_t offset, u_int32_t 
length)
+        void loadContent(Message* const, string& data, u_int64_t offset, 
u_int32_t length)
         {
             if (offset + length <= content.size()) {
                 data = content.substr(offset, length);
@@ -96,7 +96,7 @@
     void load(string& in, size_t outCount, string* out, u_int32_t framesize)
     {
         TestMessageStore store(in);
-        LazyLoadedContent content(&store, 1, in.size());
+        LazyLoadedContent content(&store, 0, in.size());
         DummyHandler handler;
         u_int16_t channel = 3;
         content.send(&handler, channel, framesize);         


Reply via email to