Author: cctrieloff
Date: Thu Oct  9 12:35:19 2008
New Revision: 703236

URL: http://svn.apache.org/viewvc?rev=703236&view=rev
Log:
QPID-1306

- from review, clean-up for acquire
- test for LVQ acquire
- suppress store for LVQ


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=703236&r1=703235&r2=703236&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Oct  9 12:35:19 
2008
@@ -203,6 +203,11 @@
     QPID_LOG(debug, "attempting to acquire " << msg.position);
     for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
         if (i->position == msg.position) {
+            if (lastValueQueue){
+                const framing::FieldTable* ft = 
msg.payload->getApplicationHeaders();
+                string key = ft->getString(qpidVQMatchProperty);
+                lvq.erase(key);
+            }
             messages.erase(i);
             QPID_LOG(debug, "Match found, acquire succeeded: " << i->position 
<< " == " << msg.position);
             return true;
@@ -523,7 +528,7 @@
         msg->addTraceId(traceId);
     }
 
-    if (msg->isPersistent() && store) {
+    if (msg->isPersistent() && store && !lastValueQueue) {
         msg->enqueueAsync(shared_from_this(), store); //increment to async 
counter -- for message sent to more than one queue
         boost::intrusive_ptr<PersistableMessage> pmsg = 
boost::static_pointer_cast<PersistableMessage>(msg);
         store->enqueue(ctxt, pmsg, *this);
@@ -540,7 +545,7 @@
         Mutex::ScopedLock locker(messageLock);
         dequeued(msg);
     }
-    if (msg.payload->isPersistent() && store) {
+    if (msg.payload->isPersistent() && store && !lastValueQueue) {
         msg.payload->dequeueAsync(shared_from_this(), store); //increment to 
async counter -- for message sent to more than one queue
         boost::intrusive_ptr<PersistableMessage> pmsg = 
boost::static_pointer_cast<PersistableMessage>(msg.payload);
         store->dequeue(ctxt, pmsg, *this);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=703236&r1=703235&r2=703236&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Thu Oct  9 12:35:19 
2008
@@ -387,6 +387,54 @@
        
 }
 
+QPID_AUTO_TEST_CASE(testLVQAcquire){
+
+    client::QueueOptions args;
+    // set queue mode
+    args.setOrdering(client::LVQ);
+
+    Queue::shared_ptr queue(new Queue("my-queue", true ));
+    queue->configure(args);
+       
+    intrusive_ptr<Message> msg1 = message("e", "A");
+    intrusive_ptr<Message> msg2 = message("e", "B");
+    intrusive_ptr<Message> msg3 = message("e", "C");
+    intrusive_ptr<Message> msg4 = message("e", "D");
+    intrusive_ptr<Message> msg5 = message("e", "F");
+
+    //set LVQ match keys for msg1..msg5: a,b,c,a,b
+
+    string key;
+    args.getLVQKey(key);
+    BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
+       
+
+    
msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+    
msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
+    
msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+    
msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+    
msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
+       
+    //enqueue 4 messages; msg1 and msg4 share key "a", so 3 are expected to remain
+    queue->deliver(msg1);
+    queue->deliver(msg2);
+    queue->deliver(msg3);
+    queue->deliver(msg4);
+    
+    BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
+
+    framing::SequenceNumber sequence;
+    QueuedMessage qmsg(queue.get(), msg2, ++sequence);
+    queue->acquire(qmsg);
+    
+    BOOST_CHECK_EQUAL(queue->getMessageCount(), 2u);
+    
+    queue->deliver(msg5);
+    BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
+ 
+}
+
+
 QPID_AUTO_TEST_CASE(testLVQSaftyCheck){
 
 // This test is to check std::deque memory copy does not change out under us


Reply via email to