Author: kpvdr
Date: Mon Nov 26 13:48:37 2007
New Revision: 598440

URL: http://svn.apache.org/viewvc?rev=598440&view=rev
Log:
Switched all regular PersistentMessage* and PersistentMessage& to 
intrusive_ptr<PersistentMessage>, so as to hook into the refcount for a message 
while it is in the store.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=598440&r1=598439&r2=598440&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Mon Nov 26 
13:48:37 2007
@@ -149,7 +149,8 @@
                store = _store;
        }
     if (!getPersistenceId()) {
-        store->stage(*this);
+        intrusive_ptr<PersistableMessage> pmsg(this);
+        store->stage(pmsg);
     }
     //remove any content frames from the frameset
     frames.remove(TypeFilter<CONTENT_BODY>());
@@ -162,13 +163,14 @@
         //load content from store in chunks of maxContentSize
         uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
         uint64_t expectedSize(frames.getHeaders()->getContentLength());
+        intrusive_ptr<const PersistableMessage> pmsg(this);
         for (uint64_t offset = 0; offset < expectedSize; offset += 
maxContentSize)
         {            
             uint64_t remaining = expectedSize - offset;
             AMQFrame frame(in_place<AMQContentBody>());
             string& data = frame.castBody<AMQContentBody>()->getData();
 
-            store->loadContent(queue, *this, data, offset,
+            store->loadContent(queue, pmsg, data, offset,
                                remaining > maxContentSize ? maxContentSize : 
remaining);
             frame.setBof(false);
             frame.setEof(true);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp?rev=598440&r1=598439&r2=598440&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageBuilder.cpp Mon Nov 26 
13:48:37 2007
@@ -49,7 +49,8 @@
         throw CommandInvalidException(QPID_MSG("Invalid frame sequence for 
message (state=" << state << ")"));
     }
     if (staging) {
-        store->appendContent(*message, 
frame.castBody<AMQContentBody>()->getData());
+        intrusive_ptr<const PersistableMessage> cpmsg = 
boost::static_pointer_cast<const PersistableMessage>(message);
+        store->appendContent(cpmsg, 
frame.castBody<AMQContentBody>()->getData());
     } else {
         message->getFrames().append(frame);
         //have we reached the staging limit? if so stage message and release 
content

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h?rev=598440&r1=598439&r2=598440&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStore.h Mon Nov 26 
13:48:37 2007
@@ -21,6 +21,7 @@
 #ifndef _MessageStore_
 #define _MessageStore_
 
+#include <boost/shared_ptr.hpp>
 #include "PersistableExchange.h"
 #include "PersistableMessage.h"
 #include "PersistableQueue.h"
@@ -94,7 +95,7 @@
         * for that queue and avoid searching based on id. Set queue = 0 for
         * large message staging when the queue is not known.
      */
-    virtual void stage( PersistableMessage& msg) = 0;
+    virtual void stage(intrusive_ptr<PersistableMessage>& msg) = 0;
             
     /**
      * Destroys a previously staged message. This only needs
@@ -102,12 +103,13 @@
      * enqueued, deletion will be automatic when the message
      * is dequeued from all queues it was enqueued onto).
      */
-    virtual void destroy(PersistableMessage& msg) = 0;
+    virtual void destroy(intrusive_ptr<PersistableMessage>& msg) = 0;
 
     /**
      * Appends content to a previously staged message
      */
-    virtual void appendContent(const PersistableMessage& msg, const 
std::string& data) = 0;
+    virtual void appendContent(intrusive_ptr<const PersistableMessage>& msg,
+                               const std::string& data) = 0;
     
     /**
      * Loads (a section) of content data for the specified
@@ -118,7 +120,8 @@
      * meta-data).
      */
     virtual void loadContent(const qpid::broker::PersistableQueue& queue, 
-                                const PersistableMessage& msg, std::string& 
data, uint64_t offset, uint32_t length) = 0;
+                                intrusive_ptr<const PersistableMessage>& msg,
+                             std::string& data, uint64_t offset, uint32_t 
length) = 0;
     
     /**
      * Enqueues a message, storing the message if it has not
@@ -134,7 +137,8 @@
      * distributed transaction in which the operation takes
      * place or null for 'local' transactions
      */
-    virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, 
const PersistableQueue& queue) = 0;
+    virtual void enqueue(TransactionContext* ctxt, 
intrusive_ptr<PersistableMessage>& msg,
+                         const PersistableQueue& queue) = 0;
     
     /**
      * Dequeues a message, recording that the given message is
@@ -150,7 +154,8 @@
      * distributed transaction in which the operation takes
      * place or null for 'local' transactions
      */
-    virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, 
const PersistableQueue& queue) = 0;
+    virtual void dequeue(TransactionContext* ctxt, 
intrusive_ptr<PersistableMessage>& msg,
+                         const PersistableQueue& queue) = 0;
 
     /**
      * Flushes all async messages to disk for the specified queue

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp?rev=598440&r1=598439&r2=598440&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.cpp Mon 
Nov 26 13:48:37 2007
@@ -73,33 +73,33 @@
     TRANSFER_EXCEPTION(store->recover(registry));
 }
 
-void MessageStoreModule::stage( PersistableMessage& msg)
+void MessageStoreModule::stage( intrusive_ptr<PersistableMessage>& msg)
 {
     TRANSFER_EXCEPTION(store->stage(msg));
 }
 
-void MessageStoreModule::destroy(PersistableMessage& msg)
+void MessageStoreModule::destroy(intrusive_ptr<PersistableMessage>& msg)
 {
     TRANSFER_EXCEPTION(store->destroy(msg));
 }
 
-void MessageStoreModule::appendContent(const PersistableMessage& msg, const 
std::string& data)
+void MessageStoreModule::appendContent(intrusive_ptr<const 
PersistableMessage>& msg, const std::string& data)
 {
     TRANSFER_EXCEPTION(store->appendContent(msg, data));
 }
 
 void MessageStoreModule::loadContent(const qpid::broker::PersistableQueue& 
queue, 
-     const PersistableMessage& msg, string& data, uint64_t offset, uint32_t 
length)
+     intrusive_ptr<const PersistableMessage>& msg, string& data, uint64_t 
offset, uint32_t length)
 {
     TRANSFER_EXCEPTION(store->loadContent(queue, msg, data, offset, length));
 }
 
-void MessageStoreModule::enqueue(TransactionContext* ctxt, PersistableMessage& 
msg, const PersistableQueue& queue)
+void MessageStoreModule::enqueue(TransactionContext* ctxt, 
intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue)
 {
     TRANSFER_EXCEPTION(store->enqueue(ctxt, msg, queue));
 }
 
-void MessageStoreModule::dequeue(TransactionContext* ctxt, PersistableMessage& 
msg, const PersistableQueue& queue)
+void MessageStoreModule::dequeue(TransactionContext* ctxt, 
intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue)
 {
     TRANSFER_EXCEPTION(store->dequeue(ctxt, msg, queue));
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h?rev=598440&r1=598439&r2=598440&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageStoreModule.h Mon Nov 
26 13:48:37 2007
@@ -55,14 +55,17 @@
     void unbind(const PersistableExchange& exchange, const PersistableQueue& 
queue, 
                 const std::string& key, const framing::FieldTable& args);
     void recover(RecoveryManager& queues);
-    void stage(PersistableMessage& msg);
-    void destroy(PersistableMessage& msg);
-    void appendContent(const PersistableMessage& msg, const std::string& data);
+    void stage(intrusive_ptr<PersistableMessage>& msg);
+    void destroy(intrusive_ptr<PersistableMessage>& msg);
+    void appendContent(intrusive_ptr<const PersistableMessage>& msg, const 
std::string& data);
     void loadContent(const qpid::broker::PersistableQueue& queue, 
-                 const PersistableMessage& msg, std::string& data, uint64_t 
offset, uint32_t length);
+                 intrusive_ptr<const PersistableMessage>& msg, std::string& 
data,
+              uint64_t offset, uint32_t length);
 
-    void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const 
PersistableQueue& queue);
-    void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const 
PersistableQueue& queue);
+    void enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& 
msg,
+                 const PersistableQueue& queue);
+    void dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& 
msg,
+                 const PersistableQueue& queue);
     u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
     void flush(const qpid::broker::PersistableQueue& queue);
         

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=598440&r1=598439&r2=598440&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Mon Nov 
26 13:48:37 2007
@@ -79,34 +79,34 @@
     QPID_LOG(info, "Persistence not enabled, no recovery attempted.");
 }
 
-void NullMessageStore::stage(PersistableMessage&)
+void NullMessageStore::stage(intrusive_ptr<PersistableMessage>&)
 {
     QPID_LOG(info, "Can't stage message. Persistence not enabled.");
 }
 
-void NullMessageStore::destroy(PersistableMessage&)
+void NullMessageStore::destroy(intrusive_ptr<PersistableMessage>&)
 {
 }
 
-void NullMessageStore::appendContent(const PersistableMessage&, const string&)
+void NullMessageStore::appendContent(intrusive_ptr<const PersistableMessage>&, 
const string&)
 {
     QPID_LOG(info, "Can't append content. Persistence not enabled.");
 }
 
-void NullMessageStore::loadContent(const qpid::broker::PersistableQueue&, 
const PersistableMessage&, string&, uint64_t, uint32_t)
+void NullMessageStore::loadContent(const qpid::broker::PersistableQueue&, 
intrusive_ptr<const PersistableMessage>&, string&, uint64_t, uint32_t)
 {
     QPID_LOG(info, "Can't load content. Persistence not enabled.");
 }
 
-void NullMessageStore::enqueue(TransactionContext*, PersistableMessage& msg, 
const PersistableQueue& queue)
+void NullMessageStore::enqueue(TransactionContext*, 
intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue)
 {
-    msg.enqueueComplete(); 
+    msg->enqueueComplete(); 
     QPID_LOG(info, "Message is not durably recorded on '" << queue.getName() 
<< "'. Persistence not enabled.");
 }
 
-void NullMessageStore::dequeue(TransactionContext*, PersistableMessage& msg, 
const PersistableQueue&)
+void NullMessageStore::dequeue(TransactionContext*, 
intrusive_ptr<PersistableMessage>& msg, const PersistableQueue&)
 {
-    msg.dequeueComplete();
+    msg->dequeueComplete();
 }
 
 void NullMessageStore::flush(const qpid::broker::PersistableQueue&)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h?rev=598440&r1=598439&r2=598440&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.h Mon Nov 26 
13:48:37 2007
@@ -56,13 +56,17 @@
     virtual void unbind(const PersistableExchange& exchange, const 
PersistableQueue& queue, 
                         const std::string& key, const framing::FieldTable& 
args);
     virtual void recover(RecoveryManager& queues);
-    virtual void stage(PersistableMessage& msg);
-    virtual void destroy(PersistableMessage& msg);
-    virtual void appendContent(const PersistableMessage& msg, const 
std::string& data);
-    virtual void loadContent(const qpid::broker::PersistableQueue& queue, 
-                          const PersistableMessage& msg, std::string& data, 
uint64_t offset, uint32_t length);
-    virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, 
const PersistableQueue& queue);
-    virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, 
const PersistableQueue& queue);
+    virtual void stage(intrusive_ptr<PersistableMessage>& msg);
+    virtual void destroy(intrusive_ptr<PersistableMessage>& msg);
+    virtual void appendContent(intrusive_ptr<const PersistableMessage>& msg,
+                               const std::string& data);
+    virtual void loadContent(const qpid::broker::PersistableQueue& queue,
+                             intrusive_ptr<const PersistableMessage>& msg, 
std::string& data,
+                             uint64_t offset, uint32_t length);
+    virtual void enqueue(TransactionContext* ctxt, 
intrusive_ptr<PersistableMessage>& msg,
+                         const PersistableQueue& queue);
+    virtual void dequeue(TransactionContext* ctxt, 
intrusive_ptr<PersistableMessage>& msg,
+                         const PersistableQueue& queue);
     virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
     virtual void flush(const qpid::broker::PersistableQueue& queue);
     ~NullMessageStore(){}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=598440&r1=598439&r2=598440&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Nov 26 13:48:37 
2007
@@ -431,9 +431,11 @@
 {
     if (msg->isPersistent() && store) {
         msg->enqueueAsync(this, store); //increment to async counter -- for 
message sent to more than one queue
-        store->enqueue(ctxt, *msg.get(), *this);
+        intrusive_ptr<PersistableMessage> pmsg = 
static_pointer_cast<PersistableMessage>(msg);
+        store->enqueue(ctxt, pmsg, *this);
         return true;
     }
+    //msg->enqueueAsync();   // increments intrusive ptr cnt
     return false;
 }
 
@@ -442,9 +444,11 @@
 {
     if (msg->isPersistent() && store) {
         msg->dequeueAsync(this, store); //increment to async counter -- for 
message sent to more than one queue
-        store->dequeue(ctxt, *msg.get(), *this);
+        intrusive_ptr<PersistableMessage> pmsg = 
static_pointer_cast<PersistableMessage>(msg);
+        store->dequeue(ctxt, pmsg, *this);
         return true;
     }
+    //msg->dequeueAsync();   // decrements intrusive ptr cnt
     return false;
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp?rev=598440&r1=598439&r2=598440&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/MessageBuilderTest.cpp Mon Nov 26 
13:48:37 2007
@@ -38,7 +38,7 @@
         enum Op {STAGE=1, APPEND=2};
 
         uint64_t id;
-        PersistableMessage* expectedMsg;        
+        intrusive_ptr<PersistableMessage> expectedMsg;        
         string expectedData;
         std::list<Op> ops;
         
@@ -64,17 +64,17 @@
             ops.push_back(APPEND); 
         }
 
-        void stage(PersistableMessage& msg)
+        void stage(intrusive_ptr<PersistableMessage>& msg)
         {
             checkExpectation(STAGE);
-            CPPUNIT_ASSERT_EQUAL(expectedMsg, &msg);
-            msg.setPersistenceId(++id);
+            CPPUNIT_ASSERT_EQUAL(expectedMsg, msg);
+            msg->setPersistenceId(++id);
         }
 
-        void appendContent(const PersistableMessage& msg, const string& data)
+        void appendContent(intrusive_ptr<const PersistableMessage>& msg, const 
string& data)
         {
             checkExpectation(APPEND);
-            CPPUNIT_ASSERT_EQUAL((const PersistableMessage*) expectedMsg, 
&msg);
+            CPPUNIT_ASSERT_EQUAL(static_pointer_cast<const 
PersistableMessage>(expectedMsg), msg);
             CPPUNIT_ASSERT_EQUAL(expectedData, data);            
         }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp?rev=598440&r1=598439&r2=598440&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp Mon Nov 26 13:48:37 
2007
@@ -39,11 +39,11 @@
     class TestMessageStore : public NullMessageStore
     {
     public:
-        vector<PersistableMessage*> dequeued;
+        vector<intrusive_ptr<PersistableMessage> > dequeued;
 
-        void dequeue(TransactionContext*, PersistableMessage& msg, const 
PersistableQueue& /*queue*/)
+        void dequeue(TransactionContext*, intrusive_ptr<PersistableMessage>& 
msg, const PersistableQueue& /*queue*/)
         {
-            dequeued.push_back(&msg);
+            dequeued.push_back(msg);
         }
 
         TestMessageStore() : NullMessageStore() {}
@@ -97,7 +97,7 @@
         CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size());
         int dequeued[] = {0, 1, 2, 3, 4, 6, 8};
         for (int i = 0; i < 7; i++) {
-            CPPUNIT_ASSERT_EQUAL((PersistableMessage*) 
messages[dequeued[i]].get(), store.dequeued[i]);
+            
CPPUNIT_ASSERT_EQUAL(static_pointer_cast<PersistableMessage>(messages[dequeued[i]]),
 store.dequeued[i]);
         }
     }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp?rev=598440&r1=598439&r2=598440&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp Mon Nov 26 
13:48:37 2007
@@ -36,17 +36,17 @@
 
 class TxPublishTest : public CppUnit::TestCase  
 {
-    typedef std::pair<string, PersistableMessage*> msg_queue_pair;
+    typedef std::pair<string, intrusive_ptr<PersistableMessage> > 
msg_queue_pair;
 
     class TestMessageStore : public NullMessageStore
     {
     public:
         vector<msg_queue_pair> enqueued;
         
-        void enqueue(TransactionContext*, PersistableMessage& msg, const 
PersistableQueue& queue)
+        void enqueue(TransactionContext*, intrusive_ptr<PersistableMessage>& 
msg, const PersistableQueue& queue)
         {
-            msg.enqueueComplete(); 
-           enqueued.push_back(msg_queue_pair(queue.getName(), &msg));
+            msg->enqueueComplete(); 
+               enqueued.push_back(msg_queue_pair(queue.getName(), msg));
         }
         
         //dont care about any of the other methods:
@@ -81,16 +81,15 @@
 
     void testPrepare()
     {
+        intrusive_ptr<PersistableMessage> pmsg = 
static_pointer_cast<PersistableMessage>(msg);
         //ensure messages are enqueued in store
         op.prepare(0);
         CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size());
         CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first);
-        CPPUNIT_ASSERT_EQUAL((PersistableMessage*) msg.get(), 
store.enqueued[0].second);
+        CPPUNIT_ASSERT_EQUAL(pmsg, store.enqueued[0].second);
         CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first);
-        CPPUNIT_ASSERT_EQUAL((PersistableMessage*) msg.get(), 
store.enqueued[1].second);
-       CPPUNIT_ASSERT_EQUAL( true, ((PersistableMessage*) 
msg.get())->isEnqueueComplete());
-       
-
+        CPPUNIT_ASSERT_EQUAL(pmsg, store.enqueued[1].second);
+           CPPUNIT_ASSERT_EQUAL( true, ( 
static_pointer_cast<PersistableMessage>(msg))->isEnqueueComplete());
     }
 
     void testCommit()
@@ -101,7 +100,7 @@
         CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue1->getMessageCount());
        intrusive_ptr<Message> msg_dequeue = queue1->dequeue().payload;
 
-       CPPUNIT_ASSERT_EQUAL( true, ((PersistableMessage*) 
msg_dequeue.get())->isEnqueueComplete());
+       CPPUNIT_ASSERT_EQUAL( true, 
(static_pointer_cast<PersistableMessage>(msg_dequeue))->isEnqueueComplete());
         CPPUNIT_ASSERT_EQUAL(msg, msg_dequeue);
 
         CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue2->getMessageCount());


Reply via email to