Author: gsim
Date: Wed Jun  4 08:46:00 2008
New Revision: 663243

URL: http://svn.apache.org/viewvc?rev=663243&view=rev
Log:
Change to lazy-loading to avoid relying on the content-size being set by the client.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=663243&r1=663242&r2=663243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Wed Jun  4 08:46:00 2008
@@ -41,13 +41,6 @@
 
 Message::~Message()
 {
-    if (staged) {
-        if (store) {
-            store->destroy(*this);
-        } else {
-            QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed");
-        }
-    }
 }
 
 std::string Message::getRoutingKey() const
@@ -178,32 +171,43 @@
     }
 }
 
+void Message::destroy()
+{
+    if (staged) {
+        if (store) {
+            store->destroy(*this);
+        } else {
+            QPID_LOG(error, "Message content was staged but no store is set so it can't be destroyed");
+        }
+    }
+}
+
 void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const
 {
     if (isContentReleased()) {
         //load content from store in chunks of maxContentSize
         uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
-        uint64_t expectedSize(frames.getHeaders()->getContentLength());
         intrusive_ptr<const PersistableMessage> pmsg(this);
-        for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize)
+        
+        bool done = false;
+        for (uint64_t offset = 0; !done; offset += maxContentSize)
         {            
-            uint64_t remaining = expectedSize - offset;
             AMQFrame frame(in_place<AMQContentBody>());
             string& data = frame.castBody<AMQContentBody>()->getData();
 
-            store->loadContent(queue, pmsg, data, offset,
-                               remaining > maxContentSize ? maxContentSize : remaining);
+            store->loadContent(queue, pmsg, data, offset, maxContentSize);
+            done = data.size() < maxContentSize;
             frame.setBof(false);
             frame.setEof(true);
             if (offset > 0) {
                 frame.setBos(false);
             }
-            if (remaining > maxContentSize) {
+            if (!done) {
                 frame.setEos(false);
             }
+            QPID_LOG(debug, "loaded frame for delivery: " << frame);
             out.handle(frame);
         }
-
     } else {
         Count c;
         frames.map_if(c, TypeFilter<CONTENT_BODY>());

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=663243&r1=663242&r2=663243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Wed Jun  4 08:46:00 2008
@@ -119,6 +119,7 @@
      * be reloaded.
      */
     void releaseContent(MessageStore* store);
+    void destroy();
 
     void sendContent(Queue& queue, framing::FrameHandler& out, uint16_t maxFrameSize) const;
     void sendHeader(framing::FrameHandler& out, uint16_t maxFrameSize) const;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=663243&r1=663242&r2=663243&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Jun  4 08:46:00 2008
@@ -370,6 +370,9 @@
         if (cacheExchange->getAlternate()) {
             cacheExchange->getAlternate()->route(strategy, msg->getRoutingKey(), msg->getApplicationHeaders());
         }
+        if (!strategy.delivered) {
+            msg->destroy();
+        }
     }
 
 }


Reply via email to