Author: cctrieloff
Date: Wed Aug 15 12:16:46 2007
New Revision: 566306

URL: http://svn.apache.org/viewvc?view=rev&rev=566306
Log:

- async message fix in dequeue
- addition of test for async enqueue of messages on BrokerQueue


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?view=diff&rev=566306&r1=566305&r2=566306
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Wed Aug 15 12:16:46 2007
@@ -189,7 +189,8 @@
     Message::shared_ptr msg;
     if(!messages.empty()){
         msg = messages.front();
-        pop();
+       if (msg->isEnqueueComplete())
+           pop();
     }
     return msg;
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?view=diff&rev=566306&r1=566305&r2=566306
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Wed Aug 15 12:16:46 2007
@@ -63,6 +63,7 @@
     CPPUNIT_TEST(testRegistry);
     CPPUNIT_TEST(testDequeue);
     CPPUNIT_TEST(testBound);
+    CPPUNIT_TEST(testAsyncMessage);
     CPPUNIT_TEST_SUITE_END();
 
 
@@ -70,6 +71,30 @@
     Message::shared_ptr message(std::string exchange, std::string routingKey) {
         return Message::shared_ptr(
             new BasicMessage(0, exchange, routingKey, false, false));
+    }
+    
+    
+    void testAsyncMessage(){
+    
+        Queue::shared_ptr queue(new Queue("my_test_queue", true));
+        Message::shared_ptr received;
+       
+        TestConsumer c1; 
+        queue->consume(&c1);
+
+       
+        //Test basic delivery:
+        Message::shared_ptr msg1 = message("e", "A");
+        queue->process(msg1);
+       sleep(2);
+
+        CPPUNIT_ASSERT(!c1.received);
+       msg1->enqueueComplete();
+
+        received = queue->dequeue();
+        CPPUNIT_ASSERT_EQUAL(msg1.get(), received.get());
+       
+   
     }
     
     void testConsumers(){


Reply via email to