Author: gsim
Date: Wed Dec 6 09:51:42 2006
New Revision: 483165
URL: http://svn.apache.org/viewvc?view=rev&rev=483165
Log:
Allow non-durable messages to be lazy-loaded. Cleanup of lazy-loaded messages
that are never enqueued.
Modified:
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/Content.h
incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.h
incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h
incubator/qpid/trunk/qpid/cpp/lib/broker/MessageBuilder.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h
incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h
incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp
incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h
incubator/qpid/trunk/qpid/cpp/tests/MessageBuilderTest.cpp
incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp
incubator/qpid/trunk/qpid/cpp/tests/TxPublishTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp?view=diff&rev=483165&r1=483164&r2=483165
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerMessage.cpp Wed Dec 6
09:51:42 2006
@@ -53,7 +53,9 @@
Message::Message() : publisher(0), mandatory(false), immediate(false),
redelivered(false), size(0), persistenceId(0){}
-Message::~Message(){}
+Message::~Message(){
+ if (content.get()) content->destroy();
+}
void Message::setHeader(AMQHeaderBody::shared_ptr _header){
this->header = _header;
@@ -205,6 +207,9 @@
void Message::releaseContent(MessageStore* store)
{
Mutex::ScopedLock locker(contentLock);
+ if (!isPersistent() && persistenceId == 0) {
+ store->stage(this);
+ }
if (!content.get() || content->size() > 0) {
//set content to lazy loading mode (but only if there is stored
content):
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp?view=diff&rev=483165&r1=483164&r2=483165
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/BrokerQueue.cpp Wed Dec 6
09:51:42 2006
@@ -189,14 +189,14 @@
void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const
string * const xid)
{
if (msg->isPersistent() && store) {
- store->enqueue(ctxt, msg, *this, xid);
+ store->enqueue(ctxt, msg.get(), *this, xid);
}
}
void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const
string * const xid)
{
if (msg->isPersistent() && store) {
- store->dequeue(ctxt, msg, *this, xid);
+ store->dequeue(ctxt, msg.get(), *this, xid);
}
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/Content.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/Content.h?view=diff&rev=483165&r1=483164&r2=483165
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/Content.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/Content.h Wed Dec 6 09:51:42 2006
@@ -33,6 +33,7 @@
virtual u_int32_t size() = 0;
virtual void send(qpid::framing::OutputHandler* out, int channel,
u_int32_t framesize) = 0;
virtual void encode(qpid::framing::Buffer& buffer) = 0;
+ virtual void destroy() = 0;
virtual ~Content(){}
};
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.cpp?view=diff&rev=483165&r1=483164&r2=483165
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.cpp Wed Dec 6
09:51:42 2006
@@ -67,3 +67,7 @@
(*i)->encode(buffer);
}
}
+
+void InMemoryContent::destroy()
+{
+}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.h?view=diff&rev=483165&r1=483164&r2=483165
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/InMemoryContent.h Wed Dec 6
09:51:42 2006
@@ -36,6 +36,7 @@
u_int32_t size();
void send(qpid::framing::OutputHandler* out, int channel,
u_int32_t framesize);
void encode(qpid::framing::Buffer& buffer);
+ void destroy();
~InMemoryContent(){}
};
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp?view=diff&rev=483165&r1=483164&r2=483165
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.cpp Wed Dec 6
09:51:42 2006
@@ -56,3 +56,8 @@
{
//do nothing as all content is written as soon as it is added
}
+
+void LazyLoadedContent::destroy()
+{
+ store->destroy(msg);
+}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h?view=diff&rev=483165&r1=483164&r2=483165
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/LazyLoadedContent.h Wed Dec 6
09:51:42 2006
@@ -36,6 +36,7 @@
u_int32_t size();
void send(qpid::framing::OutputHandler* out, int channel,
u_int32_t framesize);
void encode(qpid::framing::Buffer& buffer);
+ void destroy();
~LazyLoadedContent(){}
};
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageBuilder.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageBuilder.cpp?view=diff&rev=483165&r1=483164&r2=483165
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageBuilder.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageBuilder.cpp Wed Dec 6
09:51:42 2006
@@ -53,7 +53,7 @@
}
message->setHeader(header);
if (stagingThreshold && header->getContentSize() >= stagingThreshold) {
- store->stage(message);
+ store->stage(message.get());
message->releaseContent(store);
} else {
auto_ptr<Content> content(new InMemoryContent());
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h?view=diff&rev=483165&r1=483164&r2=483165
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStore.h Wed Dec 6 09:51:42
2006
@@ -39,7 +39,9 @@
u_int64_t stagingThreshold;
};
/**
- * An abstraction of the persistent storage for messages.
+ * An abstraction of the persistent storage for messages. (In
+ * all methods, any pointers/references to queues or messages
+ * are valid only for the duration of the call).
*/
class MessageStore : public TransactionalStore{
public:
@@ -66,7 +68,7 @@
* persistence id will be set on the message which can be
* used to load the content or to append to it.
*/
- virtual void stage(Message::shared_ptr& msg) = 0;
+ virtual void stage(Message* const msg) = 0;
/**
* Destroys a previously staged message. This only needs
@@ -74,7 +76,7 @@
* enqueued, deletion will be automatic when the message
* is dequeued from all queues it was enqueued onto).
*/
- virtual void destroy(Message::shared_ptr& msg) = 0;
+ virtual void destroy(Message* const msg) = 0;
/**
* Appends content to a previously staged message
@@ -102,7 +104,7 @@
* distributed transaction in which the operation takes
* place or null for 'local' transactions
*/
- virtual void enqueue(TransactionContext* ctxt,
Message::shared_ptr& msg, const Queue& queue, const std::string * const xid) =
0;
+ virtual void enqueue(TransactionContext* ctxt, Message* const msg,
const Queue& queue, const std::string * const xid) = 0;
/**
* Dequeues a message, recording that the given message is
* no longer on the given queue and deleting the message
@@ -114,7 +116,7 @@
* distributed transaction in which the operation takes
* place or null for 'local' transactions
*/
- virtual void dequeue(TransactionContext* ctxt,
Message::shared_ptr& msg, const Queue& queue, const std::string * const xid) =
0;
+ virtual void dequeue(TransactionContext* ctxt, Message* const msg,
const Queue& queue, const std::string * const xid) = 0;
/**
* Treat all enqueue/dequeues where this xid was specified as
being committed.
*/
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp?view=diff&rev=483165&r1=483164&r2=483165
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.cpp Wed Dec 6
09:51:42 2006
@@ -43,12 +43,12 @@
store->recover(registry, settings);
}
-void MessageStoreModule::stage(Message::shared_ptr& msg)
+void MessageStoreModule::stage(Message* const msg)
{
store->stage(msg);
}
-void MessageStoreModule::destroy(Message::shared_ptr& msg)
+void MessageStoreModule::destroy(Message* const msg)
{
store->destroy(msg);
}
@@ -63,12 +63,12 @@
store->loadContent(msg, data, offset, length);
}
-void MessageStoreModule::enqueue(TransactionContext* ctxt,
Message::shared_ptr& msg, const Queue& queue, const string * const xid)
+void MessageStoreModule::enqueue(TransactionContext* ctxt, Message* const msg,
const Queue& queue, const string * const xid)
{
store->enqueue(ctxt, msg, queue, xid);
}
-void MessageStoreModule::dequeue(TransactionContext* ctxt,
Message::shared_ptr& msg, const Queue& queue, const string * const xid)
+void MessageStoreModule::dequeue(TransactionContext* ctxt, Message* const msg,
const Queue& queue, const string * const xid)
{
store->dequeue(ctxt, msg, queue, xid);
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h?view=diff&rev=483165&r1=483164&r2=483165
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/MessageStoreModule.h Wed Dec 6
09:51:42 2006
@@ -39,12 +39,12 @@
void create(const Queue& queue, const qpid::framing::FieldTable&
settings);
void destroy(const Queue& queue);
void recover(RecoveryManager& queues, const MessageStoreSettings*
const settings = 0);
- void stage(Message::shared_ptr& msg);
- void destroy(Message::shared_ptr& msg);
+ void stage(Message* const msg);
+ void destroy(Message* const msg);
void appendContent(Message* const msg, const std::string& data);
void loadContent(Message* const msg, std::string& data, u_int64_t
offset, u_int32_t length);
- void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg,
const Queue& queue, const string * const xid);
- void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg,
const Queue& queue, const string * const xid);
+ void enqueue(TransactionContext* ctxt, Message* const msg, const
Queue& queue, const string * const xid);
+ void dequeue(TransactionContext* ctxt, Message* const msg, const
Queue& queue, const string * const xid);
void committed(const string * const xid);
void aborted(const string * const xid);
std::auto_ptr<TransactionContext> begin();
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp?view=diff&rev=483165&r1=483164&r2=483165
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.cpp Wed Dec 6
09:51:42 2006
@@ -45,12 +45,12 @@
if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of
queues or messages." << std::endl;
}
-void NullMessageStore::stage(Message::shared_ptr&)
+void NullMessageStore::stage(Message* const)
{
if (warn) std::cout << "WARNING: Can't stage message. Persistence not
enabled." << std::endl;
}
-void NullMessageStore::destroy(Message::shared_ptr&)
+void NullMessageStore::destroy(Message* const)
{
if (warn) std::cout << "WARNING: No need to destroy staged message.
Persistence not enabled." << std::endl;
}
@@ -65,12 +65,12 @@
if (warn) std::cout << "WARNING: Can't load content. Persistence not
enabled." << std::endl;
}
-void NullMessageStore::enqueue(TransactionContext*, Message::shared_ptr&,
const Queue& queue, const string * const)
+void NullMessageStore::enqueue(TransactionContext*, Message* const, const
Queue& queue, const string * const)
{
if (warn) std::cout << "WARNING: Can't enqueue message onto '" <<
queue.getName() << "'. Persistence not enabled." << std::endl;
}
-void NullMessageStore::dequeue(TransactionContext*, Message::shared_ptr&,
const Queue& queue, const string * const)
+void NullMessageStore::dequeue(TransactionContext*, Message* const, const
Queue& queue, const string * const)
{
if (warn) std::cout << "WARNING: Can't dequeue message from '" <<
queue.getName() << "'. Persistence not enabled." << std::endl;
}
Modified: incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h?view=diff&rev=483165&r1=483164&r2=483165
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/lib/broker/NullMessageStore.h Wed Dec 6
09:51:42 2006
@@ -38,12 +38,12 @@
virtual void create(const Queue& queue, const
qpid::framing::FieldTable& settings);
virtual void destroy(const Queue& queue);
virtual void recover(RecoveryManager& queues, const
MessageStoreSettings* const settings = 0);
- virtual void stage(Message::shared_ptr& msg);
- virtual void destroy(Message::shared_ptr& msg);
+ virtual void stage(Message* const msg);
+ virtual void destroy(Message* const msg);
virtual void appendContent(Message* const msg, const std::string&
data);
virtual void loadContent(Message* const msg, std::string& data,
u_int64_t offset, u_int32_t length);
- virtual void enqueue(TransactionContext* ctxt,
Message::shared_ptr& msg, const Queue& queue, const string * const xid);
- virtual void dequeue(TransactionContext* ctxt,
Message::shared_ptr& msg, const Queue& queue, const string * const xid);
+ virtual void enqueue(TransactionContext* ctxt, Message* const msg,
const Queue& queue, const string * const xid);
+ virtual void dequeue(TransactionContext* ctxt, Message* const msg,
const Queue& queue, const string * const xid);
virtual void committed(const string * const xid);
virtual void aborted(const string * const xid);
virtual std::auto_ptr<TransactionContext> begin();
Modified: incubator/qpid/trunk/qpid/cpp/tests/MessageBuilderTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/MessageBuilderTest.cpp?view=diff&rev=483165&r1=483164&r2=483165
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/MessageBuilderTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/MessageBuilderTest.cpp Wed Dec 6
09:51:42 2006
@@ -50,7 +50,7 @@
public:
- void stage(Message::shared_ptr& msg)
+ void stage(Message* const msg)
{
if (msg->getPersistenceId() == 0) {
header = new Buffer(msg->encodedHeaderSize());
@@ -71,6 +71,11 @@
}
}
+ void destroy(Message* msg)
+ {
+ CPPUNIT_ASSERT(msg->getPersistenceId());
+ }
+
Message::shared_ptr getRestoredMessage()
{
Message::shared_ptr msg(new Message());
@@ -164,37 +169,42 @@
}
void testStaging(){
- DummyHandler handler;
+ //store must be the last thing to be destroyed or destructor
+ //of Message fails (it uses the store to call destroy if lazy
+ //loaded content is in use)
TestMessageStore store(14);
- MessageBuilder builder(&handler, &store, 5);
-
- string data1("abcdefg");
- string data2("hijklmn");
-
- Message::shared_ptr message(new Message(0, "test", "my_routing_key",
false, false));
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- header->setContentSize(14);
- BasicHeaderProperties* properties =
dynamic_cast<BasicHeaderProperties*>(header->getProperties());
- properties->setMessageId("MyMessage");
- properties->getHeaders().setString("abc", "xyz");
-
- AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
- AMQContentBody::shared_ptr part2(new AMQContentBody(data2));
-
- builder.initialise(message);
- builder.setHeader(header);
- builder.addContent(part1);
- builder.addContent(part2);
- CPPUNIT_ASSERT(handler.msg);
- CPPUNIT_ASSERT_EQUAL(message, handler.msg);
-
- Message::shared_ptr restored = store.getRestoredMessage();
- CPPUNIT_ASSERT_EQUAL(message->getExchange(), restored->getExchange());
- CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(),
restored->getRoutingKey());
- CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(),
restored->getHeaderProperties()->getMessageId());
-
CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getHeaders().getString("abc"),
-
restored->getHeaderProperties()->getHeaders().getString("abc"));
- CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, restored->contentSize());
+ {
+ DummyHandler handler;
+ MessageBuilder builder(&handler, &store, 5);
+
+ string data1("abcdefg");
+ string data2("hijklmn");
+
+ Message::shared_ptr message(new Message(0, "test",
"my_routing_key", false, false));
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ header->setContentSize(14);
+ BasicHeaderProperties* properties =
dynamic_cast<BasicHeaderProperties*>(header->getProperties());
+ properties->setMessageId("MyMessage");
+ properties->getHeaders().setString("abc", "xyz");
+
+ AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
+ AMQContentBody::shared_ptr part2(new AMQContentBody(data2));
+
+ builder.initialise(message);
+ builder.setHeader(header);
+ builder.addContent(part1);
+ builder.addContent(part2);
+ CPPUNIT_ASSERT(handler.msg);
+ CPPUNIT_ASSERT_EQUAL(message, handler.msg);
+
+ Message::shared_ptr restored = store.getRestoredMessage();
+ CPPUNIT_ASSERT_EQUAL(message->getExchange(),
restored->getExchange());
+ CPPUNIT_ASSERT_EQUAL(message->getRoutingKey(),
restored->getRoutingKey());
+
CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getMessageId(),
restored->getHeaderProperties()->getMessageId());
+
CPPUNIT_ASSERT_EQUAL(message->getHeaderProperties()->getHeaders().getString("abc"),
+
restored->getHeaderProperties()->getHeaders().getString("abc"));
+ CPPUNIT_ASSERT_EQUAL((u_int64_t) 14, restored->contentSize());
+ }
}
};
Modified: incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp?view=diff&rev=483165&r1=483164&r2=483165
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/TxAckTest.cpp Wed Dec 6 09:51:42 2006
@@ -37,9 +37,9 @@
class TestMessageStore : public NullMessageStore
{
public:
- vector<Message::shared_ptr> dequeued;
+ vector<Message*> dequeued;
- void dequeue(TransactionContext*, Message::shared_ptr& msg, const
Queue& /*queue*/, const string * const /*xid*/)
+ void dequeue(TransactionContext*, Message* const msg, const Queue&
/*queue*/, const string * const /*xid*/)
{
dequeued.push_back(msg);
}
@@ -86,13 +86,13 @@
op.prepare(0);
CPPUNIT_ASSERT_EQUAL((size_t) 7, store.dequeued.size());
CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size());
- CPPUNIT_ASSERT_EQUAL(messages[0], store.dequeued[0]);//msg 1
- CPPUNIT_ASSERT_EQUAL(messages[1], store.dequeued[1]);//msg 2
- CPPUNIT_ASSERT_EQUAL(messages[2], store.dequeued[2]);//msg 3
- CPPUNIT_ASSERT_EQUAL(messages[3], store.dequeued[3]);//msg 4
- CPPUNIT_ASSERT_EQUAL(messages[4], store.dequeued[4]);//msg 5
- CPPUNIT_ASSERT_EQUAL(messages[6], store.dequeued[5]);//msg 7
- CPPUNIT_ASSERT_EQUAL(messages[8], store.dequeued[6]);//msg 9
+ CPPUNIT_ASSERT_EQUAL(messages[0].get(), store.dequeued[0]);//msg 1
+ CPPUNIT_ASSERT_EQUAL(messages[1].get(), store.dequeued[1]);//msg 2
+ CPPUNIT_ASSERT_EQUAL(messages[2].get(), store.dequeued[2]);//msg 3
+ CPPUNIT_ASSERT_EQUAL(messages[3].get(), store.dequeued[3]);//msg 4
+ CPPUNIT_ASSERT_EQUAL(messages[4].get(), store.dequeued[4]);//msg 5
+ CPPUNIT_ASSERT_EQUAL(messages[6].get(), store.dequeued[5]);//msg 7
+ CPPUNIT_ASSERT_EQUAL(messages[8].get(), store.dequeued[6]);//msg 9
}
void testCommit()
Modified: incubator/qpid/trunk/qpid/cpp/tests/TxPublishTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/tests/TxPublishTest.cpp?view=diff&rev=483165&r1=483164&r2=483165
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/tests/TxPublishTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/tests/TxPublishTest.cpp Wed Dec 6 09:51:42
2006
@@ -38,11 +38,11 @@
class TestMessageStore : public NullMessageStore
{
public:
- vector< pair<string, Message::shared_ptr> > enqueued;
+ vector< pair<string, Message*> > enqueued;
- void enqueue(TransactionContext*, Message::shared_ptr& msg, const
Queue& queue, const string * const /*xid*/)
+ void enqueue(TransactionContext*, Message* const msg, const Queue&
queue, const string * const /*xid*/)
{
- enqueued.push_back(pair<string,
Message::shared_ptr>(queue.getName(),msg));
+ enqueued.push_back(pair<string, Message*>(queue.getName(),msg));
}
//dont care about any of the other methods:
@@ -59,7 +59,7 @@
TestMessageStore store;
Queue::shared_ptr queue1;
Queue::shared_ptr queue2;
- Message::shared_ptr msg;
+ Message::shared_ptr const msg;
TxPublish op;
@@ -82,9 +82,9 @@
op.prepare(0);
CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size());
CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first);
- CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[0].second);
+ CPPUNIT_ASSERT_EQUAL(msg.get(), store.enqueued[0].second);
CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first);
- CPPUNIT_ASSERT_EQUAL(msg, store.enqueued[1].second);
+ CPPUNIT_ASSERT_EQUAL(msg.get(), store.enqueued[1].second);
}
void testCommit()