Author: gsim
Date: Thu Nov 9 02:40:56 2006
New Revision: 472850
URL: http://svn.apache.org/viewvc?view=rev&rev=472850
Log:
Added some encode/decode routines to Message (plus test).
Altered Buffer to allow memory for data to be specified on construction.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?view=diff&rev=472850&r1=472849&r2=472850
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Thu Nov 9
02:40:56 2006
@@ -33,6 +33,24 @@
size(0),
persistenceId(0) {}
+/**
+ * Builds a message by decoding it from the given buffer.
+ *
+ * The buffer is read in this order: exchange (short string), routing key
+ * (short string), one frame carrying the header body, then content frames
+ * for as long as buffer.available() is non-zero. publisher is set to 0 and
+ * the mandatory/immediate/redelivered flags to false, since none of them
+ * are read from the buffer.
+ *
+ * NOTE(review): both dynamic_pointer_cast results are passed on unchecked;
+ * a frame whose body has a different type would hand a null pointer to
+ * setHeader()/addContent() -- confirm those tolerate it.
+ */
+Message::Message(Buffer& buffer)
+    : publisher(0),
+      mandatory(false),
+      immediate(false),
+      redelivered(false),
+      size(0),
+      persistenceId(0)
+{
+    buffer.getShortString(exchange);
+    buffer.getShortString(routingKey);
+
+    // The header comes first, wrapped in a single frame.
+    AMQFrame frame;
+    frame.decode(buffer);
+    setHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(frame.getBody()));
+
+    // Everything that remains is content, one frame per content body.
+    while (buffer.available()) {
+        AMQFrame chunk;
+        chunk.decode(buffer);
+        AMQContentBody::shared_ptr part = dynamic_pointer_cast<AMQContentBody, AMQBody>(chunk.getBody());
+        addContent(part);
+    }
+}
+
Message::~Message(){}
void Message::setHeader(AMQHeaderBody::shared_ptr _header){
@@ -96,4 +114,36 @@
if(!header) return false;
BasicHeaderProperties* props = getHeaderProperties();
return props && props->getDeliveryMode() == PERSISTENT;
+}
+
+/**
+ * Writes this message into the given buffer: exchange and routing key as
+ * short strings, then the header body wrapped in an AMQFrame, then each
+ * content body wrapped in its own AMQFrame, in list order.
+ *
+ * The layout mirrors what Message(Buffer&) reads back. The buffer needs
+ * room for encodedSize() bytes.
+ */
+void Message::encode(Buffer& buffer)
+{
+    buffer.putShortString(exchange);
+    buffer.putShortString(routingKey);
+
+    // Header first; the frame is built from a base-class (AMQBody) pointer.
+    AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
+    AMQFrame headerFrame(0, headerBody);
+    headerFrame.encode(buffer);
+
+    // Then one frame per content body.
+    for (content_iterator part = content.begin(); part != content.end(); ++part) {
+        AMQBody::shared_ptr partBody = static_pointer_cast<AMQBody, AMQContentBody>(*part);
+        AMQFrame partFrame(0, partBody);
+        partFrame.encode(buffer);
+    }
+}
+
+/**
+ * Returns the number of bytes encode() needs in its target buffer.
+ *
+ * The total is the sum of:
+ *  - exchange and routing key, each counted as its length plus one byte
+ *    (presumably the length prefix written by putShortString -- confirm),
+ *  - the header body size plus 8 bytes of frame overhead,
+ *  - for every content body, its size plus 8 bytes of frame overhead.
+ *
+ * The header must already be set: it is dereferenced unconditionally.
+ */
+u_int32_t Message::encodedSize()
+{
+    // Accumulate in the same unsigned type that is returned, so no signed
+    // intermediate is involved for large payloads.
+    u_int32_t encodedContentSize(0);
+    for (content_iterator i = content.begin(); i != content.end(); ++i) {
+        // 8 extra bytes for the frame (TODO, could replace frame by simple size)
+        encodedContentSize += (*i)->size() + 8;
+    }
+
+    return exchange.size() + 1
+        + routingKey.size() + 1
+        // 8 extra bytes for frame (TODO, could actually remove the frame)
+        + header->size() + 8
+        + encodedContentSize;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?view=diff&rev=472850&r1=472849&r2=472850
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Thu Nov 9 02:40:56
2006
@@ -40,8 +40,8 @@
typedef content_list::iterator content_iterator;
const ConnectionToken* const publisher;
- const string exchange;
- const string routingKey;
+ string exchange;
+ string routingKey;
const bool mandatory;
const bool immediate;
bool redelivered;
@@ -59,6 +59,7 @@
Message(const ConnectionToken* const publisher,
const string& exchange, const string& routingKey,
bool mandatory, bool immediate);
+ Message(qpid::framing::Buffer& buffer);
~Message();
void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
void addContent(qpid::framing::AMQContentBody::shared_ptr data);
@@ -84,7 +85,14 @@
u_int64_t contentSize() const { return size; }
u_int64_t getPersistenceId() const { return persistenceId; }
void setPersistenceId(u_int64_t _persistenceId) { persistenceId =
_persistenceId; }
+ void encode(qpid::framing::Buffer& buffer);
+ /**
+ * @returns the size of the buffer needed to encode this message
+ */
+ u_int32_t encodedSize();
+
};
+
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?view=diff&rev=472850&r1=472849&r2=472850
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Nov 9 02:40:56
2006
@@ -57,6 +57,11 @@
process(msg);
}
+/**
+ * Adds a stored message back onto the queue during recovery.
+ *
+ * Unlike process(), no dispatch is attempted: the queue is switched into
+ * queueing mode and the message is pushed onto the pending messages.
+ */
+void Queue::recover(Message::shared_ptr& msg){
+    // process() consults `queueing` while holding `lock`; take the same
+    // lock here so the two functions never touch that state concurrently.
+    Mutex::ScopedLock locker(lock);
+    queueing = true;
+    messages.push(msg);
+}
+
void Queue::process(Message::shared_ptr& msg){
Mutex::ScopedLock locker(lock);
if(queueing || !dispatch(msg)){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?view=diff&rev=472850&r1=472849&r2=472850
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu Nov 9 02:40:56
2006
@@ -57,7 +57,7 @@
mutable qpid::sys::Mutex lock;
int64_t lastUsed;
Consumer* exclusive;
- u_int64_t persistenceId;
+ mutable u_int64_t persistenceId;
bool startDispatching();
bool dispatch(Message::shared_ptr& msg);
@@ -91,6 +91,10 @@
*/
void process(Message::shared_ptr& msg);
/**
+ * Used during recovery to add stored messages back to the queue
+ */
+ void recover(Message::shared_ptr& msg);
+ /**
* Dispatch any queued messages providing there are
* consumers for them. Only one thread can be dispatching
* at any time, but this method (rather than the caller)
@@ -107,7 +111,7 @@
inline const bool isExclusiveOwner(const ConnectionToken* const o)
const { return o == owner; }
inline bool hasExclusiveConsumer() const { return exclusive; }
inline u_int64_t getPersistenceId() const { return persistenceId; }
- inline void setPersistenceId(u_int64_t _persistenceId) {
persistenceId = _persistenceId; }
+ inline void setPersistenceId(u_int64_t _persistenceId) const {
persistenceId = _persistenceId; }
bool canAutoDelete() const;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp?view=diff&rev=472850&r1=472849&r2=472850
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.cpp Thu Nov 9
02:40:56 2006
@@ -18,12 +18,15 @@
#include "qpid/framing/Buffer.h"
#include "qpid/framing/FieldTable.h"
-qpid::framing::Buffer::Buffer(u_int32_t _size) : size(_size), position(0),
limit(_size){
+qpid::framing::Buffer::Buffer(u_int32_t _size) : size(_size), owner(true),
position(0), limit(_size){ // owner(true): the array allocated below belongs to this instance
data = new char[size]; // released again in ~Buffer(), which deletes data when owner is true
}
+/**
+ * Creates a buffer over memory supplied by the caller instead of
+ * allocating its own.
+ *
+ * owner is set to false, so the destructor does not delete _data; the
+ * caller remains responsible for releasing it. position starts at 0 and
+ * limit at _size.
+ */
+qpid::framing::Buffer::Buffer(char* _data, u_int32_t _size)
+    : size(_size),
+      owner(false),
+      data(_data),
+      position(0),
+      limit(_size)
+{
+}
+
qpid::framing::Buffer::~Buffer(){ // frees the data array only when this instance allocated it
- delete[] data;
+ if(owner) delete[] data; // owner is false for buffers constructed over caller-supplied memory
}
void qpid::framing::Buffer::flip(){
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h?view=diff&rev=472850&r1=472849&r2=472850
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/Buffer.h Thu Nov 9 02:40:56
2006
@@ -28,6 +28,7 @@
class Buffer
{
const u_int32_t size;
+ const bool owner;//indicates whether the data is owned by this instance
char* data;
u_int32_t position;
u_int32_t limit;
@@ -37,6 +38,7 @@
public:
Buffer(u_int32_t size);
+ Buffer(char* data, u_int32_t size);
~Buffer();
void flip();
Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp?view=diff&rev=472850&r1=472849&r2=472850
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp Thu Nov
9 02:40:56 2006
@@ -19,26 +19,68 @@
#include <qpid_test_plugin.h>
#include <iostream>
+using namespace boost;
using namespace qpid::broker;
using namespace qpid::framing;
+/**
+ * OutputHandler stand-in for the test: every frame passed to send() is
+ * appended to `frames`, in call order, so the test can inspect it later.
+ *
+ * NOTE(review): the stored pointers are never deleted here; if send() is
+ * meant to take ownership of the frame, they leak -- confirm against the
+ * OutputHandler contract.
+ */
+struct DummyHandler : OutputHandler{
+    std::vector<AMQFrame*> frames;
+
+    virtual void send(AMQFrame* frame){
+        frames.insert(frames.end(), frame);
+    }
+};
+
class MessageTest : public CppUnit::TestCase // round-trip test for Message::encode() and Message(Buffer&)
{
CPPUNIT_TEST_SUITE(MessageTest);
- CPPUNIT_TEST(testMe);
+ CPPUNIT_TEST(testEncodeDecode);
CPPUNIT_TEST_SUITE_END();
public:
- void testMe()
+ void testEncodeDecode() // builds a message, encodes it, decodes a new one from the bytes, then compares
{
- const int size(10);
- for(int i = 0; i < size; i++){
- Message::shared_ptr msg = Message::shared_ptr(new Message(0, "A",
"B", true, true));
- msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody()));
- msg->addContent(AMQContentBody::shared_ptr(new AMQContentBody()));
- msg.reset();
- }
+ string exchange = "MyExchange";
+ string routingKey = "MyRoutingKey";
+ string messageId = "MyMessage";
+ string data1("abcdefg");
+ string data2("hijklmn");
+
+ Message::shared_ptr msg = Message::shared_ptr(new Message(0, exchange,
routingKey, false, false));
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ header->setContentSize(14); // 7 characters in data1 plus 7 in data2
+ AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
+ AMQContentBody::shared_ptr part2(new AMQContentBody(data2));
+ msg->setHeader(header);
+ msg->addContent(part1);
+ msg->addContent(part2);
+
+ msg->getHeaderProperties()->setMessageId(messageId);
+ msg->getHeaderProperties()->setDeliveryMode(PERSISTENT);
+ msg->getHeaderProperties()->getHeaders().setString("abc", "xyz");
+
+ Buffer buffer(msg->encodedSize()); // allocate exactly the size the message reports it needs
+ msg->encode(buffer);
+ buffer.flip(); // presumably switches the buffer from writing to reading -- confirm against Buffer::flip
+
+ msg = Message::shared_ptr(new Message(buffer)); // from here on msg is the instance decoded from the buffer
+ CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchange());
+ CPPUNIT_ASSERT_EQUAL(routingKey, msg->getRoutingKey());
+ CPPUNIT_ASSERT_EQUAL(messageId,
msg->getHeaderProperties()->getMessageId());
+ CPPUNIT_ASSERT_EQUAL((u_int8_t) PERSISTENT,
msg->getHeaderProperties()->getDeliveryMode());
+ CPPUNIT_ASSERT_EQUAL(string("xyz"),
msg->getHeaderProperties()->getHeaders().getString("abc"));
+ CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, msg->contentSize());
+
+ DummyHandler handler;
+ msg->deliver(&handler, 0, "ignore", 0, 100);
+ CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size()); // frames[2] and frames[3] are read below as the two content bodies
+ AMQContentBody::shared_ptr
contentBody1(dynamic_pointer_cast<AMQContentBody,
AMQBody>(handler.frames[2]->getBody()));
+ AMQContentBody::shared_ptr
contentBody2(dynamic_pointer_cast<AMQContentBody,
AMQBody>(handler.frames[3]->getBody()));
+ CPPUNIT_ASSERT(contentBody1); // a failed cast yields a null pointer, which fails here
+ CPPUNIT_ASSERT(contentBody2);
+ CPPUNIT_ASSERT_EQUAL(data1, contentBody1->getData());
+ CPPUNIT_ASSERT_EQUAL(data2, contentBody2->getData());
}
};