Author: gsim
Date: Thu Feb 14 03:49:17 2008
New Revision: 627718
URL: http://svn.apache.org/viewvc?rev=627718&view=rev
Log:
Fixed bug in browsing that failed to deal correctly with 'gaps' in message
sequence.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/python/tests_0-10/message.py
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=627718&r1=627717&r2=627718&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Feb 14 03:49:17 2008
@@ -170,12 +170,17 @@
bool Queue::acquire(const QueuedMessage& msg) {
Mutex::ScopedLock locker(messageLock);
+ QPID_LOG(debug, "attempting to acquire " << msg.position);
for (Messages::iterator i = messages.begin(); i != messages.end(); i++) {
if (i->position == msg.position) {
messages.erase(i);
+ QPID_LOG(debug, "Match found, acquire succeeded: " << i->position << " == " << msg.position);
return true;
+ } else {
+ QPID_LOG(debug, "No match: " << i->position << " != " << msg.position);
}
}
+ QPID_LOG(debug, "Acquire failed for " << msg.position);
return false;
}
@@ -255,8 +260,8 @@
m = msg;
return true;
} else {
- //consumer hasn't got enough credit for the message
- QPID_LOG(debug, "Consumer can't currently accept message from '" << name << "'");
+ //browser hasn't got enough credit for the message
+ QPID_LOG(debug, "Browser can't currently accept message from '" << name << "'");
return false;
}
} else {
@@ -304,11 +309,13 @@
msg = messages.front();
return true;
} else {
- uint index = (c.position - messages.front().position) + 1;
- if (index < messages.size()) {
- msg = messages[index];
- return true;
- }
+ //TODO: can improve performance of this search, for now just searching linearly from end
+ Messages::reverse_iterator pos;
+ for (Messages::reverse_iterator i = messages.rbegin(); i != messages.rend() && i->position > c.position; i++) {
+ pos = i;
+ }
+ msg = *pos;
+ return true;
}
}
addListener(c);
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=627718&r1=627717&r2=627718&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Thu Feb 14 03:49:17 2008
@@ -720,6 +720,49 @@
#check all 'browsed' messages are still on the queue
self.assertEqual(5, channel.queue_query(queue="q").message_count)
+ def test_subscribe_not_acquired_3(self):
+ channel = self.channel
+
+ #publish some messages
+ self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range(1, 11):
+ channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
+
+ #create a not-acquired subscriber
+ channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ channel.message_flow(unit = 0, value = 10, destination = "a")
+
+ #browse through messages
+ queue = self.client.queue("a")
+ for i in range(1, 11):
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.content.body)
+ if (i % 2):
+ #try to acquire every second message
+ channel.message_acquire([msg.command_id, msg.command_id])
+ #check that acquire succeeds
+ response = channel.control_queue.get(timeout=1)
+ self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
+ msg.complete()
+ self.assertEmpty(queue)
+
+ #create a second not-acquired subscriber
+ channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+ channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
+ channel.message_flow(unit = 0, value = 1, destination = "b")
+ #check it gets those not consumed
+ queue = self.client.queue("b")
+ for i in [2,4,6,8,10]:
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.content.body)
+ msg.complete()
+ channel.message_flow(unit = 0, value = 1, destination = "b")
+ self.assertEmpty(queue)
+
+ #check all 'browsed' messages are still on the queue
+ self.assertEqual(5, channel.queue_query(queue="q").message_count)
+
def test_no_size(self):
self.queue_declare(queue = "q", exclusive=True, auto_delete=True)