Author: cctrieloff
Date: Wed Oct  8 11:35:46 2008
New Revision: 702958

URL: http://svn.apache.org/viewvc?rev=702958&view=rev
Log:
QPID-1306

- added lvq support
- added lvq tests
- added safety test for lvq
- updated QueueOptions for lvq
- some refactor to queue, to have signel pop loction


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueOptionsTest.cpp
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=702958&r1=702957&r2=702958&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Oct  8 11:35:46 
2008
@@ -51,6 +51,21 @@
 using std::mem_fun;
 namespace _qmf = qmf::org::apache::qpid::broker;
 
+
+namespace 
+{
+    const std::string qpidMaxSize("qpid.max_size");
+    const std::string qpidMaxCount("qpid.max_count");
+    const std::string qpidNoLocal("no-local");
+    const std::string qpidTraceIdentity("qpid.trace.id");
+    const std::string qpidTraceExclude("qpid.trace.exclude");
+    const std::string qpidLastValueQueue("qpid.last_value_queue");
+    const std::string qpidOptimisticConsume("qpid.optimistic_consume");
+    const std::string qpidPersistLastNode("qpid.persist_last_node");
+    const std::string qpidVQMatchProperty("qpid.LVQ_key");
+}
+
+
 Queue::Queue(const string& _name, bool _autodelete, 
              MessageStore* const _store,
              const OwnershipToken* const _owner,
@@ -253,7 +268,7 @@
             if (c->filter(msg.payload)) {
                 if (c->accept(msg.payload)) {            
                     m = msg;
-                    messages.pop_front();
+                    popMsg(msg);
                     return true;
                 } else {
                     //message(s) are available but consumer hasn't got enough 
credit
@@ -371,7 +386,7 @@
 
     if(!messages.empty()){
         msg = messages.front();
-        messages.pop_front();
+        popMsg(msg);
     }
     return msg;
 }
@@ -406,22 +421,49 @@
         QueuedMessage qmsg = messages.front();
         boost::intrusive_ptr<Message> msg = qmsg.payload;
         destq->deliver(msg); // deliver message to the destination queue
-        messages.pop_front();
+        popMsg(qmsg);
         dequeue(0, qmsg);
         count++;
     }
     return count;
 }
 
+void Queue::popMsg(QueuedMessage& qmsg)
+{
+    if (lastValueQueue){
+       const framing::FieldTable* ft = qmsg.payload->getApplicationHeaders();
+               string key = ft->getString(qpidVQMatchProperty);
+               lvq.erase(key);
+       }
+    messages.pop_front();
+}
+
 void Queue::push(boost::intrusive_ptr<Message>& msg){
     Listeners copy;
     {
         Mutex::ScopedLock locker(messageLock);   
         QueuedMessage qm(this, msg, ++sequence);
         if (policy.get()) policy->tryEnqueue(qm);
-
-        messages.push_back(qm);
-        listeners.swap(copy);
+         
+        //if (lastValueQueue && LVQinsert(qm) ) return; // LVQ update of 
existing message
+               LVQ::iterator i;
+               if (lastValueQueue){
+               const framing::FieldTable* ft = msg->getApplicationHeaders();
+                       string key = ft->getString(qpidVQMatchProperty);
+
+               i = lvq.find(key);
+                       if (i == lvq.end()){
+                messages.push_back(qm);
+                listeners.swap(copy);
+                       lvq[key] = &messages.back();
+                       }else {
+                           i->second->payload = msg;
+                       }                
+               }else {
+               
+            messages.push_back(qm);
+            listeners.swap(copy);
+               }
     }
     for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
 }
@@ -514,8 +556,8 @@
 void Queue::popAndDequeue()
 {
     QueuedMessage msg = messages.front();
-    messages.pop_front();
-    dequeue(0, msg);
+    popMsg(msg);
+       dequeue(0, msg);
 }
 
 /**
@@ -529,18 +571,6 @@
 }
 
 
-namespace 
-{
-    const std::string qpidMaxSize("qpid.max_size");
-    const std::string qpidMaxCount("qpid.max_count");
-    const std::string qpidNoLocal("no-local");
-    const std::string qpidTraceIdentity("qpid.trace.id");
-    const std::string qpidTraceExclude("qpid.trace.exclude");
-    const std::string qpidLastValueQueue("qpid.last_value_queue");
-    const std::string qpidOptimisticConsume("qpid.optimistic_consume");
-    const std::string qpidPersistLastNode("qpid.persist_last_node");
-}
-
 void Queue::create(const FieldTable& _settings)
 {
     settings = _settings;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=702958&r1=702957&r2=702958&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed Oct  8 11:35:46 
2008
@@ -65,6 +65,7 @@
 
             typedef std::list<Consumer::shared_ptr> Listeners;
             typedef std::deque<QueuedMessage> Messages;
+                       typedef std::map<string,QueuedMessage*> LVQ;
 
             const string name;
             const bool autodelete;
@@ -81,6 +82,7 @@
             std::vector<std::string> traceExclude;
             Listeners listeners;
             Messages messages;
+                       LVQ lvq;
             mutable qpid::sys::Mutex consumerLock;
             mutable qpid::sys::Mutex messageLock;
             mutable qpid::sys::Mutex ownershipLock;
@@ -253,6 +255,9 @@
             }
 
             bool releaseMessageContent(const QueuedMessage&);
+
+            void popMsg(QueuedMessage& qmsg);
+
         };
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp?rev=702958&r1=702957&r2=702958&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.cpp Wed Oct  8 
11:35:46 2008
@@ -38,6 +38,7 @@
 const std::string QueueOptions::strLastValueQueue("qpid.last_value_queue");
 const std::string 
QueueOptions::strOptimisticConsume("qpid.optimistic_consume");
 const std::string QueueOptions::strPersistLastNode("qpid.persist_last_node");
+const std::string QueueOptions::strLVQMatchProperty("qpid.LVQ_key");
 
 
 QueueOptions::~QueueOptions()
@@ -83,15 +84,17 @@
 void QueueOptions::setOrdering(QueueOrderingPolicy op)
 {
        if (op == LVQ){
-            // TODO, add and test options with LVQ patch.
-                // also set the key match for LVQ
-            //setString(LastValueQueue, 1); 
-       
+           setInt(strLastValueQueue, 1); 
        }else{
            clearOrdering();
        }
 }
 
+void QueueOptions::getLVQKey(std::string& key)
+{
+    key.assign(strLVQMatchProperty);
+}
+
 void QueueOptions::clearSizePolicy()
 {
     erase(strMaxCountKey);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h?rev=702958&r1=702957&r2=702958&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/QueueOptions.h Wed Oct  8 
11:35:46 2008
@@ -86,6 +86,11 @@
        void clearPersistLastNode();
        
        /**
+       * get the key used match LVQ in args for message transfer 
+       */
+       void getLVQKey(std::string& key);
+               
+       /**
        * Use default odering policy
        */ 
        void clearOrdering();
@@ -100,7 +105,7 @@
        static const std::string strLastValueQueue;
        static const std::string strOptimisticConsume;
        static const std::string strPersistLastNode;
-       private:
+       static const std::string strLVQMatchProperty;
        
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueOptionsTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueOptionsTest.cpp?rev=702958&r1=702957&r2=702958&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueOptionsTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueOptionsTest.cpp Wed Oct  8 
11:35:46 2008
@@ -63,15 +63,19 @@
        
        ft.setOptimisticConsume();
        ft.setPersistLastNode();
+    ft.setOrdering(LVQ);
        
     BOOST_CHECK(1 == ft.getInt(QueueOptions::strOptimisticConsume));
     BOOST_CHECK(1 == ft.getInt(QueueOptions::strPersistLastNode));
+    BOOST_CHECK(1 == ft.getInt(QueueOptions::strLastValueQueue));
        
        ft.clearOptimisticConsume();
        ft.clearPersistLastNode();
+    ft.setOrdering(FIFO);
 
        BOOST_CHECK(!ft.isSet(QueueOptions::strOptimisticConsume));
        BOOST_CHECK(!ft.isSet(QueueOptions::strPersistLastNode));
+       BOOST_CHECK(!ft.isSet(QueueOptions::strLastValueQueue));
 
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=702958&r1=702957&r2=702958&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Wed Oct  8 11:35:46 
2008
@@ -240,10 +240,8 @@
 
 QPID_AUTO_TEST_CASE(testPersistLastNodeStanding){
 
-    FieldTable args;
-
-    // set queue mode
-       args.setInt("qpid.persist_last_node", 1);
+    client::QueueOptions args;
+       args.setPersistLastNode();
        
        Queue::shared_ptr queue(new Queue("my-queue", true));
     queue->configure(args);
@@ -292,8 +290,8 @@
 
 QPID_AUTO_TEST_CASE(testOptimisticConsume){
 
-    FieldTable args;
-       args.setInt("qpid.persist_last_node", 1);
+    client::QueueOptions args;
+       args.setPersistLastNode();
 
     // set queue mode
        
@@ -305,7 +303,7 @@
        msg1->forcePersistent();
 
        //change mode
-       args.setInt("qpid.optimistic_consume", 1);
+       args.setOptimisticConsume();
     queue->configure(args);
        
        //enqueue 1 message
@@ -322,6 +320,94 @@
 
 }
 
+QPID_AUTO_TEST_CASE(testLVQOrdering){
+
+    client::QueueOptions args;
+    // set queue mode
+       args.setOrdering(client::LVQ);
+
+       Queue::shared_ptr queue(new Queue("my-queue", true ));
+       queue->configure(args);
+       
+    intrusive_ptr<Message> msg1 = message("e", "A");
+    intrusive_ptr<Message> msg2 = message("e", "B");
+    intrusive_ptr<Message> msg3 = message("e", "C");
+    intrusive_ptr<Message> msg4 = message("e", "D");
+    intrusive_ptr<Message> received;
+
+    //set deliever match for LVQ a,b,c,a
+
+    string key;
+       args.getLVQKey(key);
+    BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
+       
+
+       
msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+       
msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
+       
msg3->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+       
msg4->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+       
+       //enqueue 4 message
+    queue->deliver(msg1);
+    queue->deliver(msg2);
+    queue->deliver(msg3);
+    queue->deliver(msg4);
+       
+    BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
+       
+    received = queue->get().payload;
+    BOOST_CHECK_EQUAL(msg4.get(), received.get());
+
+    received = queue->get().payload;
+    BOOST_CHECK_EQUAL(msg2.get(), received.get());
+       
+    received = queue->get().payload;
+    BOOST_CHECK_EQUAL(msg3.get(), received.get());
+
+    intrusive_ptr<Message> msg5 = message("e", "A");
+    intrusive_ptr<Message> msg6 = message("e", "B");
+    intrusive_ptr<Message> msg7 = message("e", "C");
+       
msg5->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+       
msg6->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"b");
+       
msg7->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"c");
+    queue->deliver(msg5);
+    queue->deliver(msg6);
+    queue->deliver(msg7);
+       
+    BOOST_CHECK_EQUAL(queue->getMessageCount(), 3u);
+       
+    received = queue->get().payload;
+    BOOST_CHECK_EQUAL(msg5.get(), received.get());
+
+    received = queue->get().payload;
+    BOOST_CHECK_EQUAL(msg6.get(), received.get());
+       
+    received = queue->get().payload;
+    BOOST_CHECK_EQUAL(msg7.get(), received.get());
+       
+}
+
+QPID_AUTO_TEST_CASE(testLVQSaftyCheck){
+
+// This test is to check std::deque memory copy does not change out under us
+// if this test fails, then lvq would no longer be safe.
+
+    std::deque<string> deq;
+       
+       string a;
+       string b;
+       
+       deq.push_back(a);
+       deq.push_back(b);
+       string* tmp = &deq.back();
+       for (int a =0; a<=100000; a++){
+           string z;
+               deq.push_back(z);
+       }
+       deq.pop_front();
+    BOOST_CHECK_EQUAL(&deq.front(),tmp);
+
+}
 
 QPID_AUTO_TEST_SUITE_END()
 


Reply via email to