Author: gsim
Date: Fri Mar 7 11:07:32 2008
New Revision: 634780
URL: http://svn.apache.org/viewvc?rev=634780&view=rev
Log:
Added acquire impl to final 0-10 codepath
Converted some more python tests
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
incubator/qpid/trunk/qpid/cpp/xml/extra.xml
incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
incubator/qpid/trunk/qpid/python/qpid/testlib.py
incubator/qpid/trunk/qpid/python/run-tests
incubator/qpid/trunk/qpid/python/tests_0-10/message.py
incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=634780&r1=634779&r2=634780&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Fri Mar 7
11:07:32 2008
@@ -367,16 +367,20 @@
commands.for_each(acceptOp);
}
-/*
-void SessionAdapter::MessageHandlerImpl::acquire(const SequenceSet& transfers)
+framing::Message010AcquireResult
SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet&
transfers)
{
+ //TODO: change this when SequenceNumberSet is deleted along with preview
code
SequenceNumberSet results;
- RangedOperation op = boost::bind(&SemanticState::acquire, &state, _1, _2,
boost::ref(results));
- transfers.processRanges(op);
+ RangedOperation f = boost::bind(&SemanticState::acquire, &state, _1, _2,
boost::ref(results));
+ transfers.for_each(f);
+
results = results.condense();
- getProxy().getMessage().acquired(results);
+ SequenceSet acquisitions;
+ RangedOperation g = boost::bind(&SequenceSet::add, &acquisitions, _1, _2);
+ results.processRanges(g);
+
+ return Message010AcquireResult(acquisitions);
}
-*/
void SessionAdapter::ExecutionHandlerImpl::sync()
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=634780&r1=634779&r2=634780&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h Fri Mar 7
11:07:32 2008
@@ -149,6 +149,8 @@
void release(const framing::SequenceSet& commands,
bool setRedelivered);
+ framing::Message010AcquireResult acquire(const framing::SequenceSet&);
+
void subscribe(const string& queue,
const string& destination,
uint8_t acceptMode,
Modified: incubator/qpid/trunk/qpid/cpp/xml/extra.xml
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/extra.xml?rev=634780&r1=634779&r2=634780&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/extra.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/extra.xml Fri Mar 7 11:07:32 2008
@@ -670,7 +670,16 @@
<field name="commands" domain="sequence-set"/>
<field name="set-redelivered" domain="bit"/>
</method>
-
+ <method name = "acquire" index="5">
+ <doc>blah, blah</doc>
+ <chassis name="server" implement="MUST" />
+ <field name="transfers" domain="sequence-set"/>
+ <result>
+ <struct size="long" type="4">
+ <field name="transfers" domain="sequence-set"/>
+ </struct>
+ </result>
+ </method>
<method name = "subscribe" index="7">
<doc>blah, blah</doc>
Modified: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?rev=634780&r1=634779&r2=634780&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Fri Mar 7 11:07:32
2008
@@ -39,8 +39,6 @@
tests_0-10.dtx.DtxTests.test_start_join_and_resume
tests_0-10.dtx.DtxTests.test_suspend_resume
tests_0-10.dtx.DtxTests.test_suspend_start_end_resume
-tests_0-10.message.MessageTests.test_acquire
-tests_0-10.message.MessageTests.test_subscribe_not_acquired_3
tests_0-10.message.MessageTests.test_consume_exclusive
tests_0-10.message.MessageTests.test_consume_no_local
tests_0-10.message.MessageTests.test_consume_no_local_awkward
@@ -60,6 +58,6 @@
tests_0-10.queue.QueueTests.test_delete_ifempty
tests_0-10.queue.QueueTests.test_delete_ifunused
tests_0-10.queue.QueueTests.test_delete_simple
-tests_0-10.queue.QueueTests.test_purge
tests_0-10.queue.QueueTests.test_bind
tests_0-10.queue.QueueTests.test_unbind_headers
+tests_0-10.queue.QueueTests.test_purge_empty_name
\ No newline at end of file
Modified: incubator/qpid/trunk/qpid/python/qpid/testlib.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/testlib.py?rev=634780&r1=634779&r2=634780&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/testlib.py Fri Mar 7 11:07:32 2008
@@ -353,5 +353,5 @@
self.session = self.conn.session("test-session", timeout=10)
def tearDown(self):
- self.session.close(timeout=10)
+ if not self.session.error(): self.session.close(timeout=10)
self.conn.close(timeout=10)
Modified: incubator/qpid/trunk/qpid/python/run-tests
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/run-tests?rev=634780&r1=634779&r2=634780&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/run-tests (original)
+++ incubator/qpid/trunk/qpid/python/run-tests Fri Mar 7 11:07:32 2008
@@ -21,7 +21,7 @@
import sys, logging
from qpid.testlib import testrunner
-if "-v" in sys.argv:
+if "-vv" in sys.argv:
level = logging.DEBUG
else:
level = logging.WARN
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=634780&r1=634779&r2=634780&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Fri Mar 7 11:07:32
2008
@@ -635,9 +635,7 @@
session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
#use fanout for now:
- session.exchange_bind(exchange="amq.fanout", queue="q")
- session.message_transfer(destination="amq.fanout",
message=Message("acquire me"))
-
#session.message_transfer(message=Message(session.delivery_properties(routing_key="q"),
"acquire me"))
+
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"),
"acquire me"))
session.message_subscribe(queue = "q", destination = "a", acquire_mode
= 1)
session.message_flow(destination="a", unit=0, value=0xFFFFFFFF)
@@ -647,12 +645,13 @@
#message should still be on the queue:
self.assertEquals(1, session.queue_query(queue = "q").message_count)
- response = session.message_acquire(RangedSet(msg.id))
+ transfers = RangedSet(msg.id)
+ response = session.message_acquire(transfers)
#check that we get notification (i.e. message_acquired)
- self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
+ self.assert_(msg.id in response.transfers)
#message should have been removed from the queue:
self.assertEquals(0, session.queue_query(queue = "q").message_count)
- session.message_accept(RangedSet(msg.id))
+ session.message_accept(transfers)
def test_release(self):
@@ -800,12 +799,12 @@
session = self.session
#publish some messages
- self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
for i in range(1, 11):
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"),
"message-%d" % (i)))
#create a not-acquired subscriber
- session.message_subscribe(queue = "q", destination = "a", confirm_mode
= 1, acquire_mode=1)
+ session.message_subscribe(queue = "q", destination = "a",
acquire_mode=1)
session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
session.message_flow(unit = 0, value = 10, destination = "a")
@@ -816,19 +815,18 @@
self.assertEquals("message-%d" % (i), msg.body)
if (i % 2):
#try to acquire every second message
- session.message_acquire([msg.command_id, msg.command_id])
+ response = session.message_acquire(RangedSet(msg.id))
#check that acquire succeeds
- response = session.control_queue.get(timeout=1)
- self.assertEquals(response.transfers, [msg.command_id,
msg.command_id])
- session.message_release(RangedSet(msg.id))
- session.channel._completed.add(msg.id)
- session.channel.session_completed(session.channel._completed)
-
- msg.complete()
+ self.assert_(msg.id in response.transfers)
+ session.message_accept(RangedSet(msg.id))
+ else:
+ session.message_release(RangedSet(msg.id))
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
self.assertEmpty(queue)
#create a second not-acquired subscriber
- session.message_subscribe(queue = "q", destination = "b", confirm_mode
= 1, acquire_mode=1)
+ session.message_subscribe(queue = "q", destination = "b",
acquire_mode=1)
session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
session.message_flow(unit = 0, value = 1, destination = "b")
#check it gets those not consumed
@@ -836,7 +834,9 @@
for i in [2,4,6,8,10]:
msg = queue.get(timeout = 1)
self.assertEquals("message-%d" % (i), msg.body)
- msg.complete()
+ session.message_release(RangedSet(msg.id))
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
session.message_flow(unit = 0, value = 1, destination = "b")
self.assertEmpty(queue)
@@ -898,6 +898,21 @@
queue = session.incoming("d")
msg = queue.get(timeout = 3)
self.assertEquals("message-body", msg.body)
+
+ def test_empty_body(self):
+ session = self.session
+ session.queue_declare(queue="xyz", exclusive=True, auto_delete=True)
+ props = session.delivery_properties(routing_key="xyz")
+ session.message_transfer(message=Message(props, ""))
+
+ consumer_tag = "tag1"
+ session.message_subscribe(queue="xyz", destination=consumer_tag)
+ session.message_flow(unit = 0, value = 0xFFFFFFFF, destination =
consumer_tag)
+ session.message_flow(unit = 1, value = 0xFFFFFFFF, destination =
consumer_tag)
+ queue = session.incoming(consumer_tag)
+ msg = queue.get(timeout=1)
+ self.assertEquals("", msg.body)
+ session.message_accept(RangedSet(msg.id))
def assertDataEquals(self, session, msg, expected):
self.assertEquals(expected, msg.body)
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/queue.py?rev=634780&r1=634779&r2=634780&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/queue.py Fri Mar 7 11:07:32
2008
@@ -20,6 +20,7 @@
from qpid.queue import Empty
from qpid.testlib import TestBase010
from qpid.datatypes import Message
+from qpid.session import SessionException
class QueueTests(TestBase010):
"""Tests for 'methods' on the amqp queue 'class'"""
@@ -54,22 +55,31 @@
msg = queue.get(timeout=1)
self.assertEqual("four", msg.body)
- #check error conditions (use new sessions):
- session = self.conn.session("error-checker")
+ def test_purge_queue_exists(self):
+ """
+ Test that the correct exception is thrown if no queue exists
+ for the name specified in purge
+ """
+ session = self.session
try:
#queue specified but doesn't exist:
session.queue_purge(queue="invalid-queue")
self.fail("Expected failure when purging non-existent queue")
- except Closed, e:
- self.assertChannelException(404, e.args[0])
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code) #not-found
- session = self.conn.session("error-checker")
+ def test_purge_empty_name(self):
+ """
+ Test that the correct exception is thrown if no queue name
+ is specified for purge
+ """
+ session = self.session
try:
#queue not specified and none previously declared for channel:
session.queue_purge()
self.fail("Expected failure when purging unspecified queue")
- except Closed, e:
- self.assertConnectionException(530, e.args[0])
+ except SessionException, e:
+ self.assertEquals(531, e.args[0].error_code) #illegal-argument
def test_declare_exclusive(self):
"""