Author: gsim
Date: Thu Nov 30 06:48:24 2006
New Revision: 480946
URL: http://svn.apache.org/viewvc?view=rev&rev=480946
Log:
Some further tweaks to MessageStore interface.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp Thu Nov
30 06:48:24 2006
@@ -23,12 +23,12 @@
using namespace qpid::broker;
using namespace qpid::framing;
-LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, u_int64_t
_msgId, u_int64_t _expectedSize) :
- store(_store), msgId(_msgId), expectedSize(_expectedSize) {}
+LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message*
const _msg, u_int64_t _expectedSize) :
+ store(_store), msg(_msg), expectedSize(_expectedSize) {}
void LazyLoadedContent::add(AMQContentBody::shared_ptr data)
{
- store->appendContent(msgId, data->getData());
+ store->appendContent(msg, data->getData());
}
u_int32_t LazyLoadedContent::size()
@@ -42,12 +42,12 @@
for (u_int64_t offset = 0; offset < expectedSize; offset += framesize)
{
u_int64_t remaining = expectedSize - offset;
string data;
- store->loadContent(msgId, data, offset, remaining > framesize ?
framesize : remaining);
+ store->loadContent(msg, data, offset, remaining > framesize ?
framesize : remaining);
out->send(new AMQFrame(channel, new AMQContentBody(data)));
}
} else {
string data;
- store->loadContent(msgId, data, 0, expectedSize);
+ store->loadContent(msg, data, 0, expectedSize);
out->send(new AMQFrame(channel, new AMQContentBody(data)));
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h Thu Nov
30 06:48:24 2006
@@ -28,10 +28,10 @@
namespace broker {
class LazyLoadedContent : public Content{
MessageStore* const store;
- const u_int64_t msgId;
+ Message* const msg;
const u_int64_t expectedSize;
public:
- LazyLoadedContent(MessageStore* const store, u_int64_t msgId,
u_int64_t expectedSize);
+ LazyLoadedContent(MessageStore* const store, Message* const msg,
u_int64_t expectedSize);
void add(qpid::framing::AMQContentBody::shared_ptr data);
u_int32_t size();
void send(qpid::framing::OutputHandler* out, int channel,
u_int32_t framesize);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Thu Nov 30
06:48:24 2006
@@ -201,7 +201,11 @@
{
if (!content.get() || content->size() > 0) {
//set content to lazy loading mode (but only if there is stored
content):
- content = std::auto_ptr<Content>(new LazyLoadedContent(store,
getPersistenceId(), expectedContentSize()));
+
+ //Note: the LazyLoadedContent instance contains a raw pointer to the
message, however it is
+ // then set as a member of that message so its lifetime is
guaranteed to be no longer than
+ // that of the message itself
+ content = std::auto_ptr<Content>(new LazyLoadedContent(store, this,
expectedContentSize()));
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Thu Nov 30
06:48:24 2006
@@ -54,8 +54,7 @@
message->setHeader(header);
if (stagingThreshold && header->getContentSize() >= stagingThreshold) {
store->stage(message);
- auto_ptr<Content> content(new LazyLoadedContent(store,
message->getPersistenceId(), message->expectedContentSize()));
- message->setContent(content);
+ message->releaseContent(store);
} else {
auto_ptr<Content> content(new InMemoryContent());
message->setContent(content);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Thu Nov 30
06:48:24 2006
@@ -27,6 +27,16 @@
namespace qpid {
namespace broker {
+ struct MessageStoreSettings
+ {
+ /**
+ * Messages whose content length is larger than this value
+ * will be staged (i.e. will have their data written to
+ * disk as it arrives) and will load their data lazily. On
+ * recovery therefore, only the headers should be loaded.
+ */
+ u_int64_t stagingThreshold;
+ };
/**
* An abstraction of the persistent storage for messages.
*/
@@ -44,7 +54,7 @@
/**
* Request recovery of queue and message state from store
*/
- virtual void recover(RecoveryManager& queues) = 0;
+ virtual void recover(RecoveryManager& queues, const
MessageStoreSettings* const settings = 0) = 0;
/**
* Stores a message before it has been enqueued
@@ -68,17 +78,17 @@
/**
* Appends content to a previously staged message
*/
- virtual void appendContent(u_int64_t msgId, const std::string&
data) = 0;
+ virtual void appendContent(Message* const msg, const std::string&
data) = 0;
/**
* Loads (a section of) content data for the specified
- * message id (previously set on the message through a
- * call to stage or enqueue) into data. The offset refers
- * to the content only (i.e. an offset of 0 implies that
- * the start of the content should be loaded, not the
- * headers or related meta-data).
+ * message (previously stored through a call to stage or
+ * enqueue) into data. The offset refers to the content
+ * only (i.e. an offset of 0 implies that the start of the
+ * content should be loaded, not the headers or related
+ * meta-data).
*/
- virtual void loadContent(u_int64_t msgId, std::string& data,
u_int64_t offset, u_int32_t length) = 0;
+ virtual void loadContent(Message* const msg, std::string& data,
u_int64_t offset, u_int32_t length) = 0;
/**
* Enqueues a message, storing the message if it has not
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Thu
Nov 30 06:48:24 2006
@@ -38,9 +38,9 @@
store->destroy(queue);
}
-void MessageStoreModule::recover(RecoveryManager& registry)
+void MessageStoreModule::recover(RecoveryManager& registry, const
MessageStoreSettings* const settings)
{
- store->recover(registry);
+ store->recover(registry, settings);
}
void MessageStoreModule::stage(Message::shared_ptr& msg)
@@ -53,14 +53,14 @@
store->destroy(msg);
}
-void MessageStoreModule::appendContent(u_int64_t msgId, const std::string&
data)
+void MessageStoreModule::appendContent(Message* const msg, const std::string&
data)
{
- store->appendContent(msgId, data);
+ store->appendContent(msg, data);
}
-void MessageStoreModule::loadContent(u_int64_t msgId, string& data, u_int64_t
offset, u_int32_t length)
+void MessageStoreModule::loadContent(Message* const msg, string& data,
u_int64_t offset, u_int32_t length)
{
- store->loadContent(msgId, data, offset, length);
+ store->loadContent(msg, data, offset, length);
}
void MessageStoreModule::enqueue(TransactionContext* ctxt,
Message::shared_ptr& msg, const Queue& queue, const string * const xid)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Thu Nov
30 06:48:24 2006
@@ -38,11 +38,11 @@
MessageStoreModule(const std::string& name);
void create(const Queue& queue);
void destroy(const Queue& queue);
- void recover(RecoveryManager& queues);
+ void recover(RecoveryManager& queues, const MessageStoreSettings*
const settings = 0);
void stage(Message::shared_ptr& msg);
void destroy(Message::shared_ptr& msg);
- void appendContent(u_int64_t msgId, const std::string& data);
- void loadContent(u_int64_t msgId, std::string& data, u_int64_t
offset, u_int32_t length);
+ void appendContent(Message* const msg, const std::string& data);
+ void loadContent(Message* const msg, std::string& data, u_int64_t
offset, u_int32_t length);
void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg,
const Queue& queue, const string * const xid);
void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg,
const Queue& queue, const string * const xid);
void committed(const string * const xid);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Thu Nov
30 06:48:24 2006
@@ -40,7 +40,7 @@
if (warn) std::cout << "WARNING: Can't destroy durable queue '" <<
queue.getName() << "'. Persistence not enabled." << std::endl;
}
-void NullMessageStore::recover(RecoveryManager&)
+void NullMessageStore::recover(RecoveryManager&, const MessageStoreSettings*
const)
{
if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of
queues or messages." << std::endl;
}
@@ -55,12 +55,12 @@
if (warn) std::cout << "WARNING: No need to destroy staged message.
Persistence not enabled." << std::endl;
}
-void NullMessageStore::appendContent(u_int64_t, const string&)
+void NullMessageStore::appendContent(Message* const, const string&)
{
if (warn) std::cout << "WARNING: Can't append content. Persistence not
enabled." << std::endl;
}
-void NullMessageStore::loadContent(u_int64_t, string&, u_int64_t, u_int32_t)
+void NullMessageStore::loadContent(Message* const, string&, u_int64_t,
u_int32_t)
{
if (warn) std::cout << "WARNING: Can't load content. Persistence not
enabled." << std::endl;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Thu Nov 30
06:48:24 2006
@@ -37,11 +37,11 @@
NullMessageStore(bool warn = true);
virtual void create(const Queue& queue);
virtual void destroy(const Queue& queue);
- virtual void recover(RecoveryManager& queues);
+ virtual void recover(RecoveryManager& queues, const
MessageStoreSettings* const settings = 0);
virtual void stage(Message::shared_ptr& msg);
virtual void destroy(Message::shared_ptr& msg);
- virtual void appendContent(u_int64_t msgId, const std::string&
data);
- virtual void loadContent(u_int64_t msgId, std::string& data,
u_int64_t offset, u_int32_t length);
+ virtual void appendContent(Message* const msg, const std::string&
data);
+ virtual void loadContent(Message* const msg, std::string& data,
u_int64_t offset, u_int32_t length);
virtual void enqueue(TransactionContext* ctxt,
Message::shared_ptr& msg, const Queue& queue, const string * const xid);
virtual void dequeue(TransactionContext* ctxt,
Message::shared_ptr& msg, const Queue& queue, const string * const xid);
virtual void committed(const string * const xid);
Modified:
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp?view=diff&rev=480946&r1=480945&r2=480946
==============================================================================
---
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp
(original)
+++
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp
Thu Nov 30 06:48:24 2006
@@ -55,7 +55,7 @@
public:
TestMessageStore(const string& _content) : content(_content) {}
- void loadContent(u_int64_t, string& data, u_int64_t offset, u_int32_t
length)
+ void loadContent(Message* const, string& data, u_int64_t offset,
u_int32_t length)
{
if (offset + length <= content.size()) {
data = content.substr(offset, length);
@@ -96,7 +96,7 @@
void load(string& in, size_t outCount, string* out, u_int32_t framesize)
{
TestMessageStore store(in);
- LazyLoadedContent content(&store, 1, in.size());
+ LazyLoadedContent content(&store, 0, in.size());
DummyHandler handler;
u_int16_t channel = 3;
content.send(&handler, channel, framesize);