Author: gsim
Date: Thu Nov  9 02:40:56 2006
New Revision: 472850

URL: http://svn.apache.org/viewvc?view=rev&rev=472850
Log:
Added some encode/decode routines to Message (plus test).
Altered Buffer to allow memory for data to be specified on construction. 


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h
    incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?view=diff&rev=472850&r1=472849&r2=472850
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Thu Nov  9 
02:40:56 2006
@@ -33,6 +33,24 @@
                                                      size(0),
                                                      persistenceId(0) {}
 
+Message::Message(Buffer& buffer) : publisher(0), mandatory(false), 
immediate(false), redelivered(false), size(0), persistenceId(0){
+    buffer.getShortString(exchange);
+    buffer.getShortString(routingKey);
+    
+    AMQFrame headerFrame;
+    headerFrame.decode(buffer);
+    AMQHeaderBody::shared_ptr headerBody = dynamic_pointer_cast<AMQHeaderBody, 
AMQBody>(headerFrame.getBody());
+    setHeader(headerBody);
+    
+    AMQContentBody::shared_ptr contentBody;
+    while (buffer.available()) {
+        AMQFrame contentFrame;
+        contentFrame.decode(buffer);
+        contentBody = dynamic_pointer_cast<AMQContentBody, 
AMQBody>(contentFrame.getBody());
+        addContent(contentBody);
+    }        
+}
+
 Message::~Message(){}
 
 void Message::setHeader(AMQHeaderBody::shared_ptr _header){
@@ -96,4 +114,36 @@
     if(!header) return false;
     BasicHeaderProperties* props = getHeaderProperties();
     return props && props->getDeliveryMode() == PERSISTENT;
+}
+
+void Message::encode(Buffer& buffer)
+{
+    buffer.putShortString(exchange);
+    buffer.putShortString(routingKey);
+    
+    AMQBody::shared_ptr body;
+
+    body = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
+
+    AMQFrame headerFrame(0, body);
+    headerFrame.encode(buffer);
+    
+    for (content_iterator i = content.begin(); i != content.end(); i++) {
+        body = static_pointer_cast<AMQBody, AMQContentBody>(*i);
+        AMQFrame contentFrame(0, body);
+        contentFrame.encode(buffer);
+    }    
+}
+
+u_int32_t Message::encodedSize()
+{
+    int encodedContentSize(0);
+    for (content_iterator i = content.begin(); i != content.end(); i++) {
+        encodedContentSize += (*i)->size() + 8;//8 extra bytes for the frame 
(TODO, could replace frame by simple size)
+    }
+
+    return exchange.size() + 1
+        + routingKey.size() + 1
+        + header->size() + 8 //8 extra bytes for frame (TODO, could actually 
remove the frame)
+        + encodedContentSize;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?view=diff&rev=472850&r1=472849&r2=472850
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Thu Nov  9 02:40:56 
2006
@@ -40,8 +40,8 @@
             typedef content_list::iterator content_iterator;
 
             const ConnectionToken* const publisher;
-            const string exchange;
-            const string routingKey;
+            string exchange;
+            string routingKey;
             const bool mandatory;
             const bool immediate;
             bool redelivered;
@@ -59,6 +59,7 @@
             Message(const ConnectionToken* const publisher, 
                     const string& exchange, const string& routingKey, 
                     bool mandatory, bool immediate);
+            Message(qpid::framing::Buffer& buffer);
             ~Message();
             void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
             void addContent(qpid::framing::AMQContentBody::shared_ptr data);
@@ -84,7 +85,14 @@
             u_int64_t contentSize() const { return size; }
             u_int64_t getPersistenceId() const { return persistenceId; }
             void setPersistenceId(u_int64_t _persistenceId) { persistenceId = 
_persistenceId; }
+            void encode(qpid::framing::Buffer& buffer);
+            /**
+             * @returns the size of the buffer needed to encode this message
+             */
+            u_int32_t encodedSize();
+            
         };
+
     }
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?view=diff&rev=472850&r1=472849&r2=472850
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Nov  9 02:40:56 
2006
@@ -57,6 +57,11 @@
     process(msg);
 }
 
+void Queue::recover(Message::shared_ptr& msg){
+    queueing = true;
+    messages.push(msg);
+}
+
 void Queue::process(Message::shared_ptr& msg){
     Mutex::ScopedLock locker(lock);
     if(queueing || !dispatch(msg)){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?view=diff&rev=472850&r1=472849&r2=472850
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu Nov  9 02:40:56 
2006
@@ -57,7 +57,7 @@
             mutable qpid::sys::Mutex lock;
             int64_t lastUsed;
             Consumer* exclusive;
-            u_int64_t persistenceId;
+            mutable u_int64_t persistenceId;
 
             bool startDispatching();
             bool dispatch(Message::shared_ptr& msg);
@@ -91,6 +91,10 @@
              */
             void process(Message::shared_ptr& msg);
             /**
+             * Used during recovery to add stored messages back to the queue
+             */
+            void recover(Message::shared_ptr& msg);
+            /**
              * Dispatch any queued messages providing there are
              * consumers for them. Only one thread can be dispatching
              * at any time, but this method (rather than the caller)
@@ -107,7 +111,7 @@
             inline const bool isExclusiveOwner(const ConnectionToken* const o) 
const { return o == owner; }
             inline bool hasExclusiveConsumer() const { return exclusive; }
             inline u_int64_t getPersistenceId() const { return persistenceId; }
-            inline void setPersistenceId(u_int64_t _persistenceId) { 
persistenceId = _persistenceId; }
+            inline void setPersistenceId(u_int64_t _persistenceId) const { 
persistenceId = _persistenceId; }
 
             bool canAutoDelete() const;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp?view=diff&rev=472850&r1=472849&r2=472850
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp Thu Nov  9 
02:40:56 2006
@@ -18,12 +18,15 @@
 #include "qpid/framing/Buffer.h"
 #include "qpid/framing/FieldTable.h" 
 
-qpid::framing::Buffer::Buffer(u_int32_t _size) : size(_size), position(0), 
limit(_size){
+qpid::framing::Buffer::Buffer(u_int32_t _size) : size(_size), owner(true), 
position(0), limit(_size){
     data = new char[size];
 }
 
+qpid::framing::Buffer::Buffer(char* _data, u_int32_t _size) : size(_size), 
owner(false), data(_data), position(0), limit(_size){
+}
+
 qpid::framing::Buffer::~Buffer(){
-    delete[] data;
+    if(owner) delete[] data;
 }
 
 void qpid::framing::Buffer::flip(){

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h?view=diff&rev=472850&r1=472849&r2=472850
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h Thu Nov  9 02:40:56 
2006
@@ -28,6 +28,7 @@
 class Buffer
 {
     const u_int32_t size;
+    const bool owner;//indicates whether the data is owned by this instance
     char* data;
     u_int32_t position;
     u_int32_t limit;
@@ -37,6 +38,7 @@
 public:
 
     Buffer(u_int32_t size);
+    Buffer(char* data, u_int32_t size);
     ~Buffer();
 
     void flip();

Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp?view=diff&rev=472850&r1=472849&r2=472850
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp Thu Nov 
 9 02:40:56 2006
@@ -19,26 +19,68 @@
 #include <qpid_test_plugin.h>
 #include <iostream>
 
+using namespace boost;
 using namespace qpid::broker;
 using namespace qpid::framing;
 
+struct DummyHandler : OutputHandler{
+    std::vector<AMQFrame*> frames; 
+
+    virtual void send(AMQFrame* frame){
+        frames.push_back(frame);
+    }
+};
+
 class MessageTest : public CppUnit::TestCase  
 {
     CPPUNIT_TEST_SUITE(MessageTest);
-    CPPUNIT_TEST(testMe);
+    CPPUNIT_TEST(testEncodeDecode);
     CPPUNIT_TEST_SUITE_END();
 
   public:
 
-    void testMe() 
+    void testEncodeDecode()
     {
-        const int size(10);
-        for(int i = 0; i < size; i++){
-            Message::shared_ptr msg = Message::shared_ptr(new Message(0, "A", 
"B", true, true));
-            msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody()));
-            msg->addContent(AMQContentBody::shared_ptr(new AMQContentBody()));
-            msg.reset();
-        }
+        string exchange = "MyExchange";
+        string routingKey = "MyRoutingKey";
+        string messageId = "MyMessage";
+        string data1("abcdefg");
+        string data2("hijklmn");
+
+        Message::shared_ptr msg = Message::shared_ptr(new Message(0, exchange, 
routingKey, false, false));
+        AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+        header->setContentSize(14);        
+        AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
+        AMQContentBody::shared_ptr part2(new AMQContentBody(data2));        
+        msg->setHeader(header);
+        msg->addContent(part1);
+        msg->addContent(part2);
+
+        msg->getHeaderProperties()->setMessageId(messageId);
+        msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
+        msg->getHeaderProperties()->getHeaders().setString("abc", "xyz");
+
+        Buffer buffer(msg->encodedSize());
+        msg->encode(buffer);
+        buffer.flip();
+        
+        msg = Message::shared_ptr(new Message(buffer));
+        CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange());
+        CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey());
+        CPPUNIT_ASSERT_EQUAL(messageId, 
msg->getHeaderProperties()->getMessageId());
+        CPPUNIT_ASSERT_EQUAL((u_int8_t) PERSISTENT, 
msg->getHeaderProperties()->getDeliveryMode());
+        CPPUNIT_ASSERT_EQUAL(string("xyz"), 
msg->getHeaderProperties()->getHeaders().getString("abc"));
+        CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, msg->contentSize());
+
+        DummyHandler handler;
+        msg->deliver(&handler, 0, "ignore", 0, 100); 
+        CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
+        AMQContentBody::shared_ptr 
contentBody1(dynamic_pointer_cast<AMQContentBody, 
AMQBody>(handler.frames[2]->getBody()));
+        AMQContentBody::shared_ptr 
contentBody2(dynamic_pointer_cast<AMQContentBody, 
AMQBody>(handler.frames[3]->getBody()));
+        CPPUNIT_ASSERT(contentBody1);
+        CPPUNIT_ASSERT(contentBody2);
+        CPPUNIT_ASSERT_EQUAL(data1, contentBody1->getData());
+        CPPUNIT_ASSERT_EQUAL(data2, contentBody2->getData());
     }
 };
 


Reply via email to