Author: gsim
Date: Thu Oct 23 11:57:38 2008
New Revision: 707446

URL: http://svn.apache.org/viewvc?rev=707446&view=rev
Log:
Some fixes to the LVQ (primarily a patch from [EMAIL PROTECTED])


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp?rev=707446&r1=707445&r2=707446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.cpp Thu Oct 23 
11:57:38 2008
@@ -324,3 +324,17 @@
 {
     return expiration < FAR_FUTURE && expiration < AbsTime::now();
 }
+
+boost::intrusive_ptr<Message>& Message::getReplacementMessage(const Queue* 
qfor) const
+{
+    Replacement::iterator i = replacement.find(qfor);
+    if (i != replacement.end()){
+        return i->second;
+    }           
+    return empty;
+}
+
+void Message::setReplacementMessage(boost::intrusive_ptr<Message> msg, const 
Queue* qfor)
+{
+    replacement[qfor] = msg;
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h?rev=707446&r1=707445&r2=707446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Message.h Thu Oct 23 11:57:38 
2008
@@ -138,6 +138,9 @@
     void addTraceId(const std::string& id);
        
        void forcePersistent();
+    
+    boost::intrusive_ptr<Message>& getReplacementMessage(const Queue* qfor) 
const;
+    void setReplacementMessage(boost::intrusive_ptr<Message> msg, const Queue* 
qfor);
 
   private:
     mutable sys::Mutex lock;
@@ -155,6 +158,10 @@
     static TransferAdapter TRANSFER;
 
     MessageAdapter& getAdapter() const;
+       typedef std::map<const Queue*,boost::intrusive_ptr<Message> > 
Replacement;
+
+    mutable Replacement replacement;
+    mutable boost::intrusive_ptr<Message> empty;
 };
 
 }}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=707446&r1=707445&r2=707446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Oct 23 11:57:38 
2008
@@ -198,17 +198,23 @@
     for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
 }
 
+void Queue::clearLVQIndex(const QueuedMessage& msg){
+    if (lastValueQueue){
+        const framing::FieldTable* ft = msg.payload->getApplicationHeaders();
+        string key = ft->getAsString(qpidVQMatchProperty);
+        lvq.erase(key);
+    }
+}
+
 bool Queue::acquire(const QueuedMessage& msg) {
     Mutex::ScopedLock locker(messageLock);
     QPID_LOG(debug, "attempting to acquire " << msg.position);
     for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
         if ((i->position == msg.position && !lastValueQueue) // note that in 
some cases payload not be set
-            || (lastValueQueue && i->position == msg.position && 
i->payload.get() == msg.payload.get())) {
-            if (lastValueQueue){
-                const framing::FieldTable* ft = 
msg.payload->getApplicationHeaders();
-                string key = ft->getAsString(qpidVQMatchProperty);
-                lvq.erase(key);
-            }
+            || (lastValueQueue && (i->position == msg.position) && 
+                msg.payload.get() == checkLvqReplace(*i).payload.get()) )  {
+
+            clearLVQIndex(msg);
             messages.erase(i);
             QPID_LOG(debug, "Match found, acquire succeeded: " << i->position 
<< " == " << msg.position);
             return true;
@@ -238,7 +244,7 @@
         addListener(c);
         return false;
     } else {
-        QueuedMessage msg = messages.front();
+        QueuedMessage msg = getFront();
         if (store && !msg.payload->isEnqueueComplete()) {
             //though a message is on the queue, it has not yet been
             //enqueued and so is not available for consumption yet,
@@ -264,7 +270,7 @@
             addListener(c);
             return false;
         } else {
-            QueuedMessage msg = messages.front();
+            QueuedMessage msg = getFront();
             if (msg.payload->hasExpired()) {
                 QPID_LOG(debug, "Message expired from queue '" << name << "'");
                 popAndDequeue();
@@ -306,6 +312,7 @@
                 //consumer wants the message
                 c->position = msg.position;
                 m = msg;
+                clearLVQIndex(msg);
                 return true;
             } else {
                 //browser hasn't got enough credit for the message
@@ -348,8 +355,8 @@
 bool Queue::seek(QueuedMessage& msg, Consumer::shared_ptr c) {
     Mutex::ScopedLock locker(messageLock);
     if (!messages.empty() && messages.back().position > c->position) {
-        if (c->position < messages.front().position) {
-            msg = messages.front();
+        if (c->position < getFront().position) {
+            msg = getFront();
             return true;
         } else {        
             //TODO: can improve performance of this search, for now just 
searching linearly from end
@@ -416,7 +423,7 @@
     QueuedMessage msg(this);
 
     if(!messages.empty()){
-        msg = messages.front();
+        msg = getFront();
         popMsg(msg);
     }
     return msg;
@@ -432,6 +439,7 @@
         {
             Mutex::ScopedLock locker(messageLock);
             for (Messages::iterator i = messages.begin(); i != 
messages.end();) {
+                if (lastValueQueue) checkLvqReplace(*i);
                 if (i->payload->hasExpired()) {
                     expired.push_back(*i);
                     i = messages.erase(i);
@@ -471,7 +479,7 @@
     uint32_t count = 0; // count how many were moved for returning
 
     while((!qty || move_count--) && !messages.empty()) {
-        QueuedMessage qmsg = messages.front();
+        QueuedMessage qmsg = getFront();
         boost::intrusive_ptr<Message> msg = qmsg.payload;
         destq->deliver(msg); // deliver message to the destination queue
         popMsg(qmsg);
@@ -509,12 +517,11 @@
             if (i == lvq.end()){
                 messages.push_back(qm);
                 listeners.swap(copy);
-                lvq[key] = &messages.back();
+                lvq[key] = msg;
             }else {
-                i->second->payload = msg;
+                i->second->setReplacementMessage(msg,this);
             }           
         }else {
-               
             messages.push_back(qm);
             listeners.swap(copy);
         }
@@ -522,13 +529,33 @@
     for_each(copy.begin(), copy.end(), boost::mem_fn(&Consumer::notify));
 }
 
+QueuedMessage Queue::getFront()
+{
+    QueuedMessage msg = messages.front();
+    if (lastValueQueue) {
+        boost::intrusive_ptr<Message> replacement = 
msg.payload->getReplacementMessage(this);
+        if (replacement.get()) msg.payload = replacement;
+    }
+    return msg;
+}
+
+QueuedMessage& Queue::checkLvqReplace(QueuedMessage& msg) const
+{
+    boost::intrusive_ptr<Message> replacement = 
msg.payload->getReplacementMessage(this);
+    if (replacement.get()) msg.payload = replacement;
+    return msg;
+}
+
 /** function only provided for unit tests, or code not in critical message 
path */
 uint32_t Queue::getMessageCount() const
 {
     Mutex::ScopedLock locker(messageLock);
   
-    uint32_t count =0;
+    uint32_t count = 0;
     for ( Messages::const_iterator i = messages.begin(); i != messages.end(); 
++i ) {
+        //NOTE: don't need to use checkLvqReplace() here as it
+        //is only relevant for LVQ which does not support persistence
+        //so the enqueueComplete check has no effect
         if ( i->payload->isEnqueueComplete() ) count ++;
     }
     
@@ -556,7 +583,8 @@
 {
     if (persistLastNode){
         Mutex::ScopedLock locker(messageLock);
-       for ( Messages::const_iterator i = messages.begin(); i != 
messages.end(); ++i ) {
+       for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i 
) {
+            if (lastValueQueue) checkLvqReplace(*i);
             i->payload->forcePersistent();
             if (i->payload->getPersistenceId() == 0){
                enqueue(0, i->payload);
@@ -609,7 +637,7 @@
  */
 void Queue::popAndDequeue()
 {
-    QueuedMessage msg = messages.front();
+    QueuedMessage msg = getFront();
     popMsg(msg);
     dequeue(0, msg);
 }
@@ -667,7 +695,7 @@
     if (alternateExchange.get()) {
         Mutex::ScopedLock locker(messageLock);
         while(!messages.empty()){
-            DeliverableMessage msg(messages.front().payload);
+            DeliverableMessage msg(getFront().payload);
             alternateExchange->route(msg, msg.getMessage().getRoutingKey(),
                                      msg.getMessage().getApplicationHeaders());
             popAndDequeue();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=707446&r1=707445&r2=707446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu Oct 23 11:57:38 
2008
@@ -66,7 +66,7 @@
 
             typedef std::list<Consumer::shared_ptr> Listeners;
             typedef std::deque<QueuedMessage> Messages;
-            typedef std::map<string,QueuedMessage*> LVQ;
+            typedef std::map<string,boost::intrusive_ptr<Message> > LVQ;
 
             const string name;
             const bool autodelete;
@@ -111,7 +111,10 @@
 
             void dequeued(const QueuedMessage& msg);
             void popAndDequeue();
-
+            QueuedMessage getFront();
+            QueuedMessage& checkLvqReplace(QueuedMessage& msg) const;
+            void clearLVQIndex(const QueuedMessage& msg);
+            
             inline void mgntEnqStats(const boost::intrusive_ptr<Message>& msg)
             {
                 if (mgmtObject != 0) {
@@ -193,8 +196,8 @@
             uint32_t purge(const uint32_t purge_request = 0); //defaults to 
all messages 
             void purgeExpired();
 
-           //move qty # of messages to destination Queue destq
-           uint32_t move(const Queue::shared_ptr destq, uint32_t qty); 
+            //move qty # of messages to destination Queue destq
+            uint32_t move(const Queue::shared_ptr destq, uint32_t qty); 
 
             uint32_t getMessageCount() const;
             uint32_t getConsumerCount() const;
@@ -211,10 +214,10 @@
             const QueueBindings& getBindings() const { return bindings; }
 
             /**
-                       * used to take messages from in memory and flush down 
to disk.
-                       */
-                       void setLastNodeFailure();
-                       void clearLastNodeFailure();
+             * used to take messages from in memory and flush down to disk.
+             */
+            void setLastNodeFailure();
+            void clearLastNodeFailure();
 
             bool enqueue(TransactionContext* ctxt, 
boost::intrusive_ptr<Message> msg);
             /**

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=707446&r1=707445&r2=707446&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Thu Oct 23 11:57:38 
2008
@@ -438,27 +438,38 @@
  
 }
 
+QPID_AUTO_TEST_CASE(testLVQMultiQueue){
 
-QPID_AUTO_TEST_CASE(testLVQSaftyCheck){
+    client::QueueOptions args;
+    // set queue mode
+    args.setOrdering(client::LVQ);
+
+    Queue::shared_ptr queue1(new Queue("my-queue", true ));
+    Queue::shared_ptr queue2(new Queue("my-queue", true ));
+    intrusive_ptr<Message> received;
+    queue1->configure(args);
+    queue2->configure(args);
+       
+    intrusive_ptr<Message> msg1 = message("e", "A");
+    intrusive_ptr<Message> msg2 = message("e", "A");
 
-// This test is to check std::deque memory copy does not change out under us
-// if this test fails, then lvq would no longer be safe.
+    string key;
+    args.getLVQKey(key);
+    BOOST_CHECK_EQUAL(key, "qpid.LVQ_key");
 
-    std::deque<string> deq;
-       
-       string a;
-       string b;
+    
msg1->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
+    
msg2->getProperties<MessageProperties>()->getApplicationHeaders().setString(key,"a");
        
-       deq.push_back(a);
-       deq.push_back(b);
-       string* tmp = &deq.back();
-       for (int a =0; a<=100000; a++){
-           string z;
-               deq.push_back(z);
-       }
-       deq.pop_front();
-    BOOST_CHECK_EQUAL(&deq.front(),tmp);
-
+    queue1->deliver(msg1);
+    queue2->deliver(msg1);
+    queue1->deliver(msg2);
+    
+    received = queue1->get().payload;
+    BOOST_CHECK_EQUAL(msg2.get(), received.get());
+
+    received = queue2->get().payload;
+    BOOST_CHECK_EQUAL(msg1.get(), received.get());
+    
 }
 
 void addMessagesToQueue(uint count, Queue& queue, uint oddTtl = 200, uint 
evenTtl = 0) 


Reply via email to