Author: cctrieloff
Date: Tue Aug 21 12:11:07 2007
New Revision: 568236
URL: http://svn.apache.org/viewvc?rev=568236&view=rev
Log:
- header correction for buffer
- added tests for one msg to multiple queues in async mode
- added counter for async multiple msg
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h
incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?rev=568236&r1=568235&r2=568236&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Tue Aug 21
12:11:07 2007
@@ -249,6 +249,7 @@
bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg)
{
if (msg->isPersistent() && store) {
+ msg->enqueueAsync(); //increment to async counter -- for message sent
to more than one queue
store->enqueue(ctxt, *msg.get(), *this);
return true;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=568236&r1=568235&r2=568236&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Tue Aug
21 12:11:07 2007
@@ -45,6 +45,16 @@
* stored.
*/
bool enqueueCompleted;
+
+ /**
+ * Counts the number of times the message has been processed
+ * async - thus when it == 0 the broker knows it has ownership
+ * -> an async store can increment this counter if it writes a
+ * copy to each queue, and can use this counter to know when all
+ * the writes are complete
+ */
+ int asyncCounter;
+
/**
* Needs to be set false on Message construction, then
* set once the dequeue is complete, it gets set
@@ -64,10 +74,19 @@
virtual ~PersistableMessage() {};
PersistableMessage():
enqueueCompleted(false),
+ asyncCounter(0),
dequeueCompleted(false){};
inline bool isEnqueueComplete() {return enqueueCompleted;};
- inline void enqueueComplete() {enqueueCompleted = true;};
+ inline void enqueueComplete() {
+ if (asyncCounter<=1) {
+ asyncCounter =0;
+ enqueueCompleted = true;
+ }else{
+ asyncCounter--;
+ }
+ };
+ inline void enqueueAsync() {asyncCounter++;};
inline bool isDequeueComplete() {return dequeueCompleted;};
inline void dequeueComplete() {dequeueCompleted = true;};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h?rev=568236&r1=568235&r2=568236&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/StructHelper.h Tue Aug 21
12:11:07 2007
@@ -22,6 +22,7 @@
#define _StructHelper_
#include "qpid/Exception.h"
+#include "Buffer.h"
namespace qpid {
namespace framing {
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp?rev=568236&r1=568235&r2=568236&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/BrokerChannelTest.cpp Tue Aug 21
12:11:07 2007
@@ -75,6 +75,7 @@
CPPUNIT_TEST(testStaging);
CPPUNIT_TEST(testQueuePolicy);
CPPUNIT_TEST(testFlow);
+ CPPUNIT_TEST(testAsyncMesgToMoreThanOneQueue);
CPPUNIT_TEST_SUITE_END();
shared_ptr<Broker> broker;
@@ -306,6 +307,38 @@
}
store.check();
}
+
+
+ //NOTE: this is really a message or queue test,
+ //but as it can usefully use the same utility classes as this
+ //class it is defined here for simplicity
+ void testAsyncMesgToMoreThanOneQueue()
+ {
+ MockMessageStore store;
+ {//must ensure that store is last thing deleted
+ const string data1("abcd");
+ Message::shared_ptr msg1(createMessage("e", "A", "MsgA",
data1.size()));
+ addContent(msg1, data1);
+
+ Queue::shared_ptr queue1(new Queue("my_queue1", false, &store, 0));
+ Queue::shared_ptr queue2(new Queue("my_queue2", false, &store, 0));
+ Queue::shared_ptr queue3(new Queue("my_queue3", false, &store, 0));
+ queue1->deliver(msg1);
+ queue2->deliver(msg1);
+ queue3->deliver(msg1);
+ sleep(2);
+
+ Message::shared_ptr next = queue1->dequeue();
+ CPPUNIT_ASSERT_EQUAL(msg1, next);
+ next = queue2->dequeue();
+ CPPUNIT_ASSERT_EQUAL(msg1, next);
+ next = queue3->dequeue();
+ CPPUNIT_ASSERT_EQUAL(msg1, next);
+
+ }
+ }
+
+
void testFlow(){
Channel channel(connection, recorder, 7);