Author: gsim
Date: Tue Nov 28 07:25:35 2006
New Revision: 480087
URL: http://svn.apache.org/viewvc?view=rev&rev=480087
Log:
Modifications to allow loading of message data in chunks, refragmentation of
messages, plus some related refactoring and tests.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Content.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.cpp (with
props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.h (with
props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp (with
props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h (with
props)
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp
(with props)
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp
(with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Content.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Content.h?view=auto&rev=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Content.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Content.h Tue Nov 28 07:25:35
2006
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Content_
+#define _Content_
+
+#include <qpid/framing/AMQContentBody.h>
+#include <qpid/framing/Buffer.h>
+#include <qpid/framing/OutputHandler.h>
+
+namespace qpid {
+ namespace broker {
+        /**
+         * Abstract interface for the content (body data) of a message.
+         * Implementations decide where the data lives; the message only
+         * talks to this interface.
+         */
+        class Content {
+        public:
+            /** Adds the next piece of content data. */
+            virtual void add(qpid::framing::AMQContentBody::shared_ptr data) = 0;
+
+            /** Returns the size, in bytes, reported for this content when the message is encoded. */
+            virtual u_int32_t size() = 0;
+
+            /**
+             * Sends the content to the given output handler as content
+             * frames on the given channel; framesize is the limit used
+             * when splitting the data into frames.
+             */
+            virtual void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize) = 0;
+
+            /** Encodes the content into the given buffer. */
+            virtual void encode(qpid::framing::Buffer& buffer) = 0;
+
+            virtual ~Content() {}
+        };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Content.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Content.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.cpp?view=auto&rev=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.cpp Tue Nov
28 07:25:35 2006
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/InMemoryContent.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+using boost::static_pointer_cast;
+
+/**
+ * Appends the given content body to the end of the in-memory list.
+ */
+void InMemoryContent::add(AMQContentBody::shared_ptr data)
+{
+    content.insert(content.end(), data);
+}
+
+/**
+ * Returns the sum of the sizes of all held content bodies plus 8 bytes
+ * of frame overhead for each of them.
+ */
+u_int32_t InMemoryContent::size()
+{
+    //accumulate in the unsigned return type: the body sizes are compared
+    //with and assigned to u_int32_t elsewhere in this class, so a signed
+    //int total would depend on implementation-defined narrowing once it
+    //passes INT_MAX
+    u_int32_t sum(0);
+    for (content_iterator i = content.begin(); i != content.end(); ++i) {
+        sum += (*i)->size() + 8;//8 extra bytes for the frame
+        //TODO: have to get rid of the frame stuff from encoded data
+    }
+    return sum;
+}
+
+/**
+ * Sends every held content body, in the order added, to the output
+ * handler as content frames on the given channel. A body larger than
+ * framesize is re-fragmented into several frames; any other body is
+ * passed on unchanged.
+ *
+ * @param out       handler whose send() is called with each new frame
+ * @param channel   channel number given to every frame
+ * @param framesize maximum number of data bytes per content frame;
+ *                  0 means "no limit" (bodies are sent unsplit)
+ */
+void InMemoryContent::send(OutputHandler* out, int channel, u_int32_t framesize)
+{
+    for (content_iterator i = content.begin(); i != content.end(); ++i) {
+        const u_int32_t bodySize = (*i)->size();
+
+        //a zero framesize must not reach the splitting code below: it
+        //would never advance (and used to divide by zero)
+        if (framesize == 0 || bodySize <= framesize) {
+            AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody, AMQContentBody>(*i);
+            out->send(new AMQFrame(channel, contentBody));
+            continue;
+        }
+
+        //too big for one frame: emit framesize sized chunks, the last
+        //one carrying whatever is left over
+        u_int32_t offset = 0;
+        while (offset < bodySize) {
+            const u_int32_t left = bodySize - offset;
+            const u_int32_t length = left < framesize ? left : framesize;
+            string data = (*i)->getData().substr(offset, length);
+            out->send(new AMQFrame(channel, new AMQContentBody(data)));
+            offset += length;
+        }
+    }
+}
+
+/**
+ * Encodes each held content body, in the order added, into the buffer.
+ */
+void InMemoryContent::encode(Buffer& buffer)
+{
+    content_iterator body = content.begin();
+    const content_iterator last = content.end();
+    while (body != last) {
+        (*body)->encode(buffer);
+        ++body;
+    }
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.h?view=auto&rev=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.h Tue Nov 28
07:25:35 2006
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _InMemoryContent_
+#define _InMemoryContent_
+
+#include <qpid/broker/Content.h>
+#include <vector>
+
+namespace qpid {
+ namespace broker {
+        /**
+         * Content implementation that holds the content bodies of a
+         * message in a vector.
+         */
+        class InMemoryContent : public Content {
+            typedef std::vector<qpid::framing::AMQContentBody::shared_ptr> content_list;
+            typedef content_list::iterator content_iterator;
+
+            /** The content bodies added so far. */
+            content_list content;
+
+        public:
+            virtual void add(qpid::framing::AMQContentBody::shared_ptr data);
+            virtual u_int32_t size();
+            virtual void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
+            virtual void encode(qpid::framing::Buffer& buffer);
+            virtual ~InMemoryContent() {}
+        };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/InMemoryContent.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp?view=auto&rev=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp Tue Nov
28 07:25:35 2006
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/LazyLoadedContent.h"
+
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+/**
+ * Records the store, the message id and the expected content size
+ * that the other methods of this class work with.
+ */
+LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, u_int64_t _msgId, u_int64_t _expectedSize)
+    : store(_store),
+      msgId(_msgId),
+      expectedSize(_expectedSize)
+{
+}
+
+/**
+ * Hands the data of the given content body to the store, to be
+ * appended to the content held there under this message's id.
+ */
+void LazyLoadedContent::add(AMQContentBody::shared_ptr data)
+{
+    const string& bytes = data->getData();
+    store->appendContent(msgId, bytes);
+}
+
+/**
+ * Always returns 0: content is passed to the store as soon as it is
+ * added, so this object has nothing of its own to report.
+ */
+u_int32_t LazyLoadedContent::size()
+{
+    const u_int32_t nothingHeld = 0;
+    return nothingHeld;
+}
+
+/**
+ * Loads the content from the store in chunks and sends each chunk to
+ * the output handler as a content frame on the given channel. At least
+ * one frame is always sent, even when the expected size is 0.
+ *
+ * @param out       handler whose send() is called with each new frame
+ * @param channel   channel number given to every frame
+ * @param framesize maximum number of data bytes per content frame;
+ *                  0 means "no limit"
+ */
+void LazyLoadedContent::send(OutputHandler* out, int channel, u_int32_t framesize)
+{
+    //a zero framesize would never advance the offset below; treat it as
+    //"no limit", capped at the largest length loadContent() can be
+    //asked for (its length parameter is 32 bit)
+    const u_int64_t maxChunk = framesize ? framesize : 0xFFFFFFFFu;
+
+    u_int64_t offset = 0;
+    do {
+        const u_int64_t remaining = expectedSize - offset;
+        const u_int32_t length = static_cast<u_int32_t>(remaining > maxChunk ? maxChunk : remaining);
+        string data;
+        store->loadContent(msgId, data, offset, length);
+        out->send(new AMQFrame(channel, new AMQContentBody(data)));
+        //advance by what was requested so offset can never pass expectedSize
+        offset += length;
+    } while (offset < expectedSize);
+}
+
+/**
+ * Intentionally empty: the content has already been written to the
+ * store when it was added, so nothing goes into the buffer.
+ */
+void LazyLoadedContent::encode(Buffer&)
+{
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h?view=auto&rev=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h Tue Nov
28 07:25:35 2006
@@ -0,0 +1,45 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _LazyLoadedContent_
+#define _LazyLoadedContent_
+
+#include <qpid/broker/Content.h>
+#include <qpid/broker/MessageStore.h>
+
+namespace qpid {
+ namespace broker {
+        /**
+         * Content implementation that keeps no data in memory: added
+         * data is appended to a message store and loaded back from
+         * that store when the content is sent.
+         */
+        class LazyLoadedContent : public Content {
+            /** Store the content is appended to and loaded from. */
+            MessageStore* const store;
+            /** Id passed to the store to identify the message. */
+            const u_int64_t msgId;
+            /** Number of content bytes that send() loads from the store. */
+            const u_int64_t expectedSize;
+
+        public:
+            LazyLoadedContent(MessageStore* const store, u_int64_t msgId, u_int64_t expectedSize);
+            virtual void add(qpid::framing::AMQContentBody::shared_ptr data);
+            virtual u_int32_t size();
+            virtual void send(qpid::framing::OutputHandler* out, int channel, u_int32_t framesize);
+            virtual void encode(qpid::framing::Buffer& buffer);
+            virtual ~LazyLoadedContent() {}
+        };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/LazyLoadedContent.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Tue Nov 28
07:25:35 2006
@@ -20,6 +20,10 @@
*/
#include <qpid/broker/Message.h>
#include <iostream>
+
+#include <qpid/broker/InMemoryContent.h>
+#include <qpid/broker/LazyLoadedContent.h>
+#include <qpid/broker/MessageStore.h>
// AMQP version change - kpvdr 2006-11-17
#include <qpid/framing/ProtocolVersion.h>
#include <qpid/framing/BasicDeliverBody.h>
@@ -40,8 +44,10 @@
size(0),
persistenceId(0) {}
-Message::Message(Buffer& buffer) : publisher(0), mandatory(false),
immediate(false), redelivered(false), size(0), persistenceId(0){
- decode(buffer);
+Message::Message(Buffer& buffer, bool headersOnly, u_int32_t contentChunkSize)
:
+ publisher(0), mandatory(false), immediate(false), redelivered(false),
size(0), persistenceId(0){
+
+ decode(buffer, headersOnly, contentChunkSize);
}
Message::Message() : publisher(0), mandatory(false), immediate(false),
redelivered(false), size(0), persistenceId(0){}
@@ -53,7 +59,10 @@
}
void Message::addContent(AMQContentBody::shared_ptr data){
- content.push_back(data);
+ if (!content.get()) {
+ content = std::auto_ptr<Content>(new InMemoryContent());
+ }
+ content->add(data);
size += data->size();
}
@@ -68,8 +77,9 @@
void Message::deliver(OutputHandler* out, int channel,
const string& consumerTag, u_int64_t deliveryTag,
u_int32_t framesize){
- // AMQP version change - kpvdr 2006-11-17
- // TODO: Make this class version-aware and link these hard-wired
numbers to that version
+
+ // AMQP version change - kpvdr 2006-11-17
+ // TODO: Make this class version-aware and link these hard-wired numbers
to that version
out->send(new AMQFrame(channel, new BasicDeliverBody(ProtocolVersion(8,0),
consumerTag, deliveryTag, redelivered, exchange, routingKey)));
sendContent(out, channel, framesize);
}
@@ -80,8 +90,8 @@
u_int64_t deliveryTag,
u_int32_t framesize){
- // AMQP version change - kpvdr 2006-11-17
- // TODO: Make this class version-aware and link these hard-wired
numbers to that version
+ // AMQP version change - kpvdr 2006-11-17
+ // TODO: Make this class version-aware and link these hard-wired numbers
to that version
out->send(new AMQFrame(channel, new BasicGetOkBody(ProtocolVersion(8,0),
deliveryTag, redelivered, exchange, routingKey, messageCount)));
sendContent(out, channel, framesize);
}
@@ -89,15 +99,8 @@
void Message::sendContent(OutputHandler* out, int channel, u_int32_t
framesize){
AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody,
AMQHeaderBody>(header);
out->send(new AMQFrame(channel, headerBody));
- for(content_iterator i = content.begin(); i != content.end(); i++){
- if((*i)->size() > framesize){
- //TODO: need to split it
- std::cout << "WARNING: Dropped message. Re-fragmentation not yet
implemented." << std::endl;
- }else{
- AMQBody::shared_ptr contentBody = static_pointer_cast<AMQBody,
AMQContentBody>(*i);
- out->send(new AMQFrame(channel, contentBody));
- }
- }
+
+ if (content.get()) content->send(out, channel, framesize);
}
BasicHeaderProperties* Message::getHeaderProperties(){
@@ -115,10 +118,10 @@
return props && props->getDeliveryMode() == PERSISTENT;
}
-void Message::decode(Buffer& buffer)
+void Message::decode(Buffer& buffer, bool headersOnly, u_int32_t
contentChunkSize)
{
decodeHeader(buffer);
- decodeContent(buffer);
+ if (!headersOnly) decodeContent(buffer, contentChunkSize);
}
void Message::decodeHeader(Buffer& buffer)
@@ -132,15 +135,25 @@
setHeader(headerBody);
}
-void Message::decodeContent(Buffer& buffer)
+void Message::decodeContent(Buffer& buffer, u_int32_t chunkSize)
{
- AMQContentBody::shared_ptr contentBody;
- while (buffer.available()) {
- AMQFrame contentFrame;
- contentFrame.decode(buffer);
- contentBody = dynamic_pointer_cast<AMQContentBody,
AMQBody>(contentFrame.getBody());
+ u_int64_t expected = expectedContentSize();
+ if (expected != buffer.available()) {
+ std::cout << "WARN: Expected " << expectedContentSize() << " bytes,
got " << buffer.available() << std::endl;
+ }
+
+ if (!chunkSize || chunkSize > expected) {
+ chunkSize = expected;
+ }
+
+ u_int64_t total = 0;
+ while (total < expectedContentSize()) {
+ u_int64_t remaining = expected - total;
+ AMQContentBody::shared_ptr contentBody(new AMQContentBody());
+ contentBody->decode(buffer, remaining < chunkSize ? remaining :
chunkSize);
addContent(contentBody);
- }
+ total += chunkSize;
+ }
}
void Message::encode(Buffer& buffer)
@@ -159,15 +172,7 @@
void Message::encodeContent(Buffer& buffer)
{
- //Use a frame around each content block. Not really required but
- //gives some error checking at little expense. Could change in the
- //future...
- AMQBody::shared_ptr body;
- for (content_iterator i = content.begin(); i != content.end(); i++) {
- body = static_pointer_cast<AMQBody, AMQContentBody>(*i);
- AMQFrame contentFrame(0, body);
- contentFrame.encode(buffer);
- }
+ if (content.get()) content->encode(buffer);
}
u_int32_t Message::encodedSize()
@@ -177,11 +182,7 @@
u_int32_t Message::encodedContentSize()
{
- int encodedContentSize(0);
- for (content_iterator i = content.begin(); i != content.end(); i++) {
- encodedContentSize += (*i)->size() + 8;//8 extra bytes for the frame
- }
- return encodedContentSize;
+ return content.get() ? content->size() : 0;
}
u_int32_t Message::encodedHeaderSize()
@@ -196,7 +197,15 @@
return header.get() ? header->getContentSize() : 0;
}
-void Message::releaseContent()
+void Message::releaseContent(MessageStore* store)
+{
+ if (!content.get() || content->size() > 0) {
+ //set content to lazy loading mode (but only if there is stored
content):
+ content = std::auto_ptr<Content>(new LazyLoadedContent(store,
getPersistenceId(), expectedContentSize()));
+ }
+}
+
+void Message::setContent(std::auto_ptr<Content>& _content)
{
- content.clear();
+ content = _content;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Tue Nov 28 07:25:35
2006
@@ -21,8 +21,10 @@
#ifndef _Message_
#define _Message_
+#include <memory>
#include <boost/shared_ptr.hpp>
#include <qpid/broker/ConnectionToken.h>
+#include <qpid/broker/Content.h>
#include <qpid/broker/TxBuffer.h>
#include <qpid/framing/AMQContentBody.h>
#include <qpid/framing/AMQHeaderBody.h>
@@ -32,6 +34,7 @@
namespace qpid {
namespace broker {
+ class MessageStore;
using qpid::framing::string;
/**
* Represents an AMQP message, i.e. a header body, a list of
@@ -39,9 +42,6 @@
* request.
*/
class Message{
- typedef std::vector<qpid::framing::AMQContentBody::shared_ptr>
content_list;
- typedef content_list::iterator content_iterator;
-
const ConnectionToken* const publisher;
string exchange;
string routingKey;
@@ -49,7 +49,7 @@
const bool immediate;
bool redelivered;
qpid::framing::AMQHeaderBody::shared_ptr header;
- content_list content;
+ std::auto_ptr<Content> content;
u_int64_t size;
u_int64_t persistenceId;
@@ -62,7 +62,7 @@
Message(const ConnectionToken* const publisher,
const string& exchange, const string& routingKey,
bool mandatory, bool immediate);
- Message(qpid::framing::Buffer& buffer);
+ Message(qpid::framing::Buffer& buffer, bool headersOnly = false,
u_int32_t contentChunkSize = 0);
Message();
~Message();
void setHeader(qpid::framing::AMQHeaderBody::shared_ptr header);
@@ -90,9 +90,9 @@
u_int64_t getPersistenceId() const { return persistenceId; }
void setPersistenceId(u_int64_t _persistenceId) { persistenceId =
_persistenceId; }
- void decode(qpid::framing::Buffer& buffer);
+ void decode(qpid::framing::Buffer& buffer, bool headersOnly =
false, u_int32_t contentChunkSize = 0);
void decodeHeader(qpid::framing::Buffer& buffer);
- void decodeContent(qpid::framing::Buffer& buffer);
+ void decodeContent(qpid::framing::Buffer& buffer, u_int32_t
contentChunkSize = 0);
void encode(qpid::framing::Buffer& buffer);
void encodeHeader(qpid::framing::Buffer& buffer);
@@ -114,14 +114,22 @@
*/
u_int32_t encodedContentSize();
/**
- * Releases the in-memory content data held by this message.
+ * Releases the in-memory content data held by this
+ * message. Must pass in a store from which the data can
+ * be reloaded.
*/
- void releaseContent();
+ void releaseContent(MessageStore* store);
/**
* If headers have been received, returns the expected
* content size else returns 0.
*/
u_int64_t expectedContentSize();
+ /**
+ * Sets the 'content' implementation of this message (the
+ * message controls the lifecycle of the content instance
+ * it uses).
+ */
+ void setContent(std::auto_ptr<Content>& content);
};
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Tue Nov 28
07:25:35 2006
@@ -20,25 +20,23 @@
*/
#include <qpid/broker/MessageBuilder.h>
+#include <qpid/broker/InMemoryContent.h>
+#include <qpid/broker/LazyLoadedContent.h>
+
using namespace qpid::broker;
using namespace qpid::framing;
+using std::auto_ptr;
MessageBuilder::MessageBuilder(CompletionHandler* _handler, MessageStore*
const _store, u_int64_t _stagingThreshold) :
handler(_handler),
store(_store),
- stagingThreshold(_stagingThreshold),
- staging(false)
+ stagingThreshold(_stagingThreshold)
{}
void MessageBuilder::route(){
- if (staging && store) {
- store->stage(message);
- message->releaseContent();
- }
if (message->isComplete()) {
if (handler) handler->complete(message);
message.reset();
- staging = false;
}
}
@@ -54,7 +52,14 @@
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, "Invalid message sequence: got
header before publish.");
}
message->setHeader(header);
- staging = stagingThreshold && header->getContentSize() >= stagingThreshold;
+ if (stagingThreshold && header->getContentSize() >= stagingThreshold) {
+ store->stage(message);
+ auto_ptr<Content> content(new LazyLoadedContent(store,
message->getPersistenceId(), message->expectedContentSize()));
+ message->setContent(content);
+ } else {
+ auto_ptr<Content> content(new InMemoryContent());
+ message->setContent(content);
+ }
route();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.h Tue Nov 28
07:25:35 2006
@@ -21,6 +21,7 @@
#ifndef _MessageBuilder_
#define _MessageBuilder_
+#include <memory>
#include <qpid/QpidError.h>
#include <qpid/broker/Exchange.h>
#include <qpid/broker/Message.h>
@@ -47,7 +48,6 @@
CompletionHandler* handler;
MessageStore* const store;
const u_int64_t stagingThreshold;
- bool staging;
void route();
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Tue Nov 28
07:25:35 2006
@@ -51,9 +51,9 @@
* (enqueueing automatically stores the message so this is
* only required if storage is required prior to that
* point). If the message has not yet been stored it will
- * store the headers and any available content. If the
- * message has already been stored it will append any
- * currently held content.
+ * store the headers as well as any content passed in. A
+ * persistence id will be set on the message which can be
+ * used to load the content or to append to it.
*/
virtual void stage(Message::shared_ptr& msg) = 0;
@@ -64,6 +64,21 @@
* is dequeued from all queues it was enqueued onto).
*/
virtual void destroy(Message::shared_ptr& msg) = 0;
+
+ /**
+ * Appends content to a previously staged message
+ */
+ virtual void appendContent(u_int64_t msgId, const std::string&
data) = 0;
+
+ /**
+ * Loads (a section of) content data for the specified
+ * message id (previously set on the message through a
+ * call to stage or enqueue) into data. The offset refers
+ * to the content only (i.e. an offset of 0 implies that
+ * the start of the content should be loaded, not the
+ * headers or related meta-data).
+ */
+ virtual void loadContent(u_int64_t msgId, std::string& data,
u_int64_t offset, u_int32_t length) = 0;
/**
* Enqueues a message, storing the message if it has not
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Tue
Nov 28 07:25:35 2006
@@ -53,6 +53,16 @@
store->destroy(msg);
}
+void MessageStoreModule::appendContent(u_int64_t msgId, const std::string&
data)
+{
+ store->appendContent(msgId, data);
+}
+
+void MessageStoreModule::loadContent(u_int64_t msgId, string& data, u_int64_t
offset, u_int32_t length)
+{
+ store->loadContent(msgId, data, offset, length);
+}
+
void MessageStoreModule::enqueue(TransactionContext* ctxt,
Message::shared_ptr& msg, const Queue& queue, const string * const xid)
{
store->enqueue(ctxt, msg, queue, xid);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Tue Nov
28 07:25:35 2006
@@ -41,6 +41,8 @@
void recover(RecoveryManager& queues);
void stage(Message::shared_ptr& msg);
void destroy(Message::shared_ptr& msg);
+ void appendContent(u_int64_t msgId, const std::string& data);
+ void loadContent(u_int64_t msgId, std::string& data, u_int64_t
offset, u_int32_t length);
void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg,
const Queue& queue, const string * const xid);
void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg,
const Queue& queue, const string * const xid);
void committed(const string * const xid);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Tue Nov
28 07:25:35 2006
@@ -34,45 +34,66 @@
{
if (warn) std::cout << "WARNING: Can't create durable queue '" <<
queue.getName() << "'. Persistence not enabled." << std::endl;
}
+
void NullMessageStore::destroy(const Queue& queue)
{
if (warn) std::cout << "WARNING: Can't destroy durable queue '" <<
queue.getName() << "'. Persistence not enabled." << std::endl;
}
+
void NullMessageStore::recover(RecoveryManager&)
{
if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of
queues or messages." << std::endl;
}
+
void NullMessageStore::stage(Message::shared_ptr&)
{
if (warn) std::cout << "WARNING: Can't stage message. Persistence not
enabled." << std::endl;
}
+
void NullMessageStore::destroy(Message::shared_ptr&)
{
if (warn) std::cout << "WARNING: No need to destroy staged message.
Persistence not enabled." << std::endl;
}
+
+void NullMessageStore::appendContent(u_int64_t, const string&)
+{
+ if (warn) std::cout << "WARNING: Can't append content. Persistence not
enabled." << std::endl;
+}
+
+void NullMessageStore::loadContent(u_int64_t, string&, u_int64_t, u_int32_t)
+{
+ if (warn) std::cout << "WARNING: Can't load content. Persistence not
enabled." << std::endl;
+}
+
void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&,
const Queue& queue, const string * const)
{
if (warn) std::cout << "WARNING: Can't enqueue message onto '" <<
queue.getName() << "'. Persistence not enabled." << std::endl;
}
+
void NullMessageStore::dequeue(TransactionContext*, Message::shared_ptr&,
const Queue& queue, const string * const)
{
if (warn) std::cout << "WARNING: Can't dequeue message from '" <<
queue.getName() << "'. Persistence not enabled." << std::endl;
}
+
void NullMessageStore::committed(const string * const)
{
if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
}
+
void NullMessageStore::aborted(const string * const)
{
if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl;
}
+
std::auto_ptr<TransactionContext> NullMessageStore::begin()
{
return std::auto_ptr<TransactionContext>();
}
+
void NullMessageStore::commit(TransactionContext*)
{
}
+
void NullMessageStore::abort(TransactionContext*)
{
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Tue Nov 28 07:25:35 2006
@@ -35,18 +35,20 @@
const bool warn;
public:
NullMessageStore(bool warn = true);
- void virtual create(const Queue& queue);
- void virtual destroy(const Queue& queue);
- void virtual recover(RecoveryManager& queues);
- void virtual stage(Message::shared_ptr& msg);
- void virtual destroy(Message::shared_ptr& msg);
- void virtual enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
- void virtual dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
- void virtual committed(const string * const xid);
- void virtual aborted(const string * const xid);
+ virtual void create(const Queue& queue);
+ virtual void destroy(const Queue& queue);
+ virtual void recover(RecoveryManager& queues);
+ virtual void stage(Message::shared_ptr& msg);
+ virtual void destroy(Message::shared_ptr& msg);
+ virtual void appendContent(u_int64_t msgId, const std::string& data);
+ virtual void loadContent(u_int64_t msgId, std::string& data, u_int64_t offset, u_int32_t length);
+ virtual void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
+ virtual void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const Queue& queue, const string * const xid);
+ virtual void committed(const string * const xid);
+ virtual void aborted(const string * const xid);
virtual std::auto_ptr<TransactionContext> begin();
- void virtual commit(TransactionContext* ctxt);
- void virtual abort(TransactionContext* ctxt);
+ virtual void commit(TransactionContext* ctxt);
+ virtual void abort(TransactionContext* ctxt);
~NullMessageStore(){}
};
}
Added:
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp?view=auto&rev=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp Tue Nov 28 07:25:35 2006
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/broker/InMemoryContent.h>
+#include <qpid_test_plugin.h>
+#include <iostream>
+#include <list>
+
+using std::list;
+using std::string;
+using boost::dynamic_pointer_cast;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+struct DummyHandler : OutputHandler{
+ std::vector<AMQFrame*> frames;
+
+ virtual void send(AMQFrame* frame){
+ frames.push_back(frame);
+ }
+};
+
+class InMemoryContentTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(InMemoryContentTest);
+ CPPUNIT_TEST(testRefragmentation);
+ CPPUNIT_TEST_SUITE_END();
+
+public:
+ void testRefragmentation()
+ {
+ {//no remainder
+ string out[] = {"abcde", "fghij", "klmno", "pqrst"};
+ string in[] = {out[0] + out[1], out[2] + out[3]};
+ refragment(2, in, 4, out);
+ }
+ {//remainder for last frame
+ string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvw"};
+ string in[] = {out[0] + out[1], out[2] + out[3] + out[4]};
+ refragment(2, in, 5, out);
+ }
+ }
+
+
+ void refragment(size_t inCount, string* in, size_t outCount, string* out, u_int32_t framesize = 5)
+ {
+ InMemoryContent content;
+ DummyHandler handler;
+ u_int16_t channel = 3;
+
+ addframes(content, inCount, in);
+ content.send(&handler, channel, framesize);
+ check(handler, channel, outCount, out);
+ }
+
+ void addframes(InMemoryContent& content, size_t frameCount, string* frameData)
+ {
+ for (unsigned int i = 0; i < frameCount; i++) {
+ AMQContentBody::shared_ptr frame(new AMQContentBody(frameData[i]));
+ content.add(frame);
+ }
+ }
+
+ void check(DummyHandler& handler, u_int16_t channel, size_t expectedChunkCount, string* expectedChunks)
+ {
+ CPPUNIT_ASSERT_EQUAL(expectedChunkCount, handler.frames.size());
+
+ for (unsigned int i = 0; i < expectedChunkCount; i++) {
+ AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[i]->getBody()));
+ CPPUNIT_ASSERT(chunk);
+ CPPUNIT_ASSERT_EQUAL(expectedChunks[i], chunk->getData());
+ CPPUNIT_ASSERT_EQUAL(channel, handler.frames[i]->getChannel());
+ }
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(InMemoryContentTest);
+
Propchange:
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/InMemoryContentTest.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp?view=auto&rev=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp Tue Nov 28 07:25:35 2006
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <qpid/broker/LazyLoadedContent.h>
+#include <qpid/broker/NullMessageStore.h>
+#include <qpid_test_plugin.h>
+#include <iostream>
+#include <list>
+#include <sstream>
+
+using std::list;
+using std::string;
+using boost::dynamic_pointer_cast;
+using namespace qpid::broker;
+using namespace qpid::framing;
+
+struct DummyHandler : OutputHandler{
+ std::vector<AMQFrame*> frames;
+
+ virtual void send(AMQFrame* frame){
+ frames.push_back(frame);
+ }
+};
+
+
+class LazyLoadedContentTest : public CppUnit::TestCase
+{
+ CPPUNIT_TEST_SUITE(LazyLoadedContentTest);
+ CPPUNIT_TEST(testFragmented);
+ CPPUNIT_TEST(testWhole);
+ CPPUNIT_TEST(testHalved);
+ CPPUNIT_TEST_SUITE_END();
+
+ class TestMessageStore : public NullMessageStore
+ {
+ const string content;
+
+ public:
+ TestMessageStore(const string& _content) : content(_content) {}
+
+ void loadContent(u_int64_t, string& data, u_int64_t offset, u_int32_t length)
+ {
+ if (offset + length <= content.size()) {
+ data = content.substr(offset, length);
+ } else{
+ std::stringstream error;
+ error << "Invalid segment: offset=" << offset << ", length=" << length << ", content_length=" << content.size();
+ throw qpid::Exception(error.str());
+ }
+ }
+ };
+
+
+public:
+ void testFragmented()
+ {
+ string data = "abcdefghijklmnopqrstuvwxyz";
+ u_int32_t framesize = 5;
+ string out[] = {"abcde", "fghij", "klmno", "pqrst", "uvwxy", "z"};
+ load(data, 6, out, framesize);
+ }
+
+ void testWhole()
+ {
+ string data = "abcdefghijklmnopqrstuvwxyz";
+ u_int32_t framesize = 50;
+ string out[] = {data};
+ load(data, 1, out, framesize);
+ }
+
+ void testHalved()
+ {
+ string data = "abcdefghijklmnopqrstuvwxyz";
+ u_int32_t framesize = 13;
+ string out[] = {"abcdefghijklm", "nopqrstuvwxyz"};
+ load(data, 2, out, framesize);
+ }
+
+ void load(string& in, size_t outCount, string* out, u_int32_t framesize)
+ {
+ TestMessageStore store(in);
+ LazyLoadedContent content(&store, 1, in.size());
+ DummyHandler handler;
+ u_int16_t channel = 3;
+ content.send(&handler, channel, framesize);
+ check(handler, channel, outCount, out);
+ }
+
+ void check(DummyHandler& handler, u_int16_t channel, size_t expectedChunkCount, string* expectedChunks)
+ {
+ CPPUNIT_ASSERT_EQUAL(expectedChunkCount, handler.frames.size());
+
+ for (unsigned int i = 0; i < expectedChunkCount; i++) {
+ AMQContentBody::shared_ptr chunk(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[i]->getBody()));
+ CPPUNIT_ASSERT(chunk);
+ CPPUNIT_ASSERT_EQUAL(expectedChunks[i], chunk->getData());
+ CPPUNIT_ASSERT_EQUAL(channel, handler.frames[i]->getChannel());
+ }
+ }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(LazyLoadedContentTest);
+
Propchange:
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/LazyLoadedContentTest.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageBuilderTest.cpp Tue Nov 28 07:25:35 2006
@@ -56,13 +56,19 @@
header = new Buffer(msg->encodedHeaderSize());
msg->encodeHeader(*header);
content = new Buffer(contentBufferSize);
- msg->encodeContent(*content);
- } else if (!header || !content) {
- throw qpid::Exception("Buffers not initialised!");
+ msg->setPersistenceId(1);
} else {
- msg->encodeContent(*content);
+ throw qpid::Exception("Message already staged!");
+ }
+ }
+
+ void appendContent(u_int64_t msgId, const string& data)
+ {
+ if (msgId == 1) {
+ content->putRawData(data);
+ } else {
+ throw qpid::Exception("Invalid message id!");
}
- msg->setPersistenceId(1);
}
Message::shared_ptr getRestoredMessage()
@@ -159,7 +165,7 @@
void testStaging(){
DummyHandler handler;
- TestMessageStore store(50);//more than enough for two frames of 14 bytes
+ TestMessageStore store(14);
MessageBuilder builder(&handler, &store, 5);
string data1("abcdefg");
Modified: incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp?view=diff&rev=480087&r1=480086&r2=480087
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/test/unit/qpid/broker/MessageTest.cpp Tue Nov 28 07:25:35 2006
@@ -77,13 +77,10 @@
DummyHandler handler;
msg->deliver(&handler, 0, "ignore", 0, 100);
- CPPUNIT_ASSERT_EQUAL((size_t) 4, handler.frames.size());
- AMQContentBody::shared_ptr contentBody1(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
- AMQContentBody::shared_ptr contentBody2(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[3]->getBody()));
- CPPUNIT_ASSERT(contentBody1);
- CPPUNIT_ASSERT(contentBody2);
- CPPUNIT_ASSERT_EQUAL(data1, contentBody1->getData());
- CPPUNIT_ASSERT_EQUAL(data2, contentBody2->getData());
+ CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
+ AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
+ CPPUNIT_ASSERT(contentBody);
+ CPPUNIT_ASSERT_EQUAL(data1 + data2, contentBody->getData());
}
};