Author: gsim
Date: Thu Feb 14 03:49:17 2008
New Revision: 627718

URL: http://svn.apache.org/viewvc?rev=627718&view=rev
Log:
Fixed bug in browsing that failed to deal correctly with 'gaps' in message 
sequence.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/python/tests_0-10/message.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=627718&r1=627717&r2=627718&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Feb 14 03:49:17 2008
@@ -170,12 +170,17 @@
 
 bool Queue::acquire(const QueuedMessage& msg) {
     Mutex::ScopedLock locker(messageLock);
+    QPID_LOG(debug, "attempting to acquire " << msg.position);
     for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
         if (i->position == msg.position) {
             messages.erase(i);
+            QPID_LOG(debug, "Match found, acquire succeeded: " << i->position << " == " << msg.position);
             return true;
+        } else {
+            QPID_LOG(debug, "No match: " << i->position << " != " << msg.position);
         }
     }
+    QPID_LOG(debug, "Acquire failed for " << msg.position);
     return false;
 }
 
@@ -255,8 +260,8 @@
                 m = msg;
                 return true;
             } else {
-                //consumer hasn't got enough credit for the message
-                QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+                //browser hasn't got enough credit for the message
+                QPID_LOG(debug, "Browser can't currently accept message from '" << name << "'");
                 return false;
             }
         } else {
@@ -304,11 +309,13 @@
             msg = messages.front();
             return true;
         } else {        
-            uint index = (c.position - messages.front().position) + 1;
-            if (index < messages.size()) {
-                msg = messages[index];
-                return true;
-            } 
+            //TODO: can improve performance of this search, for now just searching linearly from end
+            Messages::reverse_iterator pos;
+            for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c.position; i++) {
+                pos = i;
+            }
+            msg = *pos;
+            return true;
         }
     }
     addListener(c);

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=627718&r1=627717&r2=627718&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Thu Feb 14 03:49:17 2008
@@ -720,6 +720,49 @@
         #check all 'browsed' messages are still on the queue
         self.assertEqual(5, channel.queue_query(queue="q").message_count)
 
+    def test_subscribe_not_acquired_3(self):
+        channel = self.channel
+
+        #publish some messages
+        self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        for i in range(1, 11):
+            channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
+
+        #create a not-acquired subscriber
+        channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
+        channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+        channel.message_flow(unit = 0, value = 10, destination = "a")
+
+        #browse through messages
+        queue = self.client.queue("a")
+        for i in range(1, 11):
+            msg = queue.get(timeout = 1)
+            self.assertEquals("message-%d" % (i), msg.content.body)
+            if (i % 2):
+                #try to acquire every second message
+                channel.message_acquire([msg.command_id, msg.command_id])
+                #check that acquire succeeds
+                response = channel.control_queue.get(timeout=1)
+                self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
+            msg.complete()
+        self.assertEmpty(queue)
+
+        #create a second not-acquired subscriber
+        channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+        channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
+        channel.message_flow(unit = 0, value = 1, destination = "b")
+        #check it gets those not consumed
+        queue = self.client.queue("b")
+        for i in [2,4,6,8,10]:
+            msg = queue.get(timeout = 1)
+            self.assertEquals("message-%d" % (i), msg.content.body)
+            msg.complete()
+        channel.message_flow(unit = 0, value = 1, destination = "b")
+        self.assertEmpty(queue)
+
+        #check all 'browsed' messages are still on the queue
+        self.assertEqual(5, channel.queue_query(queue="q").message_count)
+
     def test_no_size(self):
         self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
 


Reply via email to