Author: gsim
Date: Wed Jun 4 08:46:00 2008
New Revision: 663243
URL: http://svn.apache.org/viewvc?rev=663243&view=rev
Log:
Change to lazy-loading to avoid relying on the content-size being set by the client.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=663243&r1=663242&r2=663243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Wed Jun 4 08:46:00 2008
@@ -41,13 +41,6 @@
Message::~Message()
{
- if (staged) {
- if (store) {
- store->destroy(*this);
- } else {
- QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed");
- }
- }
}
std::string Message::getRoutingKey() const
@@ -178,32 +171,43 @@
}
}
+void Message::destroy()
+{
+ if (staged) {
+ if (store) {
+ store->destroy(*this);
+ } else {
+ QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed");
+ }
+ }
+}
+
void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
{
if (isContentReleased()) {
//load content from store in chunks of maxContentSize
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
- uint64_t expectedSize(frames.getHeaders()->getContentLength());
intrusive_ptr<const PersistableMessage> pmsg(this);
- for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize)
+
+ bool done = false;
+ for (uint64_t offset = 0; !done; offset += maxContentSize)
{
- uint64_t remaining = expectedSize - offset;
AMQFrame frame(in_place<AMQContentBody>());
string& data = frame.castBody<AMQContentBody>()->getData();
- store->loadContent(queue, pmsg, data, offset,
- remaining > maxContentSize ? maxContentSize : remaining);
+ store->loadContent(queue, pmsg, data, offset, maxContentSize);
+ done = data.size() < maxContentSize;
frame.setBof(false);
frame.setEof(true);
if (offset > 0) {
frame.setBos(false);
}
- if (remaining > maxContentSize) {
+ if (!done) {
frame.setEos(false);
}
+ QPID_LOG(debug, "loaded frame for delivery: " << frame);
out.handle(frame);
}
-
} else {
Count c;
frames.map_if(c, TypeFilter<CONTENT_BODY>());
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=663243&r1=663242&r2=663243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Wed Jun 4 08:46:00 2008
@@ -119,6 +119,7 @@
* be reloaded.
*/
void releaseContent(MessageStore* store);
+ void destroy();
void sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const;
void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize) const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=663243&r1=663242&r2=663243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Jun 4 08:46:00 2008
@@ -370,6 +370,9 @@
if (cacheExchange->getAlternate()) {
cacheExchange->getAlternate()->route(strategy,
msg->getRoutingKey(), msg->getApplicationHeaders());
}
+ if (!strategy.delivered) {
+ msg->destroy();
+ }
}
}