Author: gsim
Date: Wed Oct 17 01:59:44 2007
New Revision: 585417

URL: http://svn.apache.org/viewvc?rev=585417&view=rev
Log:
Use shared pointers for consumers (held by queues and sessions) to avoid 
having to hold a lock across deliver() while still preventing invocation on 
stale pointers.
Ensure auto-deleted queues are properly cleaned up (i.e. unbound from 
exchanges) to avoid leaking memory as messages accumulate in inaccessible 
queues. (Some further cleanup of this will follow.)


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=585417&r1=585416&r2=585417&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Wed Oct 17 
01:59:44 2007
@@ -78,11 +78,8 @@
         while (!exclusiveQueues.empty()) {
             Queue::shared_ptr q(exclusiveQueues.front());
             q->releaseExclusiveOwnership();
-            if (q->canAutoDelete() && 
-                broker.getQueues().destroyIf(q->getName(), 
boost::bind(boost::mem_fn(&Queue::canAutoDelete), q))) {
-
-                q->unbind(broker.getExchanges(), q);
-                q->destroy();
+            if (q->canAutoDelete()) {
+                Queue::tryAutoDelete(broker, q);
             }
             exclusiveQueues.erase(exclusiveQueues.begin());
         }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h?rev=585417&r1=585416&r2=585417&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h Wed Oct 17 
01:59:44 2007
@@ -39,6 +39,8 @@
         class Consumer {
             const bool acquires;
         public:
+            typedef shared_ptr<Consumer> ptr;            
+
             framing::SequenceNumber position;
 
             Consumer(bool preAcquires = true) : acquires(preAcquires) {}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp?rev=585417&r1=585416&r2=585417&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandlerImpl.cpp Wed 
Oct 17 01:59:44 2007
@@ -17,6 +17,7 @@
  */
 
 #include "qpid/QpidError.h"
+#include "qpid/log/Statement.h"
 #include "MessageHandlerImpl.h"
 #include "qpid/framing/FramingContent.h"
 #include "Connection.h"
@@ -156,7 +157,6 @@
 
 void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, 
u_int32_t value)
 {
-    
     if (unit == 0) {
         //message
         state.addMessageCredit(destination, value);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=585417&r1=585416&r2=585417&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Oct 17 01:59:44 
2007
@@ -22,6 +22,7 @@
 #include <boost/format.hpp>
 
 #include "qpid/log/Statement.h"
+#include "Broker.h"
 #include "Queue.h"
 #include "Exchange.h"
 #include "DeliverableMessage.h"
@@ -47,7 +48,6 @@
     store(_store),
     owner(_owner), 
     next(0),
-    exclusive(0),
     persistenceId(0),
     serializer(false),
     dispatchCallback(*this)
@@ -80,7 +80,7 @@
        }else {
             push(msg);
        }
-       QPID_LOG(debug, "Message Enqueued: " << msg->getApplicationHeaders());
+       QPID_LOG(debug, "Message " << msg << " enqueued on " << name << "[" << 
this << "]");
         serializer.execute(dispatchCallback);
     }
 }
@@ -124,7 +124,7 @@
     return false;
 }
 
-void Queue::requestDispatch(Consumer* c){
+void Queue::requestDispatch(Consumer::ptr c){
     if (!c || c->preAcquires()) {
         serializer.execute(dispatchCallback);
     } else {
@@ -138,12 +138,12 @@
     serializer.execute(f);
 }
 
-Consumer* Queue::allocate()
+Consumer::ptr Queue::allocate()
 {
     RWlock::ScopedWlock locker(consumerLock);
  
     if(acquirers.empty()){
-        return 0;
+        return Consumer::ptr();
     }else if(exclusive){
         return exclusive;
     }else{
@@ -154,14 +154,16 @@
 
 bool Queue::dispatch(QueuedMessage& msg)
 {
-    Consumer* c = allocate();
-    Consumer* first = c;
+    Consumer::ptr c = allocate();
+    Consumer::ptr first = c;
     while(c){
         if(c->deliver(msg)) {
             return true;            
         } else {
             c = allocate();
-            if (c == first) c = 0;
+            if (c == first) { 
+                break;
+            }
         }
     }
     return false;
@@ -199,7 +201,7 @@
      }
 }
 
-void Queue::serviceBrowser(Consumer* browser)
+void Queue::serviceBrowser(Consumer::ptr browser)
 {
     QueuedMessage msg;
     while (seek(msg, browser->position) && browser->deliver(msg)) {
@@ -219,7 +221,7 @@
     return false;
 }
 
-void Queue::consume(Consumer* c, bool requestExclusive){
+void Queue::consume(Consumer::ptr c, bool requestExclusive){
     RWlock::ScopedWlock locker(consumerLock);
     if(exclusive) {
         throw ChannelException(
@@ -242,17 +244,17 @@
     }
 }
 
-void Queue::cancel(Consumer* c){
+void Queue::cancel(Consumer::ptr c){
     RWlock::ScopedWlock locker(consumerLock);
     if (c->preAcquires()) {
         cancel(c, acquirers);
     } else {
         cancel(c, browsers);
     }
-    if(exclusive == c) exclusive = 0;
+    if(exclusive == c) exclusive.reset();
 }
 
-void Queue::cancel(Consumer* c, Consumers& consumers)
+void Queue::cancel(Consumer::ptr c, Consumers& consumers)
 {
     Consumers::iterator i = std::find(consumers.begin(), consumers.end(), c);
     if (i != consumers.end()) 
@@ -442,3 +444,12 @@
     return alternateExchange;
 }
 
+void Queue::tryAutoDelete(Broker& broker, Queue::shared_ptr queue)
+{
+    if (broker.getQueues().destroyIf(queue->getName(), 
+                                     
boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue))) {
+        queue->unbind(broker.getExchanges(), queue);
+        queue->destroy();
+    }
+
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=585417&r1=585416&r2=585417&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed Oct 17 01:59:44 
2007
@@ -36,11 +36,9 @@
 #include "QueuePolicy.h"
 #include "QueueBindings.h"
 
-// TODO aconway 2007-02-06: Use auto_ptr and boost::ptr_vector to
-// enforce ownership of Consumers.
-
 namespace qpid {
     namespace broker {
+        class Broker;
         class MessageStore;
         class QueueRegistry;
         class TransactionContext;
@@ -61,7 +59,7 @@
          * or more consumers registers.
          */
         class Queue : public PersistableQueue {
-            typedef std::vector<Consumer*> Consumers;
+            typedef std::vector<Consumer::ptr> Consumers;
             typedef std::deque<QueuedMessage> Messages;
             
             struct DispatchFunctor 
@@ -86,7 +84,7 @@
             int next;
             mutable qpid::sys::RWlock consumerLock;
             mutable qpid::sys::Mutex messageLock;
-            Consumer* exclusive;
+            Consumer::ptr exclusive;
             mutable uint64_t persistenceId;
             framing::FieldTable settings;
             std::auto_ptr<QueuePolicy> policy;            
@@ -104,10 +102,10 @@
              * only called by serilizer
             */
             void dispatch();
-            void cancel(Consumer* c, Consumers& set);
+            void cancel(Consumer::ptr c, Consumers& set);
             void serviceAllBrowsers();
-            void serviceBrowser(Consumer* c);
-            Consumer* allocate();
+            void serviceBrowser(Consumer::ptr c);
+            Consumer::ptr allocate();
             bool seek(QueuedMessage& msg, const framing::SequenceNumber& 
position);
  
         protected:
@@ -117,7 +115,6 @@
           virtual void notifyDurableIOComplete();
 
         public:
-            
             typedef boost::shared_ptr<Queue> shared_ptr;
 
             typedef std::vector<shared_ptr> vector;
@@ -162,10 +159,10 @@
              * at any time, so this call schedules the despatch based on
             * the serilizer policy.
              */
-            void requestDispatch(Consumer* c = 0);
+            void requestDispatch(Consumer::ptr c = Consumer::ptr());
             void flush(DispatchCompletion& callback);
-            void consume(Consumer* c, bool exclusive = false);
-            void cancel(Consumer* c);
+            void consume(Consumer::ptr c, bool exclusive = false);
+            void cancel(Consumer::ptr c);
             uint32_t purge();
             uint32_t getMessageCount() const;
             uint32_t getConsumerCount() const;
@@ -202,6 +199,7 @@
             uint32_t encodedSize() const;
 
             static Queue::shared_ptr decode(QueueRegistry& queues, 
framing::Buffer& buffer);
+            static void tryAutoDelete(Broker& broker, Queue::shared_ptr);
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=585417&r1=585416&r2=585417&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Oct 17 
01:59:44 2007
@@ -69,7 +69,11 @@
 }
 
 SemanticState::~SemanticState() {
-    consumers.clear();
+    //cancel all consumers
+    for (ConsumerImplMap::iterator i = consumers.begin(); i != 
consumers.end(); i++) {
+        cancel(i->second);
+    }
+
     if (dtxBuffer.get()) {
         dtxBuffer->fail();
     }
@@ -86,16 +90,15 @@
 {
     if(tagInOut.empty())
         tagInOut = tagGenerator.generate();
-    std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, 
queue, acks, nolocal, acquire));
-    queue->consume(c.get(), exclusive);//may throw exception
-    consumers.insert(tagInOut, c.release());
+    ConsumerImpl::shared_ptr c(new ConsumerImpl(this, token, tagInOut, queue, 
acks, nolocal, acquire));
+    queue->consume(c, exclusive);//may throw exception
+    consumers[tagInOut] = c;
 }
 
 void SemanticState::cancel(const string& tag){
-    // consumers is a ptr_map so erase will delete the consumer
-    // which will call cancel.
     ConsumerImplMap::iterator i = consumers.find(tag);
     if (i != consumers.end()) {
+        cancel(i->second);
         consumers.erase(i); 
         //should cancel all unacked messages for this consumer so that
         //they are not redelivered on recovery
@@ -287,28 +290,19 @@
     }
 }
 
-SemanticState::ConsumerImpl::~ConsumerImpl() {
-    cancel();
-}
+SemanticState::ConsumerImpl::~ConsumerImpl() {}
 
-void SemanticState::ConsumerImpl::cancel()
+void SemanticState::cancel(ConsumerImpl::shared_ptr c)
 {
+    Queue::shared_ptr queue = c->getQueue();
     if(queue) {
-        queue->cancel(this);
+        queue->cancel(c);
         if (queue->canAutoDelete() && !queue->hasExclusiveOwner()) {           
 
-            parent->getSession().getBroker().getQueues().destroyIf(
-                queue->getName(), 
-                boost::bind(boost::mem_fn(&Queue::canAutoDelete), queue));
+            Queue::tryAutoDelete(getSession().getBroker(), queue);
         }
     }
 }
 
-void SemanticState::ConsumerImpl::requestDispatch()
-{
-    if(blocked)
-        queue->requestDispatch(this);
-}
-
 void SemanticState::handle(Message::shared_ptr msg) {
     if (txBuffer.get()) {
         TxPublish* deliverable(new TxPublish(msg));
@@ -389,7 +383,21 @@
     //if the prefetch limit had previously been reached, or credit
     //had expired in windowing mode there may be messages that can
     //be now be delivered
-    for_each(consumers.begin(), consumers.end(), 
boost::bind(&ConsumerImpl::requestDispatch, _1));
+    requestDispatch();
+}
+
+void SemanticState::requestDispatch()
+{    
+    for (ConsumerImplMap::iterator i = consumers.begin(); i != 
consumers.end(); i++) {
+        requestDispatch(i->second);
+    }
+}
+
+void SemanticState::requestDispatch(ConsumerImpl::shared_ptr c)
+{    
+    if(c->isBlocked()) {
+        c->getQueue()->requestDispatch(c);
+    }
 }
 
 void SemanticState::acknowledged(const DeliveryRecord& delivery)
@@ -397,7 +405,7 @@
     delivery.subtractFrom(outstanding);
     ConsumerImplMap::iterator i = consumers.find(delivery.getTag());
     if (i != consumers.end()) {
-        i->acknowledged(delivery);
+        i->second->acknowledged(delivery);
     }
 }
 
@@ -458,52 +466,55 @@
     flowActive = active;
     if (requestDelivery) {
         //there may be messages that can be now be delivered
-        std::for_each(consumers.begin(), consumers.end(), 
boost::bind(&ConsumerImpl::requestDispatch, _1));
+        requestDispatch();
     }
 }
 
 
-SemanticState::ConsumerImpl& SemanticState::find(const std::string& 
destination)
+SemanticState::ConsumerImpl::shared_ptr SemanticState::find(const std::string& 
destination)
 {
     ConsumerImplMap::iterator i = consumers.find(destination);
     if (i == consumers.end()) {
         throw NotFoundException(QPID_MSG("Unknown destination " << 
destination));
     } else {
-        return *i;
+        return i->second;
     }
 }
 
 void SemanticState::setWindowMode(const std::string& destination)
 {
-    find(destination).setWindowMode();
+    find(destination)->setWindowMode();
 }
 
 void SemanticState::setCreditMode(const std::string& destination)
 {
-    find(destination).setCreditMode();
+    find(destination)->setCreditMode();
 }
 
 void SemanticState::addByteCredit(const std::string& destination, uint32_t 
value)
 {
-    find(destination).addByteCredit(value);
+    ConsumerImpl::shared_ptr c = find(destination);
+    c->addByteCredit(value);
+    requestDispatch(c);
 }
 
 
 void SemanticState::addMessageCredit(const std::string& destination, uint32_t 
value)
 {
-    find(destination).addMessageCredit(value);
+    ConsumerImpl::shared_ptr c = find(destination);
+    c->addMessageCredit(value);
+    requestDispatch(c);
 }
 
 void SemanticState::flush(const std::string& destination)
 {
-    ConsumerImpl& c = find(destination);
-    c.flush();
+    find(destination)->flush();
 }
 
 
 void SemanticState::stop(const std::string& destination)
 {
-    find(destination).stop();
+    find(destination)->stop();
 }
 
 void SemanticState::ConsumerImpl::setWindowMode()
@@ -518,24 +529,18 @@
 
 void SemanticState::ConsumerImpl::addByteCredit(uint32_t value)
 {
-    {
-        Mutex::ScopedLock l(lock);
-        if (byteCredit != 0xFFFFFFFF) {
-            byteCredit += value;
-        }
+    Mutex::ScopedLock l(lock);
+    if (byteCredit != 0xFFFFFFFF) {
+        byteCredit += value;
     }
-    requestDispatch();
 }
 
 void SemanticState::ConsumerImpl::addMessageCredit(uint32_t value)
 {
-    {
-        Mutex::ScopedLock l(lock);
-        if (msgCredit != 0xFFFFFFFF) {
-            msgCredit += value;
-        }
+    Mutex::ScopedLock l(lock);
+    if (msgCredit != 0xFFFFFFFF) {
+        msgCredit += value;
     }
-    requestDispatch();
 }
 
 void SemanticState::ConsumerImpl::flush()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=585417&r1=585416&r2=585417&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Wed Oct 17 
01:59:44 2007
@@ -37,9 +37,8 @@
 #include "qpid/framing/Uuid.h"
 #include "qpid/shared_ptr.h"
 
-#include <boost/ptr_container/ptr_map.hpp>
-
 #include <list>
+#include <map>
 #include <vector>
 
 namespace qpid {
@@ -72,13 +71,13 @@
         bool checkCredit(Message::shared_ptr& msg);
 
       public:
+        typedef shared_ptr<ConsumerImpl> shared_ptr;
+
         ConsumerImpl(SemanticState* parent, DeliveryToken::shared_ptr token, 
                      const string& name, Queue::shared_ptr queue,
                      bool ack, bool nolocal, bool acquire);
         ~ConsumerImpl();
         bool deliver(QueuedMessage& msg);            
-        void cancel();
-        void requestDispatch();
 
         void setWindowMode();
         void setCreditMode();
@@ -87,6 +86,8 @@
         void flush();
         void stop();
         void acknowledged(const DeliveryRecord&);    
+        Queue::shared_ptr getQueue() { return queue; }
+        bool isBlocked() const { return blocked; }
     };
 
     struct FlushCompletion : DispatchCompletion
@@ -100,7 +101,7 @@
         void completed();
     };
 
-    typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap;
+    typedef std::map<std::string,ConsumerImpl::shared_ptr> ConsumerImplMap;
 
     SessionState& session;
     DeliveryAdapter& deliveryAdapter;
@@ -124,10 +125,13 @@
     void record(const DeliveryRecord& delivery);
     bool checkPrefetch(Message::shared_ptr& msg);
     void checkDtxTimeout();
-    ConsumerImpl& find(const std::string& destination);
+    ConsumerImpl::shared_ptr find(const std::string& destination);
     void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
     void acknowledged(const DeliveryRecord&);
     AckRange findRange(DeliveryId first, DeliveryId last);
+    void requestDispatch();
+    void requestDispatch(ConsumerImpl::shared_ptr);
+    void cancel(ConsumerImpl::shared_ptr);
 
   public:
     SemanticState(DeliveryAdapter&, SessionState&);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=585417&r1=585416&r2=585417&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Wed Oct 17 01:59:44 
2007
@@ -36,6 +36,8 @@
 
 class TestConsumer : public virtual Consumer{
 public:
+    typedef shared_ptr<TestConsumer> shared_ptr;            
+
     Message::shared_ptr last;
     bool received;
     TestConsumer(): received(false) {};
@@ -85,8 +87,8 @@
         Queue::shared_ptr queue(new Queue("my_test_queue", true));
         Message::shared_ptr received;
        
-        TestConsumer c1; 
-        queue->consume(&c1);
+        TestConsumer::shared_ptr c1(new TestConsumer()); 
+        queue->consume(c1);
 
        
         //Test basic delivery:
@@ -95,7 +97,7 @@
         queue->process(msg1);
        sleep(2);
 
-        CPPUNIT_ASSERT(!c1.received);
+        CPPUNIT_ASSERT(!c1->received);
        msg1->enqueueComplete();
 
         received = queue->dequeue().payload;
@@ -124,10 +126,10 @@
         Queue::shared_ptr queue(new Queue("my_queue", true));
     
         //Test adding consumers:
-        TestConsumer c1; 
-        TestConsumer c2; 
-        queue->consume(&c1);
-        queue->consume(&c2);
+        TestConsumer::shared_ptr c1(new TestConsumer()); 
+        TestConsumer::shared_ptr c2(new TestConsumer()); 
+        queue->consume(c1);
+        queue->consume(c2);
 
         CPPUNIT_ASSERT_EQUAL(uint32_t(2), queue->getConsumerCount());
         
@@ -137,25 +139,25 @@
         Message::shared_ptr msg3 = message("e", "C");
 
         queue->deliver(msg1);
-       if (!c1.received)
+       if (!c1->received)
            sleep(2);
-        CPPUNIT_ASSERT_EQUAL(msg1.get(), c1.last.get());
+        CPPUNIT_ASSERT_EQUAL(msg1.get(), c1->last.get());
 
         queue->deliver(msg2);
-       if (!c2.received)
+       if (!c2->received)
            sleep(2);
-        CPPUNIT_ASSERT_EQUAL(msg2.get(), c2.last.get());
+        CPPUNIT_ASSERT_EQUAL(msg2.get(), c2->last.get());
         
-       c1.received = false;
+       c1->received = false;
         queue->deliver(msg3);
-       if (!c1.received)
+       if (!c1->received)
            sleep(2);
-        CPPUNIT_ASSERT_EQUAL(msg3.get(), c1.last.get());        
+        CPPUNIT_ASSERT_EQUAL(msg3.get(), c1->last.get());        
     
         //Test cancellation:
-        queue->cancel(&c1);
+        queue->cancel(c1);
         CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getConsumerCount());
-        queue->cancel(&c2);
+        queue->cancel(c2);
         CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getConsumerCount());
     }
 
@@ -200,13 +202,13 @@
         CPPUNIT_ASSERT_EQUAL(msg2.get(), received.get());
         CPPUNIT_ASSERT_EQUAL(uint32_t(1), queue->getMessageCount());
 
-        TestConsumer consumer; 
-        queue->consume(&consumer);
+        TestConsumer::shared_ptr consumer(new TestConsumer()); 
+        queue->consume(consumer);
         queue->requestDispatch();
-       if (!consumer.received)
+       if (!consumer->received)
            sleep(2);
 
-        CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer.last.get());
+        CPPUNIT_ASSERT_EQUAL(msg3.get(), consumer->last.get());
         CPPUNIT_ASSERT_EQUAL(uint32_t(0), queue->getMessageCount());
 
         received = queue->dequeue().payload;


Reply via email to