Author: cctrieloff
Date: Tue Aug 21 12:11:07 2007
New Revision: 568236

URL: http://svn.apache.org/viewvc?rev=568236&view=rev
Log:

- header correction for buffer
- added tests for one msg to multiple queue in async mode
- added counter for async multiple msg


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h
    incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?rev=568236&r1=568235&r2=568236&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Tue Aug 21 
12:11:07 2007
@@ -249,6 +249,7 @@
 bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg)
 {
     if (msg->isPersistent() && store) {
+       msg->enqueueAsync(); //increment to async counter -- for message sent 
to more than one queue
         store->enqueue(ctxt, *msg.get(), *this);
        return true;
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=568236&r1=568235&r2=568236&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Tue Aug 
21 12:11:07 2007
@@ -45,6 +45,16 @@
     * stored.
     */
     bool enqueueCompleted;
+ 
+    /**
+    * Counts the number of times the message has been processed
+    * async - thus when it == 0 the broker knows it has ownership
+    * -> an async store can increment this counter if it writes a
+    * copy to each queue, and case use this counter to know when all
+    * the write are complete
+    */
+    int asyncCounter;
+
     /**
     * Needs to be set false on Message construction, then
     * set once the dequeueis complete, it gets set
@@ -64,10 +74,19 @@
     virtual ~PersistableMessage() {};
     PersistableMessage():
     enqueueCompleted(false),
+    asyncCounter(0),
     dequeueCompleted(false){};
     
     inline bool isEnqueueComplete() {return enqueueCompleted;};
-    inline void enqueueComplete() {enqueueCompleted = true;};
+    inline void enqueueComplete() {
+        if (asyncCounter<=1) {
+           asyncCounter =0;
+           enqueueCompleted = true; 
+        }else{
+           asyncCounter--;
+       }
+     };
+    inline void enqueueAsync() {asyncCounter++;};
     inline bool isDequeueComplete() {return dequeueCompleted;};
     inline void dequeueComplete() {dequeueCompleted = true;};
     

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h?rev=568236&r1=568235&r2=568236&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h Tue Aug 21 
12:11:07 2007
@@ -22,6 +22,7 @@
 #define _StructHelper_
 
 #include "qpid/Exception.h"
+#include "Buffer.h"
 
 namespace qpid {
 namespace framing {

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp?rev=568236&r1=568235&r2=568236&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp Tue Aug 21 
12:11:07 2007
@@ -75,6 +75,7 @@
     CPPUNIT_TEST(testStaging);
     CPPUNIT_TEST(testQueuePolicy);
     CPPUNIT_TEST(testFlow);
+    CPPUNIT_TEST(testAsyncMesgToMoreThanOneQueue);
     CPPUNIT_TEST_SUITE_END();
 
     shared_ptr<Broker> broker;
@@ -306,6 +307,38 @@
         }
         store.check();
     }
+
+
+    //NOTE: message or queue test,
+    //but as it can usefully use the same utility classes as this
+    //class it is defined here for simpllicity
+    void testAsyncMesgToMoreThanOneQueue()
+    {
+        MockMessageStore store;
+        {//must ensure that store is last thing deleted
+        const string data1("abcd");
+        Message::shared_ptr msg1(createMessage("e", "A", "MsgA", 
data1.size()));
+        addContent(msg1, data1);
+ 
+        Queue::shared_ptr queue1(new Queue("my_queue1", false, &store, 0));
+        Queue::shared_ptr queue2(new Queue("my_queue2", false, &store, 0));
+        Queue::shared_ptr queue3(new Queue("my_queue3", false, &store, 0));
+        queue1->deliver(msg1);
+        queue2->deliver(msg1);
+        queue3->deliver(msg1);
+       sleep(2);
+        
+        Message::shared_ptr next = queue1->dequeue();
+        CPPUNIT_ASSERT_EQUAL(msg1, next);
+        next = queue2->dequeue();
+        CPPUNIT_ASSERT_EQUAL(msg1, next);
+        next = queue3->dequeue();
+        CPPUNIT_ASSERT_EQUAL(msg1, next);
+
+        }
+    }
+
+
 
     void testFlow(){
         Channel channel(connection, recorder, 7);


Reply via email to