Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/queue.py URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/queue.py?rev=648207&r1=648206&r2=648207&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/queue.py (original) +++ incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/queue.py Tue Apr 15 03:28:06 2008 @@ -18,134 +18,133 @@ # from qpid.client import Client, Closed from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.testlib import TestBase010 +from qpid.datatypes import Message +from qpid.session import SessionException -class QueueTests(TestBase): +class QueueTests(TestBase010): """Tests for 'methods' on the amqp queue 'class'""" def test_purge(self): """ Test that the purge method removes messages from the queue """ - channel = self.channel + session = self.session #setup, declare a queue and add some messages to it: - channel.exchange_declare(exchange="test-exchange", type="direct") - channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) - channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key") - channel.message_transfer(destination="test-exchange", content=Content("one", properties={'routing_key':"key"})) - channel.message_transfer(destination="test-exchange", content=Content("two", properties={'routing_key':"key"})) - channel.message_transfer(destination="test-exchange", content=Content("three", properties={'routing_key':"key"})) + session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "one")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "two")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "three")) #check that the queue now reports 3 messages: - channel.queue_declare(queue="test-queue") - reply = channel.queue_query(queue="test-queue") + session.queue_declare(queue="test-queue") + reply = session.queue_query(queue="test-queue") self.assertEqual(3, reply.message_count) #now do the purge, then test that three messages are purged and the count drops to 0 - channel.queue_purge(queue="test-queue"); - reply = channel.queue_query(queue="test-queue") + session.queue_purge(queue="test-queue"); + reply = session.queue_query(queue="test-queue") self.assertEqual(0, reply.message_count) #send a further message and consume it, ensuring that the other messages are really gone - channel.message_transfer(destination="test-exchange", content=Content("four", properties={'routing_key':"key"})) - self.subscribe(queue="test-queue", destination="tag") - queue = self.client.queue("tag") + session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "four")) + session.message_subscribe(queue="test-queue", destination="tag") + session.message_flow(destination="tag", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="tag", unit=1, value=0xFFFFFFFF) + queue = session.incoming("tag") msg = queue.get(timeout=1) - self.assertEqual("four", msg.content.body) + self.assertEqual("four", msg.body) - #check error conditions (use new channels): - channel = self.client.channel(2) - channel.session_open() + def test_purge_queue_exists(self): + """ + Test that the correct exception is thrown is no queue exists + for the name specified in purge + """ + session = self.session try: #queue specified but doesn't exist: - channel.queue_purge(queue="invalid-queue") + session.queue_purge(queue="invalid-queue") self.fail("Expected failure when purging non-existent queue") - except Closed, e: - self.assertChannelException(404, e.args[0]) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) #not-found - channel = self.client.channel(3) - channel.session_open() + def test_purge_empty_name(self): + """ + Test that the correct exception is thrown is no queue name + is specified for purge + """ + session = self.session try: #queue not specified and none previously declared for channel: - channel.queue_purge() + session.queue_purge() self.fail("Expected failure when purging unspecified queue") - except Closed, e: - self.assertConnectionException(530, e.args[0]) - - #cleanup - other = self.connect() - channel = other.channel(1) - channel.session_open() - channel.exchange_delete(exchange="test-exchange") + except SessionException, e: + self.assertEquals(531, e.args[0].error_code) #illegal-argument def test_declare_exclusive(self): """ Test that the exclusive field is honoured in queue.declare """ - # TestBase.setUp has already opened channel(1) - c1 = self.channel + # TestBase.setUp has already opened session(1) + s1 = self.session # Here we open a second separate connection: - other = self.connect() - c2 = other.channel(1) - c2.session_open() + s2 = self.conn.session("other", 2) #declare an exclusive queue: - c1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True) + s1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True) try: #other connection should not be allowed to declare this: - c2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True) + s2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True) self.fail("Expected second exclusive queue_declare to raise a channel exception") - except Closed, e: - self.assertChannelException(405, e.args[0]) + except SessionException, e: + self.assertEquals(405, e.args[0].error_code) def test_declare_passive(self): """ Test that the passive field is honoured in queue.declare """ - channel = self.channel + session = self.session #declare an exclusive queue: - channel.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True) - channel.queue_declare(queue="passive-queue-1", passive=True) + session.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True) + session.queue_declare(queue="passive-queue-1", passive=True) try: #other connection should not be allowed to declare this: - channel.queue_declare(queue="passive-queue-2", passive=True) + session.queue_declare(queue="passive-queue-2", passive=True) self.fail("Expected passive declaration of non-existant queue to raise a channel exception") - except Closed, e: - self.assertChannelException(404, e.args[0]) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) #not-found def test_bind(self): """ Test various permutations of the queue.bind method """ - channel = self.channel - channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True) + session = self.session + session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True) #straightforward case, both exchange & queue exist so no errors expected: - channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1") + session.exchange_bind(queue="queue-1", exchange="amq.direct", binding_key="key1") #use the queue name where the routing key is not specified: - channel.queue_bind(queue="queue-1", exchange="amq.direct") + session.exchange_bind(queue="queue-1", exchange="amq.direct") #try and bind to non-existant exchange try: - channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1") + session.exchange_bind(queue="queue-1", exchange="an-invalid-exchange", binding_key="key1") self.fail("Expected bind to non-existant exchange to fail") - except Closed, e: - self.assertChannelException(404, e.args[0]) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) - #need to reopen a channel: - channel = self.client.channel(2) - channel.session_open() + def test_bind_queue_existence(self): + session = self.session #try and bind non-existant queue: try: - channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1") + session.exchange_bind(queue="queue-2", exchange="amq.direct", binding_key="key1") self.fail("Expected bind of non-existant queue to fail") - except Closed, e: - self.assertChannelException(404, e.args[0]) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) def test_unbind_direct(self): self.unbind_test(exchange="amq.direct", routing_key="key") @@ -159,42 +158,53 @@ def test_unbind_headers(self): self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"}) - def unbind_test(self, exchange, routing_key="", args=None, headers={}): + def unbind_test(self, exchange, routing_key="", args=None, headers=None): #bind two queues and consume from them - channel = self.channel + session = self.session - channel.queue_declare(queue="queue-1", exclusive=True, auto_delete=True) - channel.queue_declare(queue="queue-2", exclusive=True, auto_delete=True) - - self.subscribe(queue="queue-1", destination="queue-1") - self.subscribe(queue="queue-2", destination="queue-2") - - queue1 = self.client.queue("queue-1") - queue2 = self.client.queue("queue-2") - - channel.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) - channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args) + session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True) + session.queue_declare(queue="queue-2", exclusive=True, auto_delete=True) + session.message_subscribe(queue="queue-1", destination="queue-1") + session.message_flow(destination="queue-1", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="queue-1", unit=1, value=0xFFFFFFFF) + session.message_subscribe(queue="queue-2", destination="queue-2") + session.message_flow(destination="queue-2", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="queue-2", unit=1, value=0xFFFFFFFF) + + queue1 = session.incoming("queue-1") + queue2 = session.incoming("queue-2") + + session.exchange_bind(exchange=exchange, queue="queue-1", binding_key=routing_key, arguments=args) + session.exchange_bind(exchange=exchange, queue="queue-2", binding_key=routing_key, arguments=args) + + dp = session.delivery_properties(routing_key=routing_key) + if (headers): + mp = session.message_properties(application_headers=headers) + msg1 = Message(dp, mp, "one") + msg2 = Message(dp, mp, "two") + else: + msg1 = Message(dp, "one") + msg2 = Message(dp, "two") + #send a message that will match both bindings - channel.message_transfer(destination=exchange, - content=Content("one", properties={'routing_key':routing_key, 'application_headers':headers})) + session.message_transfer(destination=exchange, message=msg1) #unbind first queue - channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args) + session.exchange_unbind(exchange=exchange, queue="queue-1", binding_key=routing_key) #send another message - channel.message_transfer(destination=exchange, - content=Content("two", properties={'routing_key':routing_key, 'application_headers':headers})) + session.message_transfer(destination=exchange, message=msg2) #check one queue has both messages and the other has only one - self.assertEquals("one", queue1.get(timeout=1).content.body) + self.assertEquals("one", queue1.get(timeout=1).body) try: msg = queue1.get(timeout=1) - self.fail("Got extra message: %s" % msg.content.body) + self.fail("Got extra message: %s" % msg.body) except Empty: pass - self.assertEquals("one", queue2.get(timeout=1).content.body) - self.assertEquals("two", queue2.get(timeout=1).content.body) + self.assertEquals("one", queue2.get(timeout=1).body) + self.assertEquals("two", queue2.get(timeout=1).body) try: msg = queue2.get(timeout=1) self.fail("Got extra message: " + msg) @@ -205,29 +215,32 @@ """ Test core queue deletion behaviour """ - channel = self.channel + session = self.session #straight-forward case: - channel.queue_declare(queue="delete-me") - channel.message_transfer(content=Content("a", properties={'routing_key':"delete-me"})) - channel.message_transfer(content=Content("b", properties={'routing_key':"delete-me"})) - channel.message_transfer(content=Content("c", properties={'routing_key':"delete-me"})) - channel.queue_delete(queue="delete-me") + session.queue_declare(queue="delete-me") + session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "a")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "b")) + session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "c")) + session.queue_delete(queue="delete-me") #check that it has gone be declaring passively try: - channel.queue_declare(queue="delete-me", passive=True) + session.queue_declare(queue="delete-me", passive=True) self.fail("Queue has not been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) + def test_delete_queue_exists(self): + """ + Test core queue deletion behaviour + """ #check attempted deletion of non-existant queue is handled correctly: - channel = self.client.channel(2) - channel.session_open() + session = self.session try: - channel.queue_delete(queue="i-dont-exist", if_empty=True) + session.queue_delete(queue="i-dont-exist", if_empty=True) self.fail("Expected delete of non-existant queue to fail") - except Closed, e: - self.assertChannelException(404, e.args[0]) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) @@ -235,104 +248,103 @@ """ Test that if_empty field of queue_delete is honoured """ - channel = self.channel + session = self.session #create a queue and add a message to it (use default binding): - channel.queue_declare(queue="delete-me-2") - channel.queue_declare(queue="delete-me-2", passive=True) - channel.message_transfer(content=Content("message", properties={'routing_key':"delete-me-2"})) + session.queue_declare(queue="delete-me-2") + session.queue_declare(queue="delete-me-2", passive=True) + session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me-2"), "message")) #try to delete, but only if empty: try: - channel.queue_delete(queue="delete-me-2", if_empty=True) + session.queue_delete(queue="delete-me-2", if_empty=True) self.fail("Expected delete if_empty to fail for non-empty queue") - except Closed, e: - self.assertChannelException(406, e.args[0]) + except SessionException, e: + self.assertEquals(406, e.args[0].error_code) - #need new channel now: - channel = self.client.channel(2) - channel.session_open() + #need new session now: + session = self.conn.session("replacement", 2) #empty queue: - self.subscribe(channel, destination="consumer_tag", queue="delete-me-2") - queue = self.client.queue("consumer_tag") + session.message_subscribe(destination="consumer_tag", queue="delete-me-2") + session.message_flow(destination="consumer_tag", unit=0, value=0xFFFFFFFF) + session.message_flow(destination="consumer_tag", unit=1, value=0xFFFFFFFF) + queue = session.incoming("consumer_tag") msg = queue.get(timeout=1) - self.assertEqual("message", msg.content.body) - channel.message_cancel(destination="consumer_tag") + self.assertEqual("message", msg.body) + session.message_cancel(destination="consumer_tag") #retry deletion on empty queue: - channel.queue_delete(queue="delete-me-2", if_empty=True) + session.queue_delete(queue="delete-me-2", if_empty=True) #check that it has gone by declaring passively: try: - channel.queue_declare(queue="delete-me-2", passive=True) + session.queue_declare(queue="delete-me-2", passive=True) self.fail("Queue has not been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) def test_delete_ifunused(self): """ Test that if_unused field of queue_delete is honoured """ - channel = self.channel + session = self.session #create a queue and register a consumer: - channel.queue_declare(queue="delete-me-3") - channel.queue_declare(queue="delete-me-3", passive=True) - self.subscribe(destination="consumer_tag", queue="delete-me-3") - - #need new channel now: - channel2 = self.client.channel(2) - channel2.session_open() + session.queue_declare(queue="delete-me-3") + session.queue_declare(queue="delete-me-3", passive=True) + session.message_subscribe(destination="consumer_tag", queue="delete-me-3") + + #need new session now: + session2 = self.conn.session("replacement", 2) + #try to delete, but only if empty: try: - channel2.queue_delete(queue="delete-me-3", if_unused=True) + session2.queue_delete(queue="delete-me-3", if_unused=True) self.fail("Expected delete if_unused to fail for queue with existing consumer") - except Closed, e: - self.assertChannelException(406, e.args[0]) - + except SessionException, e: + self.assertEquals(406, e.args[0].error_code) - channel.message_cancel(destination="consumer_tag") - channel.queue_delete(queue="delete-me-3", if_unused=True) + session.message_cancel(destination="consumer_tag") + session.queue_delete(queue="delete-me-3", if_unused=True) #check that it has gone by declaring passively: try: - channel.queue_declare(queue="delete-me-3", passive=True) + session.queue_declare(queue="delete-me-3", passive=True) self.fail("Queue has not been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code) def test_autodelete_shared(self): """ Test auto-deletion (of non-exclusive queues) """ - channel = self.channel - other = self.connect() - channel2 = other.channel(1) - channel2.session_open() - - channel.queue_declare(queue="auto-delete-me", auto_delete=True) - - #consume from both channels - reply = channel.basic_consume(queue="auto-delete-me") - channel2.basic_consume(queue="auto-delete-me") + session = self.session + session2 =self.conn.session("other", 1) + + session.queue_declare(queue="auto-delete-me", auto_delete=True) + + #consume from both sessions + tag = "my-tag" + session.message_subscribe(queue="auto-delete-me", destination=tag) + session2.message_subscribe(queue="auto-delete-me", destination=tag) #implicit cancel - channel2.session_close() + session2.close() #check it is still there - channel.queue_declare(queue="auto-delete-me", passive=True) + session.queue_declare(queue="auto-delete-me", passive=True) #explicit cancel => queue is now unused again: - channel.basic_cancel(consumer_tag=reply.consumer_tag) + session.message_cancel(destination=tag) #NOTE: this assumes there is no timeout in use - #check that it has gone be declaring passively + #check that it has gone by declaring it passively try: - channel.queue_declare(queue="auto-delete-me", passive=True) + session.queue_declare(queue="auto-delete-me", passive=True) self.fail("Expected queue to have been deleted") - except Closed, e: - self.assertChannelException(404, e.args[0]) + except SessionException, e: + self.assertEquals(404, e.args[0].error_code)
Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/tx.py URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/tx.py?rev=648207&r1=648206&r2=648207&view=diff ============================================================================== --- incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/tx.py (original) +++ incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/tx.py Tue Apr 15 03:28:06 2008 @@ -18,10 +18,10 @@ # from qpid.client import Client, Closed from qpid.queue import Empty -from qpid.content import Content -from qpid.testlib import testrunner, TestBase +from qpid.datatypes import Message, RangedSet +from qpid.testlib import testrunner, TestBase010 -class TxTests(TestBase): +class TxTests(TestBase010): """ Tests for 'methods' on the amqp tx 'class' """ @@ -30,202 +30,236 @@ """ Test that commited publishes are delivered and commited acks are not re-delivered """ - channel2 = self.client.channel(2) - channel2.session_open() - self.perform_txn_work(channel2, "tx-commit-a", "tx-commit-b", "tx-commit-c") - channel2.tx_commit() - channel2.session_close() - - #use a different channel with new subscriptions to ensure - #there is no redelivery of acked messages: - channel = self.channel - channel.tx_select() + session = self.session - self.subscribe(channel, queue="tx-commit-a", destination="qa", confirm_mode=1) - queue_a = self.client.queue("qa") + #declare queues and create subscribers in the checking session + #to ensure that the queues are not auto-deleted too early: + self.declare_queues(["tx-commit-a", "tx-commit-b", "tx-commit-c"]) + session.message_subscribe(queue="tx-commit-a", destination="qa") + session.message_subscribe(queue="tx-commit-b", destination="qb") + session.message_subscribe(queue="tx-commit-c", destination="qc") + + #use a separate session for actual work + session2 = self.conn.session("worker", 2) + self.perform_txn_work(session2, "tx-commit-a", "tx-commit-b", "tx-commit-c") + session2.tx_commit() + session2.close() + + session.tx_select() - self.subscribe(channel, queue="tx-commit-b", destination="qb", confirm_mode=1) - queue_b = self.client.queue("qb") + self.enable_flow("qa") + queue_a = session.incoming("qa") - self.subscribe(channel, queue="tx-commit-c", destination="qc", confirm_mode=1) - queue_c = self.client.queue("qc") + self.enable_flow("qb") + queue_b = session.incoming("qb") + + self.enable_flow("qc") + queue_c = session.incoming("qc") #check results for i in range(1, 5): msg = queue_c.get(timeout=1) - self.assertEqual("TxMessage %d" % i, msg.content.body) - msg.complete() + self.assertEqual("TxMessage %d" % i, msg.body) + session.message_accept(RangedSet(msg.id)) msg = queue_b.get(timeout=1) - self.assertEqual("TxMessage 6", msg.content.body) - msg.complete() + self.assertEqual("TxMessage 6", msg.body) + session.message_accept(RangedSet(msg.id)) msg = queue_a.get(timeout=1) - self.assertEqual("TxMessage 7", msg.content.body) - msg.complete() + self.assertEqual("TxMessage 7", msg.body) + session.message_accept(RangedSet(msg.id)) for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None #cleanup - channel.tx_commit() + session.tx_commit() def test_auto_rollback(self): """ - Test that a channel closed with an open transaction is effectively rolled back + Test that a session closed with an open transaction is effectively rolled back """ - channel2 = self.client.channel(2) - channel2.session_open() - queue_a, queue_b, queue_c = self.perform_txn_work(channel2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c") + session = self.session + self.declare_queues(["tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c"]) + session.message_subscribe(queue="tx-autorollback-a", destination="qa") + session.message_subscribe(queue="tx-autorollback-b", destination="qb") + session.message_subscribe(queue="tx-autorollback-c", destination="qc") + + session2 = self.conn.session("worker", 2) + queue_a, queue_b, queue_c, ignore = self.perform_txn_work(session2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c") for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None - channel2.session_close() - channel = self.channel - channel.tx_select() + session2.close() + + session.tx_select() - self.subscribe(channel, queue="tx-autorollback-a", destination="qa", confirm_mode=1) - queue_a = self.client.queue("qa") + self.enable_flow("qa") + queue_a = session.incoming("qa") - self.subscribe(channel, queue="tx-autorollback-b", destination="qb", confirm_mode=1) - queue_b = self.client.queue("qb") + self.enable_flow("qb") + queue_b = session.incoming("qb") - self.subscribe(channel, queue="tx-autorollback-c", destination="qc", confirm_mode=1) - queue_c = self.client.queue("qc") + self.enable_flow("qc") + queue_c = session.incoming("qc") #check results for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - msg.complete() + self.assertEqual("Message %d" % i, msg.body) + session.message_accept(RangedSet(msg.id)) msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) - msg.complete() + self.assertEqual("Message 6", msg.body) + session.message_accept(RangedSet(msg.id)) msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) - msg.complete() + self.assertEqual("Message 7", msg.body) + session.message_accept(RangedSet(msg.id)) for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None #cleanup - channel.tx_commit() + session.tx_commit() def test_rollback(self): """ Test that rolled back publishes are not delivered and rolled back acks are re-delivered """ - channel = self.channel - queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c") + session = self.session + queue_a, queue_b, queue_c, consumed = self.perform_txn_work(session, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c") for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None - #stop subscriptions (ensures no delivery occurs during rollback as messages are requeued) - for d in ["sub_a", "sub_b", "sub_c"]: - channel.message_stop(destination=d) - - channel.tx_rollback() - - #restart susbcriptions - for d in ["sub_a", "sub_b", "sub_c"]: - channel.message_flow(destination=d, unit=0, value=0xFFFFFFFF) - channel.message_flow(destination=d, unit=1, value=0xFFFFFFFF) + session.tx_rollback() + + #need to release messages to get them redelivered now: + session.message_release(consumed) #check results for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) - msg.complete() + self.assertEqual("Message %d" % i, msg.body) + session.message_accept(RangedSet(msg.id)) msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) - msg.complete() + self.assertEqual("Message 6", msg.body) + session.message_accept(RangedSet(msg.id)) msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) - msg.complete() + self.assertEqual("Message 7", msg.body) + session.message_accept(RangedSet(msg.id)) for q in [queue_a, queue_b, queue_c]: try: extra = q.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) + self.fail("Got unexpected message: " + extra.body) except Empty: None #cleanup - channel.tx_commit() + session.tx_commit() - def perform_txn_work(self, channel, name_a, name_b, name_c): + def perform_txn_work(self, session, name_a, name_b, name_c): """ Utility method that does some setup and some work under a transaction. Used for testing both commit and rollback """ #setup: - channel.queue_declare(queue=name_a, exclusive=True, auto_delete=True) - channel.queue_declare(queue=name_b, exclusive=True, auto_delete=True) - channel.queue_declare(queue=name_c, exclusive=True, auto_delete=True) + self.declare_queues([name_a, name_b, name_c]) key = "my_key_" + name_b topic = "my_topic_" + name_c - channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key) - channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic) + session.exchange_bind(queue=name_b, exchange="amq.direct", binding_key=key) + session.exchange_bind(queue=name_c, exchange="amq.topic", binding_key=topic) + dp = session.delivery_properties(routing_key=name_a) for i in range(1, 5): - channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"msg%d" % i}, body="Message %d" % i)) + mp = session.message_properties(message_id="msg%d" % i) + session.message_transfer(message=Message(dp, mp, "Message %d" % i)) + + dp = session.delivery_properties(routing_key=key) + mp = session.message_properties(message_id="msg6") + session.message_transfer(destination="amq.direct", message=Message(dp, mp, "Message 6")) - channel.message_transfer(destination="amq.direct", - content=Content(properties={'routing_key':key, 'message_id':"msg6"}, body="Message 6")) - channel.message_transfer(destination="amq.topic", - content=Content(properties={'routing_key':topic, 'message_id':"msg7"}, body="Message 7")) + dp = session.delivery_properties(routing_key=topic) + mp = session.message_properties(message_id="msg7") + session.message_transfer(destination="amq.topic", message=Message(dp, mp, "Message 7")) - channel.tx_select() + session.tx_select() #consume and ack messages - self.subscribe(channel, queue=name_a, destination="sub_a", confirm_mode=1) - queue_a = self.client.queue("sub_a") + acked = RangedSet() + self.subscribe(session, queue=name_a, destination="sub_a") + queue_a = session.incoming("sub_a") for i in range(1, 5): msg = queue_a.get(timeout=1) - self.assertEqual("Message %d" % i, msg.content.body) + acked.add(msg.id) + self.assertEqual("Message %d" % i, msg.body) - msg.complete() - - self.subscribe(channel, queue=name_b, destination="sub_b", confirm_mode=1) - queue_b = self.client.queue("sub_b") + self.subscribe(session, queue=name_b, destination="sub_b") + queue_b = session.incoming("sub_b") msg = queue_b.get(timeout=1) - self.assertEqual("Message 6", msg.content.body) - msg.complete() + self.assertEqual("Message 6", msg.body) + acked.add(msg.id) - sub_c = self.subscribe(channel, queue=name_c, destination="sub_c", confirm_mode=1) - queue_c = self.client.queue("sub_c") + sub_c = self.subscribe(session, queue=name_c, destination="sub_c") + queue_c = session.incoming("sub_c") msg = queue_c.get(timeout=1) - self.assertEqual("Message 7", msg.content.body) - msg.complete() + self.assertEqual("Message 7", msg.body) + acked.add(msg.id) + + session.message_accept(acked) + dp = session.delivery_properties(routing_key=topic) #publish messages for i in range(1, 5): - channel.message_transfer(destination="amq.topic", - content=Content(properties={'routing_key':topic, 'message_id':"tx-msg%d" % i}, - body="TxMessage %d" % i)) - - channel.message_transfer(destination="amq.direct", - content=Content(properties={'routing_key':key, 'message_id':"tx-msg6"}, - body="TxMessage 6")) - channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"tx-msg7"}, - body="TxMessage 7")) - return queue_a, queue_b, queue_c + mp = session.message_properties(message_id="tx-msg%d" % i) + session.message_transfer(destination="amq.topic", message=Message(dp, mp, "TxMessage %d" % i)) + + dp = session.delivery_properties(routing_key=key) + mp = session.message_properties(message_id="tx-msg6") + session.message_transfer(destination="amq.direct", message=Message(dp, mp, "TxMessage 6")) + + dp = session.delivery_properties(routing_key=name_a) + mp = session.message_properties(message_id="tx-msg7") + session.message_transfer(message=Message(dp, mp, "TxMessage 7")) + return queue_a, queue_b, queue_c, acked + + def declare_queues(self, names, session=None): + session = session or self.session + for n in names: + session.queue_declare(queue=n, auto_delete=True) + + def subscribe(self, session=None, **keys): + session = session or self.session + consumer_tag = keys["destination"] + session.message_subscribe(**keys) + session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF) + session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF) + + def enable_flow(self, tag, session=None): + session = session or self.session + session.message_flow(destination=tag, unit=0, value=0xFFFFFFFF) + session.message_flow(destination=tag, unit=1, value=0xFFFFFFFF) + + def complete(self, session, msg): + session.receiver._completed.add(msg.id)#TODO: this may be done automatically + session.channel.session_completed(session.receiver._completed) + Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/__init__.py URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/__init__.py?rev=648207&r1=647716&r2=648207&view=diff ============================================================================== (empty) Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/alternate_exchange.py URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/alternate_exchange.py?rev=648207&r1=647716&r2=648207&view=diff ============================================================================== (empty) Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/broker.py URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/broker.py?rev=648207&r1=647716&r2=648207&view=diff ============================================================================== (empty) Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/dtx.py URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/dtx.py?rev=648207&r1=647716&r2=648207&view=diff ============================================================================== (empty) Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/example.py URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/example.py?rev=648207&r1=647716&r2=648207&view=diff ============================================================================== (empty) Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/exchange.py URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/exchange.py?rev=648207&r1=647716&r2=648207&view=diff ============================================================================== (empty) Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/execution.py URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/execution.py?rev=648207&r1=647716&r2=648207&view=diff ============================================================================== (empty) Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/management.py URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/management.py?rev=648207&r1=647716&r2=648207&view=diff ============================================================================== (empty) Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/message.py URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/message.py?rev=648207&r1=647716&r2=648207&view=diff ============================================================================== (empty) Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/persistence.py URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/persistence.py?rev=648207&r1=647716&r2=648207&view=diff ============================================================================== (empty) Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/query.py URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/query.py?rev=648207&r1=647716&r2=648207&view=diff ============================================================================== (empty) Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/queue.py URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/queue.py?rev=648207&r1=647716&r2=648207&view=diff ============================================================================== (empty) Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/testlib.py URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/testlib.py?rev=648207&r1=647716&r2=648207&view=diff ============================================================================== (empty) Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/tx.py URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/tx.py?rev=648207&r1=647716&r2=648207&view=diff ============================================================================== (empty)
