Author: gsim
Date: Fri Aug 31 09:45:20 2007
New Revision: 571518

URL: http://svn.apache.org/viewvc?rev=571518&view=rev
Log:
Pass QueuedMessage to queue consumers. This records the position of that 
message in the queue, which is needed to handle release and acquire.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?rev=571518&r1=571517&r2=571518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Fri Aug 31 
09:45:20 2007
@@ -218,31 +218,33 @@
                                     const string& _name, 
                                     Queue::shared_ptr _queue, 
                                     bool ack,
-                                    bool _nolocal 
+                                    bool _nolocal,
+                                    bool _acquire
                                     ) : parent(_parent), 
                                         token(_token), 
                                         name(_name), 
                                         queue(_queue), 
                                         ackExpected(ack), 
                                         nolocal(_nolocal), 
+                                        acquire(_acquire),
                                         blocked(false), 
                                         windowing(true), 
                                         msgCredit(0xFFFFFFFF), 
                                         byteCredit(0xFFFFFFFF) {}
 
-bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg)
+bool Channel::ConsumerImpl::deliver(QueuedMessage& msg)
 {
-    if (nolocal && &(parent->connection) == msg->getPublisher()) {
+    if (nolocal && &(parent->connection) == msg.payload->getPublisher()) {
         return false;
     } else {
-        if (!checkCredit(msg) || !parent->flowActive || (ackExpected && 
!parent->checkPrefetch(msg))) {
+        if (!checkCredit(msg.payload) || !parent->flowActive || (ackExpected 
&& !parent->checkPrefetch(msg.payload))) {
             blocked = true;
         } else {
             blocked = false;
 
             Mutex::ScopedLock locker(parent->deliveryLock);
 
-            DeliveryId deliveryTag = parent->out.deliver(msg, token);
+            DeliveryId deliveryTag = parent->out.deliver(msg.payload, token);
             if (ackExpected) {
                 parent->record(DeliveryRecord(msg, queue, name, deliveryTag));
             }
@@ -409,10 +411,10 @@
 
 bool Channel::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, 
bool ackExpected)
 {
-    Message::shared_ptr msg = queue->dequeue();
-    if(msg){
+    QueuedMessage msg = queue->dequeue();
+    if(msg.payload){
         Mutex::ScopedLock locker(deliveryLock);
-        DeliveryId myDeliveryTag = out.deliver(msg, token);
+        DeliveryId myDeliveryTag = out.deliver(msg.payload, token);
         if(ackExpected){
             unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
         }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h?rev=571518&r1=571517&r2=571518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h Fri Aug 31 
09:45:20 2007
@@ -69,17 +69,20 @@
         const Queue::shared_ptr queue;
         const bool ackExpected;
         const bool nolocal;
+        const bool acquire;
         bool blocked;
         bool windowing;
-        uint32_t msgCredit;
-       
+        uint32_t msgCredit;    
         uint32_t byteCredit;
 
+        bool checkCredit(Message::shared_ptr& msg);
+
       public:
         ConsumerImpl(Channel* parent, DeliveryToken::shared_ptr token, 
-                     const string& name, Queue::shared_ptr queue, bool ack, 
bool nolocal);
+                     const string& name, Queue::shared_ptr queue, 
+                     bool ack, bool nolocal, bool acquire = true);
         ~ConsumerImpl();
-        bool deliver(Message::shared_ptr& msg);            
+        bool deliver(QueuedMessage& msg);            
         void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag);
         void cancel();
         void requestDispatch();
@@ -90,7 +93,6 @@
         void addMessageCredit(uint32_t value);
         void flush();
         void stop();
-        bool checkCredit(Message::shared_ptr& msg);
         void acknowledged(const DeliveryRecord&);    
     };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?rev=571518&r1=571517&r2=571518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Fri Aug 31 
09:45:20 2007
@@ -102,10 +102,10 @@
    
 }
 
-void Queue::requeue(Message::shared_ptr& msg){
+void Queue::requeue(const QueuedMessage& msg){
     {
        Mutex::ScopedLock locker(messageLock);
-       msg->enqueueComplete(); // mark the message as enqueued
+       msg.payload->enqueueComplete(); // mark the message as enqueued
        messages.push_front(msg);
     }
     serializer.execute(dispatchCallback);
@@ -118,7 +118,7 @@
 }
 
 
-bool Queue::dispatch(Message::shared_ptr& msg){
+bool Queue::dispatch(QueuedMessage& msg){
 
  
     RWlock::ScopedWlock locker(consumerLock); /// lock scope to wide....
@@ -144,21 +144,19 @@
 
 
 void Queue::dispatch(){
-
-
-     Message::shared_ptr msg;
+     QueuedMessage msg;
      while(true){
         {
            Mutex::ScopedLock locker(messageLock);
            if (messages.empty()) break; 
            msg = messages.front();
        }
-        if( msg->isEnqueueComplete() && dispatch(msg) ){
+        if( msg.payload->isEnqueueComplete() && dispatch(msg) ) {
             pop();
-        }else break;
-       
-    }
-    
+        } else {
+            break;
+        }      
+    }    
 }
 
 void Queue::consume(Consumer* c, bool requestExclusive){
@@ -185,18 +183,16 @@
     if(exclusive == c) exclusive = 0;
 }
 
-Message::shared_ptr Queue::dequeue(){
+QueuedMessage Queue::dequeue(){
     Mutex::ScopedLock locker(messageLock);
-    Message::shared_ptr msg;
+    QueuedMessage msg;
     if(!messages.empty()){
         msg = messages.front();
-       if (msg->isEnqueueComplete()){
+       if (msg.payload->isEnqueueComplete()){
            pop();
-          return msg;
        }
     }
-    Message::shared_ptr msg_empty;
-    return msg_empty;
+    return msg;
 }
 
 uint32_t Queue::purge(){
@@ -208,13 +204,13 @@
 
 void Queue::pop(){
     Mutex::ScopedLock locker(messageLock);
-    if (policy.get()) policy->dequeued(messages.front()->contentSize());
+    if (policy.get()) 
policy->dequeued(messages.front().payload->contentSize());
     messages.pop_front();    
 }
 
 void Queue::push(Message::shared_ptr& msg){
     Mutex::ScopedLock locker(messageLock);
-    messages.push_back(msg);
+    messages.push_back(QueuedMessage(msg, ++sequence));
     if (policy.get()) {
         policy->enqueued(msg->contentSize());
         if (policy->limitExceeded()) {
@@ -229,7 +225,7 @@
   
     uint32_t count =0;
     for ( Messages::const_iterator i = messages.begin(); i != messages.end(); 
++i ) {
-        if ( (*i)->isEnqueueComplete() ) count ++;
+        if ( i->payload->isEnqueueComplete() ) count ++;
     }
     
     return count;
@@ -296,7 +292,7 @@
     if (alternateExchange.get()) {
         Mutex::ScopedLock locker(messageLock);
         while(!messages.empty()){
-            DeliverableMessage msg(messages.front());
+            DeliverableMessage msg(messages.front().payload);
             alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
                                      
&(msg.getMessage().getApplicationHeaders()));
             pop();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h?rev=571518&r1=571517&r2=571518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h Fri Aug 31 
09:45:20 2007
@@ -46,9 +46,6 @@
         class TransactionContext;
         class Exchange;
 
-        /**
-         * Thrown when exclusive access would be violated.
-         */
         using std::string;
 
         /**
@@ -59,7 +56,7 @@
          */
         class Queue : public PersistableQueue{
             typedef std::vector<Consumer*> Consumers;
-            typedef std::deque<Message::shared_ptr> Messages;
+            typedef std::deque<QueuedMessage> Messages;
             
             struct DispatchFunctor {
                 Queue& queue;
@@ -84,10 +81,11 @@
             boost::shared_ptr<Exchange> alternateExchange;
             qpid::sys::Serializer<DispatchFunctor> serializer;
             DispatchFunctor dispatchCallback;
+            framing::SequenceNumber sequence;
 
             void pop();
             void push(Message::shared_ptr& msg);
-            bool dispatch(Message::shared_ptr& msg);
+            bool dispatch(QueuedMessage& msg);
             void setPolicy(std::auto_ptr<QueuePolicy> policy);
             /**
              * only called by serilizer
@@ -132,7 +130,7 @@
              * available it will be dispatched immediately, else it
              * will be returned to the front of the queue.
              */
-            void requeue(Message::shared_ptr& msg);
+            void requeue(const QueuedMessage& msg);
             /**
              * Used during recovery to add stored messages back to the queue
              */
@@ -166,7 +164,7 @@
             /**
              * dequeues from memory only
              */
-            Message::shared_ptr dequeue();
+            QueuedMessage dequeue();
 
             const QueuePolicy* const getPolicy();
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h?rev=571518&r1=571517&r2=571518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h Fri Aug 31 
09:45:20 2007
@@ -25,9 +25,20 @@
 
 namespace qpid {
     namespace broker {
+
+        struct QueuedMessage
+        {
+            Message::shared_ptr payload;
+            framing::SequenceNumber position;
+
+            QueuedMessage(Message::shared_ptr msg, framing::SequenceNumber sn) 
: payload(msg), position(sn) {}
+            QueuedMessage() {}
+        };
+        
+
         class Consumer{
         public:
-            virtual bool deliver(Message::shared_ptr& msg) = 0;
+            virtual bool deliver(QueuedMessage& msg) = 0;
             virtual ~Consumer(){}
         };
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp?rev=571518&r1=571517&r2=571518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.cpp Fri Aug 31 
09:45:20 2007
@@ -24,26 +24,28 @@
 using namespace qpid::broker;
 using std::string;
 
-DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, 
+DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, 
                                Queue::shared_ptr _queue, 
                                const string _consumerTag, 
                                const DeliveryId _deliveryTag) : msg(_msg), 
-                                                               queue(_queue), 
-                                                               
consumerTag(_consumerTag),
-                                                               
deliveryTag(_deliveryTag),
-                                                               pull(false){}
+                                                                queue(_queue), 
+                                                                
consumerTag(_consumerTag),
+                                                                
deliveryTag(_deliveryTag),
+                                                                
acquired(false),
+                                                                pull(false){}
 
-DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, 
+DeliveryRecord::DeliveryRecord(QueuedMessage& _msg, 
                                Queue::shared_ptr _queue, 
                                const DeliveryId _deliveryTag) : msg(_msg), 
-                                                               queue(_queue), 
-                                                               consumerTag(""),
-                                                               
deliveryTag(_deliveryTag),
-                                                               pull(true){}
+                                                                queue(_queue), 
+                                                                
consumerTag(""),
+                                                                
deliveryTag(_deliveryTag),
+                                                                
acquired(false),
+                                                                pull(true){}
 
 
 void DeliveryRecord::dequeue(TransactionContext* ctxt) const{
-    queue->dequeue(ctxt, msg);
+    queue->dequeue(ctxt, msg.payload);
 }
 
 bool DeliveryRecord::matches(DeliveryId tag) const{
@@ -67,18 +69,18 @@
         //if message was originally sent as response to get, we must requeue it
         requeue();
     }else{
-        channel->deliver(msg, consumerTag, deliveryTag);
+        channel->deliver(msg.payload, consumerTag, deliveryTag);
     }
 }
 
 void DeliveryRecord::requeue() const{
-    msg->redeliver();
+    msg.payload->redeliver();
     queue->requeue(msg);
 }
 
 void DeliveryRecord::updateByteCredit(uint32_t& credit) const
 {
-    credit += msg->getRequiredCredit();
+    credit += msg.payload->getRequiredCredit();
 }
 
 
@@ -86,7 +88,7 @@
     if(!pull){
         //ignore 'pulled' messages (i.e. those that were sent in
         //response to get) when calculating prefetch
-        prefetch.size += msg->contentSize();
+        prefetch.size += msg.payload->contentSize();
         prefetch.count++;
     }    
 }
@@ -95,7 +97,7 @@
     if(!pull){
         //ignore 'pulled' messages (i.e. those that were sent in
         //response to get) when calculating prefetch
-        prefetch.size -= msg->contentSize();
+        prefetch.size -= msg.payload->contentSize();
         prefetch.count--;
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?rev=571518&r1=571517&r2=571518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Fri Aug 31 
09:45:20 2007
@@ -26,6 +26,7 @@
 #include <ostream>
 #include "AccumulatedAck.h"
 #include "BrokerQueue.h"
+#include "Consumer.h"
 #include "DeliveryId.h"
 #include "Message.h"
 #include "Prefetch.h"
@@ -38,15 +39,16 @@
          * Record of a delivery for which an ack is outstanding.
          */
         class DeliveryRecord{
-            mutable Message::shared_ptr msg;
+            mutable QueuedMessage msg;
             mutable Queue::shared_ptr queue;
             const std::string consumerTag;
             const DeliveryId deliveryTag;
-            bool pull;
+            bool acquired;
+            const bool pull;
 
         public:
-            DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, 
const std::string consumerTag, const DeliveryId deliveryTag);
-            DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, 
const DeliveryId deliveryTag);
+            DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const 
std::string consumerTag, const DeliveryId deliveryTag);
+            DeliveryRecord(QueuedMessage& msg, Queue::shared_ptr queue, const 
DeliveryId deliveryTag);
             
             void dequeue(TransactionContext* ctxt = 0) const;
             bool matches(DeliveryId tag) const;
@@ -60,6 +62,8 @@
             void subtractFrom(Prefetch&) const;
             const std::string& getConsumerTag() const { return consumerTag; } 
             bool isPull() const { return pull; }
+            bool isAcquired() const { return acquired; }
+            void setAcquired(bool isAcquired) { acquired = isAcquired; }
             
             friend std::ostream& operator<<(std::ostream&, const 
DeliveryRecord&);
         };

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp?rev=571518&r1=571517&r2=571518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp Fri Aug 31 
09:45:20 2007
@@ -256,13 +256,13 @@
         queue->deliver(msg3);
        sleep(2);
         
-        Message::shared_ptr next = queue->dequeue();
+        Message::shared_ptr next = queue->dequeue().payload;
         CPPUNIT_ASSERT_EQUAL(msg1, next);
         CPPUNIT_ASSERT_EQUAL((uint32_t) data1.size(), 
next->encodedContentSize());
-        next = queue->dequeue();
+        next = queue->dequeue().payload;
         CPPUNIT_ASSERT_EQUAL(msg2, next);
         CPPUNIT_ASSERT_EQUAL((uint32_t) data2.size(), 
next->encodedContentSize());
-        next = queue->dequeue();
+        next = queue->dequeue().payload;
         CPPUNIT_ASSERT_EQUAL(msg3, next);
         CPPUNIT_ASSERT_EQUAL((uint32_t) 0, next->encodedContentSize());
 
@@ -295,11 +295,11 @@
         queue3->deliver(msg1);
        sleep(2);
         
-        Message::shared_ptr next = queue1->dequeue();
+        Message::shared_ptr next = queue1->dequeue().payload;
         CPPUNIT_ASSERT_EQUAL(msg1, next);
-        next = queue2->dequeue();
+        next = queue2->dequeue().payload;
         CPPUNIT_ASSERT_EQUAL(msg1, next);
-        next = queue3->dequeue();
+        next = queue3->dequeue().payload;
         CPPUNIT_ASSERT_EQUAL(msg1, next);
 
         }

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=571518&r1=571517&r2=571518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Fri Aug 31 09:45:20 
2007
@@ -40,10 +40,10 @@
     bool received;
     TestConsumer(): received(false) {};
 
-    virtual bool deliver(Message::shared_ptr& msg){
-    last = msg;
-    received = true;
-    return true;
+    virtual bool deliver(QueuedMessage& msg){
+        last = msg.payload;
+        received = true;
+        return true;
     };
 };
 
@@ -97,7 +97,7 @@
         CPPUNIT_ASSERT(!c1.received);
        msg1->enqueueComplete();
 
-        received = queue->dequeue();
+        received = queue->dequeue().payload;
         CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get());
        
    
@@ -190,11 +190,11 @@
 
         CPPUNIT_ASSERT_EQUAL(uint32_t(3), queue->getMessageCount());
         
-        received = queue->dequeue();
+        received = queue->dequeue().payload;
         CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get());
         CPPUNIT_ASSERT_EQUAL(uint32_t(2), queue->getMessageCount());
 
-        received = queue->dequeue();
+        received = queue->dequeue().payload;
         CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get());
         CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getMessageCount());
 
@@ -207,7 +207,7 @@
         CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get());
         CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount());
 
-        received = queue->dequeue();
+        received = queue->dequeue().payload;
         CPPUNIT_ASSERT(!received);
         CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount());
         

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp?rev=571518&r1=571517&r2=571518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxAckTest.cpp Fri Aug 31 09:45:20 
2007
@@ -76,7 +76,9 @@
             
msg->getProperties<DeliveryProperties>()->setDeliveryMode(PERSISTENT);
             
msg->getProperties<DeliveryProperties>()->setRoutingKey("routing_key");
             messages.push_back(msg);
-            deliveries.push_back(DeliveryRecord(msg, queue, "xyz", (i+1)));
+            QueuedMessage qm;
+            qm.payload = msg;
+            deliveries.push_back(DeliveryRecord(qm, queue, "xyz", (i+1)));
         }
 
         //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not)

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp?rev=571518&r1=571517&r2=571518&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxPublishTest.cpp Fri Aug 31 
09:45:20 2007
@@ -99,13 +99,13 @@
         op.prepare(0);
         op.commit();
         CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue1->getMessageCount());
-       Message::shared_ptr msg_dequeue = queue1->dequeue();
+       Message::shared_ptr msg_dequeue = queue1->dequeue().payload;
 
        CPPUNIT_ASSERT_EQUAL( true, ((PersistableMessage*) 
msg_dequeue.get())->isEnqueueComplete());
         CPPUNIT_ASSERT_EQUAL(msg, msg_dequeue);
 
         CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue2->getMessageCount());
-        CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue());            
+        CPPUNIT_ASSERT_EQUAL(msg, queue2->dequeue().payload);            
     }
 };
 


Reply via email to