Author: gsim
Date: Fri Nov 24 09:21:47 2006
New Revision: 478923

URL: http://svn.apache.org/viewvc?view=rev&rev=478923
Log:
Initial sketching out of staging functionality for large messages (i.e. 
allowing content to be stored as it arrives, rather than collecting it in 
memory until complete).


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp?view=diff&rev=478923&r1=478922&r2=478923
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Channel.cpp Fri Nov 24 
09:21:47 2006
@@ -167,6 +167,8 @@
 
 void Channel::handleHeader(AMQHeaderBody::shared_ptr header){
     messageBuilder.setHeader(header);
+    //at this point, decide based on the size of the message whether we want
+    //to stage it by saving content directly to disk as it arrives
 }
 
 void Channel::handleContent(AMQContentBody::shared_ptr content){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?view=diff&rev=478923&r1=478922&r2=478923
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Fri Nov 24 
09:21:47 2006
@@ -41,23 +41,11 @@
                                                      persistenceId(0) {}
 
 Message::Message(Buffer& buffer) : publisher(0), mandatory(false), 
immediate(false), redelivered(false), size(0), persistenceId(0){
-    buffer.getShortString(exchange);
-    buffer.getShortString(routingKey);
-    
-    AMQFrame headerFrame;
-    headerFrame.decode(buffer);
-    AMQHeaderBody::shared_ptr headerBody = dynamic_pointer_cast<AMQHeaderBody, 
AMQBody>(headerFrame.getBody());
-    setHeader(headerBody);
-    
-    AMQContentBody::shared_ptr contentBody;
-    while (buffer.available()) {
-        AMQFrame contentFrame;
-        contentFrame.decode(buffer);
-        contentBody = dynamic_pointer_cast<AMQContentBody, 
AMQBody>(contentFrame.getBody());
-        addContent(contentBody);
-    }        
+    decode(buffer);
 }
 
+Message::Message() : publisher(0), mandatory(false), immediate(false), 
redelivered(false), size(0), persistenceId(0){}
+
 Message::~Message(){}
 
 void Message::setHeader(AMQHeaderBody::shared_ptr _header){
@@ -83,7 +71,6 @@
        // AMQP version change - kpvdr 2006-11-17
        // TODO: Make this class version-aware and link these hard-wired 
numbers to that version
     out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0), 
consumerTag, deliveryTag, redelivered, exchange, routingKey)));
-//    out->send(new AMQFrame(channel, new 
BasicDeliverBody(ProtocolVersion(8,0), consumerTag, deliveryTag, redelivered, 
exchange, routingKey)));
     sendContent(out, channel, framesize);
 }
 
@@ -128,18 +115,54 @@
     return props && props->getDeliveryMode() == PERSISTENT;
 }
 
-void Message::encode(Buffer& buffer)
+void Message::decode(Buffer& buffer)
 {
-    buffer.putShortString(exchange);
-    buffer.putShortString(routingKey);
+    decodeHeader(buffer);
+    decodeContent(buffer);
+}
+
+void Message::decodeHeader(Buffer& buffer)
+{
+    buffer.getShortString(exchange);
+    buffer.getShortString(routingKey);
     
-    AMQBody::shared_ptr body;
+    u_int32_t headerSize = buffer.getLong();
+    AMQHeaderBody::shared_ptr headerBody(new AMQHeaderBody());
+    headerBody->decode(buffer, headerSize);
+    setHeader(headerBody);
+}
 
-    body = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
+void Message::decodeContent(Buffer& buffer)
+{    
+    AMQContentBody::shared_ptr contentBody;
+    while (buffer.available()) {
+        AMQFrame contentFrame;
+        contentFrame.decode(buffer);
+        contentBody = dynamic_pointer_cast<AMQContentBody, 
AMQBody>(contentFrame.getBody());
+        addContent(contentBody);
+    }        
+}
 
-    AMQFrame headerFrame(0, body);
-    headerFrame.encode(buffer);
-    
+void Message::encode(Buffer& buffer)
+{
+    encodeHeader(buffer);
+    encodeContent(buffer);
+}
+
+void Message::encodeHeader(Buffer& buffer)
+{
+    buffer.putShortString(exchange);
+    buffer.putShortString(routingKey);    
+    buffer.putLong(header->size());
+    header->encode(buffer);
+}
+
+void Message::encodeContent(Buffer& buffer)
+{
+    //Use a frame around each content block. Not really required but
+    //gives some error checking at little expense. Could change in the
+    //future...
+    AMQBody::shared_ptr body;
     for (content_iterator i = content.begin(); i != content.end(); i++) {
         body = static_pointer_cast<AMQBody, AMQContentBody>(*i);
         AMQFrame contentFrame(0, body);
@@ -149,13 +172,31 @@
 
 u_int32_t Message::encodedSize()
 {
+    return  encodedHeaderSize() + encodedContentSize();
+}
+
+u_int32_t Message::encodedContentSize()
+{
     int encodedContentSize(0);
     for (content_iterator i = content.begin(); i != content.end(); i++) {
-        encodedContentSize += (*i)->size() + 8;//8 extra bytes for the frame 
(TODO, could replace frame by simple size)
+        encodedContentSize += (*i)->size() + 8;//8 extra bytes for the frame
     }
+    return encodedContentSize;
+}
 
+u_int32_t Message::encodedHeaderSize()
+{
     return exchange.size() + 1
         + routingKey.size() + 1
-        + header->size() + 8 //8 extra bytes for frame (TODO, could actually 
remove the frame)
-        + encodedContentSize;
+        + header->size() + 4;//4 extra bytes for size
+}
+
+u_int64_t Message::expectedContentSize()
+{
+    return header.get() ? header->getContentSize() : 0;
+}
+
+void Message::releaseContent()
+{
+    content.clear();
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?view=diff&rev=478923&r1=478922&r2=478923
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Fri Nov 24 09:21:47 
2006
@@ -63,6 +63,7 @@
                     const string& exchange, const string& routingKey, 
                     bool mandatory, bool immediate);
             Message(qpid::framing::Buffer& buffer);
+            Message();
             ~Message();
             void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
             void addContent(qpid::framing::AMQContentBody::shared_ptr data);
@@ -88,12 +89,39 @@
             u_int64_t contentSize() const { return size; }
             u_int64_t getPersistenceId() const { return persistenceId; }
             void setPersistenceId(u_int64_t _persistenceId) { persistenceId = 
_persistenceId; }
+
+            void decode(qpid::framing::Buffer& buffer);
+            void decodeHeader(qpid::framing::Buffer& buffer);
+            void decodeContent(qpid::framing::Buffer& buffer);
+
             void encode(qpid::framing::Buffer& buffer);
+            void encodeHeader(qpid::framing::Buffer& buffer);
+            void encodeContent(qpid::framing::Buffer& buffer);
             /**
-             * @returns the size of the buffer needed to encode this message
+             * @returns the size of the buffer needed to encode this
+             * message in its entirety
              */
             u_int32_t encodedSize();
-            
+            /**
+             * @returns the size of the buffer needed to encode the
+             * 'header' of this message (not just the header frame,
+             * but other meta data e.g.routing key and exchange)
+             */
+            u_int32_t encodedHeaderSize();
+            /**
+             * @returns the size of the buffer needed to encode the
+             * (possibly partial) content held by this message
+             */
+            u_int32_t encodedContentSize();
+            /**
+             * Releases the in-memory content data held by this message.
+             */
+            void releaseContent();
+            /**
+             * If headers have been received, returns the expected
+             * content size else returns 0.
+             */
+            u_int64_t expectedContentSize();
         };
 
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?view=diff&rev=478923&r1=478922&r2=478923
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Fri Nov 24 
09:21:47 2006
@@ -23,12 +23,22 @@
 using namespace qpid::broker;
 using namespace qpid::framing;
 
-MessageBuilder::MessageBuilder(CompletionHandler* _handler) : 
handler(_handler) {}
+MessageBuilder::MessageBuilder(CompletionHandler* _handler, MessageStore* 
const _store, u_int64_t _stagingThreshold) : 
+    handler(_handler),
+    store(_store),
+    stagingThreshold(_stagingThreshold),
+    staging(false)
+{}
 
 void MessageBuilder::route(){
-    if(message->isComplete()){
-        if(handler) handler->complete(message);
+    if (staging && store) {
+        store->stage(message);
+        message->releaseContent();
+    }
+    if (message->isComplete()) {
+        if (handler) handler->complete(message);
         message.reset();
+        staging = false;
     }
 }
 
@@ -44,6 +54,7 @@
         THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got 
header before publish.");
     }
     message->setHeader(header);
+    staging = stagingThreshold && header->getContentSize() >= stagingThreshold;
     route();
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h?view=diff&rev=478923&r1=478922&r2=478923
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h Fri Nov 24 
09:21:47 2006
@@ -24,6 +24,7 @@
 #include <qpid/QpidError.h>
 #include <qpid/broker/Exchange.h>
 #include <qpid/broker/Message.h>
+#include <qpid/broker/MessageStore.h>
 #include <qpid/framing/AMQContentBody.h>
 #include <qpid/framing/AMQHeaderBody.h>
 #include <qpid/framing/BasicPublishBody.h>
@@ -37,13 +38,16 @@
                 virtual void complete(Message::shared_ptr&) = 0;
                 virtual ~CompletionHandler(){}
             };
-            MessageBuilder(CompletionHandler* _handler);
+            MessageBuilder(CompletionHandler* _handler, MessageStore* const 
store = 0, u_int64_t stagingThreshold = 0);
             void initialise(Message::shared_ptr& msg);
             void setHeader(qpid::framing::AMQHeaderBody::shared_ptr& header);
             void addContent(qpid::framing::AMQContentBody::shared_ptr& 
content);
         private:
             Message::shared_ptr message;
             CompletionHandler* handler;
+            MessageStore* const store;
+            const u_int64_t stagingThreshold;
+            bool staging;
 
             void route();
         };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?view=diff&rev=478923&r1=478922&r2=478923
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Fri Nov 24 
09:21:47 2006
@@ -47,6 +47,25 @@
             virtual void recover(RecoveryManager& queues) = 0;
 
             /**
+             * Stores a message before it has been enqueued
+             * (enqueueing automatically stores the message so this is
+             * only required if storage is required prior to that
+             * point). If the message has not yet been stored it will
+             * store the headers and any available content. If the
+             * message has already been stored it will append any
+             * currently held content.
+             */
+            virtual void stage(Message::shared_ptr& msg) = 0;
+            
+            /**
+             * Destroys a previously staged message. This only needs
+             * to be called if the message is never enqueued. (Once
+             * enqueued, deletion will be automatic when the message
+             * is dequeued from all queues it was enqueued onto).
+             */
+            virtual void destroy(Message::shared_ptr& msg) = 0;
+
+            /**
              * Enqueues a message, storing the message if it has not
              * been previously stored and recording that the given
              * message is on the given queue.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?view=diff&rev=478923&r1=478922&r2=478923
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Fri 
Nov 24 09:21:47 2006
@@ -43,6 +43,16 @@
     store->recover(registry);
 }
 
+void MessageStoreModule::stage(Message::shared_ptr& msg)
+{
+    store->stage(msg);
+}
+
+void MessageStoreModule::destroy(Message::shared_ptr& msg)
+{
+    store->destroy(msg);
+}
+
 void MessageStoreModule::enqueue(TransactionContext* ctxt, 
Message::shared_ptr& msg, const Queue& queue, const string * const xid)
 {
     store->enqueue(ctxt, msg, queue, xid);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?view=diff&rev=478923&r1=478922&r2=478923
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Fri Nov 
24 09:21:47 2006
@@ -39,6 +39,8 @@
             void create(const Queue& queue);
             void destroy(const Queue& queue);
             void recover(RecoveryManager& queues);
+            void stage(Message::shared_ptr& msg);
+            void destroy(Message::shared_ptr& msg);
             void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, 
const Queue& queue, const string * const xid);
             void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, 
const Queue& queue, const string * const xid);
             void committed(const string * const xid);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?view=diff&rev=478923&r1=478922&r2=478923
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Fri Nov 
24 09:21:47 2006
@@ -28,31 +28,51 @@
 
 using namespace qpid::broker;
 
-void NullMessageStore::create(const Queue& queue){
-    std::cout << "WARNING: Can't create durable queue '" << queue.getName() << 
"'. Persistence not enabled." << std::endl;
+NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
+
+void NullMessageStore::create(const Queue& queue)
+{
+    if (warn) std::cout << "WARNING: Can't create durable queue '" << 
queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+void NullMessageStore::destroy(const Queue& queue)
+{
+    if (warn) std::cout << "WARNING: Can't destroy durable queue '" << 
queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+void NullMessageStore::recover(RecoveryManager&)
+{
+    if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of 
queues or messages." << std::endl;
+}
+void NullMessageStore::stage(Message::shared_ptr&)
+{
+    if (warn) std::cout << "WARNING: Can't stage message. Persistence not 
enabled." << std::endl;
+}
+void NullMessageStore::destroy(Message::shared_ptr&)
+{
+    if (warn) std::cout << "WARNING: No need to destroy staged message. 
Persistence not enabled." << std::endl;
+}
+void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&, 
const Queue& queue, const string * const)
+{
+    if (warn) std::cout << "WARNING: Can't enqueue message onto '" << 
queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+void NullMessageStore::dequeue(TransactionContext*, Message::shared_ptr&, 
const Queue& queue, const string * const)
+{
+    if (warn) std::cout << "WARNING: Can't dequeue message from '" << 
queue.getName() << "'. Persistence not enabled." << std::endl;
+}
+void NullMessageStore::committed(const string * const)
+{
+    if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
+}
+void NullMessageStore::aborted(const string * const)
+{
+    if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
 }
-void NullMessageStore::destroy(const Queue& queue){
-    std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() 
<< "'. Persistence not enabled." << std::endl;
-}
-void NullMessageStore::recover(RecoveryManager&){
-    std::cout << "WARNING: Persistence not enabled, no recovery of queues or 
messages." << std::endl;
-}
-void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&, 
const Queue& queue, const string * const){
-    std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << 
"'. Persistence not enabled." << std::endl;
-}
-void NullMessageStore::dequeue(TransactionContext*, Message::shared_ptr&, 
const Queue& queue, const string * const){
-    std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << 
"'. Persistence not enabled." << std::endl;
-}
-void NullMessageStore::committed(const string * const){
-    std::cout << "WARNING: Persistence not enabled." << std::endl;
-}
-void NullMessageStore::aborted(const string * const){
-    std::cout << "WARNING: Persistence not enabled." << std::endl;
-}
-std::auto_ptr<TransactionContext> NullMessageStore::begin(){
+std::auto_ptr<TransactionContext> NullMessageStore::begin()
+{
     return std::auto_ptr<TransactionContext>();
 }
-void NullMessageStore::commit(TransactionContext*){
+void NullMessageStore::commit(TransactionContext*)
+{
 }
-void NullMessageStore::abort(TransactionContext*){
+void NullMessageStore::abort(TransactionContext*)
+{
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?view=diff&rev=478923&r1=478922&r2=478923
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Fri Nov 24 
09:21:47 2006
@@ -32,17 +32,21 @@
          * A null implementation of the MessageStore interface
          */
         class NullMessageStore : public MessageStore{
+            const bool warn;
         public:
-            void create(const Queue& queue);
-            void destroy(const Queue& queue);
-            void recover(RecoveryManager& queues);
-            void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, 
const Queue& queue, const string * const xid);
-            void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, 
const Queue& queue, const string * const xid);
-            void committed(const string * const xid);
-            void aborted(const string * const xid);
-            std::auto_ptr<TransactionContext> begin();
-            void commit(TransactionContext* ctxt);
-            void abort(TransactionContext* ctxt);
+            NullMessageStore(bool warn = true);
+            void virtual create(const Queue& queue);
+            void virtual destroy(const Queue& queue);
+            void virtual recover(RecoveryManager& queues);
+            void virtual stage(Message::shared_ptr& msg);
+            void virtual destroy(Message::shared_ptr& msg);
+            void virtual enqueue(TransactionContext* ctxt, 
Message::shared_ptr& msg, const Queue& queue, const string * const xid);
+            void virtual dequeue(TransactionContext* ctxt, 
Message::shared_ptr& msg, const Queue& queue, const string * const xid);
+            void virtual committed(const string * const xid);
+            void virtual aborted(const string * const xid);
+            virtual std::auto_ptr<TransactionContext> begin();
+            void virtual commit(TransactionContext* ctxt);
+            void virtual abort(TransactionContext* ctxt);
             ~NullMessageStore(){}
         };
     }

Modified: 
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp?view=diff&rev=478923&r1=478922&r2=478923
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp 
Fri Nov 24 09:21:47 2006
@@ -18,8 +18,11 @@
  * under the License.
  *
  */
+#include <qpid/Exception.h>
 #include <qpid/broker/Message.h>
 #include <qpid/broker/MessageBuilder.h>
+#include <qpid/broker/NullMessageStore.h>
+#include <qpid/framing/Buffer.h>
 #include <qpid_test_plugin.h>
 #include <iostream>
 #include <memory>
@@ -39,11 +42,58 @@
         }
     };
 
+    class TestMessageStore : public NullMessageStore
+    {
+        Buffer* header;
+        Buffer* content;
+        const u_int32_t contentBufferSize;
+        
+    public:
+
+        void stage(Message::shared_ptr& msg)
+        {
+            if (msg->getPersistenceId() == 0) {
+                header = new Buffer(msg->encodedHeaderSize());
+                msg->encodeHeader(*header);                
+                content = new Buffer(contentBufferSize);
+                msg->encodeContent(*content);
+            } else if (!header || !content) {
+                throw qpid::Exception("Buffers not initialised!");
+            } else {
+                msg->encodeContent(*content);
+            }
+            msg->setPersistenceId(1);
+        }
+
+        Message::shared_ptr getRestoredMessage()
+        {
+            Message::shared_ptr msg(new Message());
+            if (header) {
+                header->flip();
+                msg->decodeHeader(*header);
+                delete header;
+                header = 0; 
+                if (content) {
+                    content->flip();
+                    msg->decodeContent(*content);
+                    delete content;
+                    content = 0;
+                }
+            }
+            return msg;
+        }
+        
+        //don't care about any of the other methods:
+        TestMessageStore(u_int32_t _contentBufferSize) : 
NullMessageStore(false), header(0), content(0), 
+                                                         
contentBufferSize(_contentBufferSize) {}
+        ~TestMessageStore(){}
+    };
 
     CPPUNIT_TEST_SUITE(MessageBuilderTest);
     CPPUNIT_TEST(testHeaderOnly);
     CPPUNIT_TEST(test1ContentFrame);
     CPPUNIT_TEST(test2ContentFrames);
+    CPPUNIT_TEST(testStaging);
     CPPUNIT_TEST_SUITE_END();
 
   public:
@@ -105,6 +155,40 @@
         builder.addContent(part2);
         CPPUNIT_ASSERT(handler.msg);
         CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+    }
+
+    void testStaging(){
+        DummyHandler handler;
+        TestMessageStore store(50);//more than enough for two frames of 14 
bytes
+        MessageBuilder builder(&handler, &store, 5);
+
+        string data1("abcdefg");
+        string data2("hijklmn");
+
+        Message::shared_ptr message(new Message(0, "test", "my_routing_key", 
false, false));
+        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+        header->setContentSize(14);
+        BasicHeaderProperties* properties = 
dynamic_cast<BasicHeaderProperties*>(header->getProperties());
+        properties->setMessageId("MyMessage");
+        properties->getHeaders().setString("abc", "xyz");
+
+        AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
+        AMQContentBody::shared_ptr part2(new AMQContentBody(data2));        
+        
+        builder.initialise(message);
+        builder.setHeader(header);
+        builder.addContent(part1);
+        builder.addContent(part2);
+        CPPUNIT_ASSERT(handler.msg);
+        CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+
+        Message::shared_ptr restored = store.getRestoredMessage();
+        CPPUNIT_ASSERT_EQUAL(message->getExchange(), restored->getExchange());
+        CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(), 
restored->getRoutingKey());
+        CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(), 
restored->getHeaderProperties()->getMessageId());
+        
CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getHeaders().getString("abc"),
 
+                             
restored->getHeaderProperties()->getHeaders().getString("abc"));
+        CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, restored->contentSize());
     }
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp?view=diff&rev=478923&r1=478922&r2=478923
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxAckTest.cpp Fri Nov 
24 09:21:47 2006
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-#include <qpid/broker/MessageStore.h>
+#include <qpid/broker/NullMessageStore.h>
 #include <qpid/broker/RecoveryManager.h>
 #include <qpid/broker/TxAck.h>
 #include <qpid_test_plugin.h>
@@ -34,7 +34,7 @@
 class TxAckTest : public CppUnit::TestCase  
 {
 
-    class TestMessageStore : public MessageStore
+    class TestMessageStore : public NullMessageStore
     {
     public:
         vector<Message::shared_ptr> dequeued;
@@ -44,16 +44,7 @@
             dequeued.push_back(msg);
         }
 
-        //dont care about any of the other methods:
-        void create(const Queue&){}
-        void destroy(const Queue&){}        
-        void recover(RecoveryManager&){}
-        void enqueue(TransactionContext*, Message::shared_ptr&, const Queue&, 
const string * const){}
-        void committed(const string * const){}
-        void aborted(const string * const){}
-        std::auto_ptr<TransactionContext> begin(){ return 
std::auto_ptr<TransactionContext>(); }
-        void commit(TransactionContext*){}
-        void abort(TransactionContext*){}        
+        TestMessageStore() : NullMessageStore(false) {}
         ~TestMessageStore(){}
     };
 

Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp?view=diff&rev=478923&r1=478922&r2=478923
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/TxPublishTest.cpp Fri 
Nov 24 09:21:47 2006
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-#include <qpid/broker/MessageStore.h>
+#include <qpid/broker/NullMessageStore.h>
 #include <qpid/broker/RecoveryManager.h>
 #include <qpid/broker/TxPublish.h>
 #include <qpid_test_plugin.h>
@@ -35,7 +35,7 @@
 class TxPublishTest : public CppUnit::TestCase  
 {
 
-    class TestMessageStore : public MessageStore
+    class TestMessageStore : public NullMessageStore
     {
     public:
         vector< pair<string, Message::shared_ptr> > enqueued;
@@ -46,15 +46,7 @@
         }
         
         //dont care about any of the other methods:
-        void create(const Queue&){}
-        void destroy(const Queue&){}
-        void recover(RecoveryManager&){}
-        void dequeue(TransactionContext*, Message::shared_ptr&, const Queue&, 
const string * const){}
-        void committed(const string * const){}
-        void aborted(const string * const){}
-        std::auto_ptr<TransactionContext> begin(){ return 
std::auto_ptr<TransactionContext>(); }
-        void commit(TransactionContext*){}
-        void abort(TransactionContext*){}        
+        TestMessageStore() : NullMessageStore(false) {}
         ~TestMessageStore(){}
     };
     


Reply via email to