Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/queue.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/queue.py?rev=648207&r1=648206&r2=648207&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/queue.py 
(original)
+++ incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/queue.py Tue 
Apr 15 03:28:06 2008
@@ -18,134 +18,133 @@
 #
 from qpid.client import Client, Closed
 from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.testlib import TestBase010
+from qpid.datatypes import Message
+from qpid.session import SessionException
 
-class QueueTests(TestBase):
+class QueueTests(TestBase010):
     """Tests for 'methods' on the amqp queue 'class'"""
 
     def test_purge(self):
         """
         Test that the purge method removes messages from the queue
         """
-        channel = self.channel
+        session = self.session
         #setup, declare a queue and add some messages to it:
-        channel.exchange_declare(exchange="test-exchange", type="direct")
-        channel.queue_declare(queue="test-queue", exclusive=True, 
auto_delete=True)
-        channel.queue_bind(queue="test-queue", exchange="test-exchange", 
routing_key="key")
-        channel.message_transfer(destination="test-exchange", 
content=Content("one", properties={'routing_key':"key"}))
-        channel.message_transfer(destination="test-exchange", 
content=Content("two", properties={'routing_key':"key"}))
-        channel.message_transfer(destination="test-exchange", 
content=Content("three", properties={'routing_key':"key"}))
+        session.queue_declare(queue="test-queue", exclusive=True, 
auto_delete=True)
+        
session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"),
 "one"))
+        
session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"),
 "two"))
+        
session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"),
 "three"))
 
         #check that the queue now reports 3 messages:
-        channel.queue_declare(queue="test-queue")
-        reply = channel.queue_query(queue="test-queue")
+        session.queue_declare(queue="test-queue")
+        reply = session.queue_query(queue="test-queue")
         self.assertEqual(3, reply.message_count)
 
         #now do the purge, then test that three messages are purged and the 
count drops to 0
-        channel.queue_purge(queue="test-queue");
-        reply = channel.queue_query(queue="test-queue")
+        session.queue_purge(queue="test-queue");
+        reply = session.queue_query(queue="test-queue")
         self.assertEqual(0, reply.message_count)        
 
         #send a further message and consume it, ensuring that the other 
messages are really gone
-        channel.message_transfer(destination="test-exchange", 
content=Content("four", properties={'routing_key':"key"}))
-        self.subscribe(queue="test-queue", destination="tag")
-        queue = self.client.queue("tag")
+        
session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"),
 "four"))
+        session.message_subscribe(queue="test-queue", destination="tag")
+        session.message_flow(destination="tag", unit=0, value=0xFFFFFFFF)
+        session.message_flow(destination="tag", unit=1, value=0xFFFFFFFF)
+        queue = session.incoming("tag")
         msg = queue.get(timeout=1)
-        self.assertEqual("four", msg.content.body)
+        self.assertEqual("four", msg.body)
 
-        #check error conditions (use new channels): 
-        channel = self.client.channel(2)
-        channel.session_open()
+    def test_purge_queue_exists(self):
+        """        
+        Test that the correct exception is thrown if no queue exists
+        for the name specified in purge        
+        """        
+        session = self.session
         try:
             #queue specified but doesn't exist:
-            channel.queue_purge(queue="invalid-queue")
+            session.queue_purge(queue="invalid-queue")            
             self.fail("Expected failure when purging non-existent queue")
-        except Closed, e:
-            self.assertChannelException(404, e.args[0])
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code) #not-found
 
-        channel = self.client.channel(3)
-        channel.session_open()
+    def test_purge_empty_name(self):        
+        """
+        Test that the correct exception is thrown if no queue name
+        is specified for purge
+        """        
+        session = self.session
         try:
             #queue not specified and none previously declared for channel:
-            channel.queue_purge()
+            session.queue_purge()
             self.fail("Expected failure when purging unspecified queue")
-        except Closed, e:
-            self.assertConnectionException(530, e.args[0])
-
-        #cleanup    
-        other = self.connect()
-        channel = other.channel(1)
-        channel.session_open()
-        channel.exchange_delete(exchange="test-exchange")
+        except SessionException, e:
+            self.assertEquals(531, e.args[0].error_code) #illegal-argument
 
     def test_declare_exclusive(self):
         """
         Test that the exclusive field is honoured in queue.declare
         """
-        # TestBase.setUp has already opened channel(1)
-        c1 = self.channel
+        # TestBase.setUp has already opened session(1)
+        s1 = self.session
         # Here we open a second separate connection:
-        other = self.connect()
-        c2 = other.channel(1)
-        c2.session_open()
+        s2 = self.conn.session("other", 2)
 
         #declare an exclusive queue:
-        c1.queue_declare(queue="exclusive-queue", exclusive=True, 
auto_delete=True)
+        s1.queue_declare(queue="exclusive-queue", exclusive=True, 
auto_delete=True)
         try:
             #other connection should not be allowed to declare this:
-            c2.queue_declare(queue="exclusive-queue", exclusive=True, 
auto_delete=True)
+            s2.queue_declare(queue="exclusive-queue", exclusive=True, 
auto_delete=True)
             self.fail("Expected second exclusive queue_declare to raise a 
channel exception")
-        except Closed, e:
-            self.assertChannelException(405, e.args[0])
+        except SessionException, e:
+            self.assertEquals(405, e.args[0].error_code)
 
 
     def test_declare_passive(self):
         """
         Test that the passive field is honoured in queue.declare
         """
-        channel = self.channel
+        session = self.session
         #declare an exclusive queue:
-        channel.queue_declare(queue="passive-queue-1", exclusive=True, 
auto_delete=True)
-        channel.queue_declare(queue="passive-queue-1", passive=True)
+        session.queue_declare(queue="passive-queue-1", exclusive=True, 
auto_delete=True)
+        session.queue_declare(queue="passive-queue-1", passive=True)
         try:
             #other connection should not be allowed to declare this:
-            channel.queue_declare(queue="passive-queue-2", passive=True)
+            session.queue_declare(queue="passive-queue-2", passive=True)
             self.fail("Expected passive declaration of non-existant queue to 
raise a channel exception")
-        except Closed, e:
-            self.assertChannelException(404, e.args[0])
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code) #not-found
 
 
     def test_bind(self):
         """
         Test various permutations of the queue.bind method
         """
-        channel = self.channel
-        channel.queue_declare(queue="queue-1", exclusive=True, 
auto_delete=True)
+        session = self.session
+        session.queue_declare(queue="queue-1", exclusive=True, 
auto_delete=True)
 
         #straightforward case, both exchange & queue exist so no errors 
expected:
-        channel.queue_bind(queue="queue-1", exchange="amq.direct", 
routing_key="key1")
+        session.exchange_bind(queue="queue-1", exchange="amq.direct", 
binding_key="key1")
 
         #use the queue name where the routing key is not specified:
-        channel.queue_bind(queue="queue-1", exchange="amq.direct")
+        session.exchange_bind(queue="queue-1", exchange="amq.direct")
 
         #try and bind to non-existant exchange
         try:
-            channel.queue_bind(queue="queue-1", 
exchange="an-invalid-exchange", routing_key="key1")
+            session.exchange_bind(queue="queue-1", 
exchange="an-invalid-exchange", binding_key="key1")
             self.fail("Expected bind to non-existant exchange to fail")
-        except Closed, e:
-            self.assertChannelException(404, e.args[0])
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
 
-        #need to reopen a channel:    
-        channel = self.client.channel(2)
-        channel.session_open()
 
+    def test_bind_queue_existence(self):
+        session = self.session
         #try and bind non-existant queue:
         try:
-            channel.queue_bind(queue="queue-2", exchange="amq.direct", 
routing_key="key1")
+            session.exchange_bind(queue="queue-2", exchange="amq.direct", 
binding_key="key1")
             self.fail("Expected bind of non-existant queue to fail")
-        except Closed, e:
-            self.assertChannelException(404, e.args[0])
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
 
     def test_unbind_direct(self):
         self.unbind_test(exchange="amq.direct", routing_key="key")
@@ -159,42 +158,53 @@
     def test_unbind_headers(self):
         self.unbind_test(exchange="amq.match", args={ "x-match":"all", 
"a":"b"}, headers={"a":"b"})
 
-    def unbind_test(self, exchange, routing_key="", args=None, headers={}):
+    def unbind_test(self, exchange, routing_key="", args=None, headers=None):
         #bind two queues and consume from them
-        channel = self.channel
+        session = self.session
         
-        channel.queue_declare(queue="queue-1", exclusive=True, 
auto_delete=True)
-        channel.queue_declare(queue="queue-2", exclusive=True, 
auto_delete=True)
-
-        self.subscribe(queue="queue-1", destination="queue-1")
-        self.subscribe(queue="queue-2", destination="queue-2")
-
-        queue1 = self.client.queue("queue-1")
-        queue2 = self.client.queue("queue-2")
-
-        channel.queue_bind(exchange=exchange, queue="queue-1", 
routing_key=routing_key, arguments=args)
-        channel.queue_bind(exchange=exchange, queue="queue-2", 
routing_key=routing_key, arguments=args)
+        session.queue_declare(queue="queue-1", exclusive=True, 
auto_delete=True)
+        session.queue_declare(queue="queue-2", exclusive=True, 
auto_delete=True)
 
+        session.message_subscribe(queue="queue-1", destination="queue-1")
+        session.message_flow(destination="queue-1", unit=0, value=0xFFFFFFFF)
+        session.message_flow(destination="queue-1", unit=1, value=0xFFFFFFFF)
+        session.message_subscribe(queue="queue-2", destination="queue-2")
+        session.message_flow(destination="queue-2", unit=0, value=0xFFFFFFFF)
+        session.message_flow(destination="queue-2", unit=1, value=0xFFFFFFFF)
+
+        queue1 = session.incoming("queue-1")
+        queue2 = session.incoming("queue-2")
+
+        session.exchange_bind(exchange=exchange, queue="queue-1", 
binding_key=routing_key, arguments=args)
+        session.exchange_bind(exchange=exchange, queue="queue-2", 
binding_key=routing_key, arguments=args)
+
+        dp = session.delivery_properties(routing_key=routing_key)
+        if (headers):
+            mp = session.message_properties(application_headers=headers)
+            msg1 = Message(dp, mp, "one")
+            msg2 = Message(dp, mp, "two")
+        else:
+            msg1 = Message(dp, "one")
+            msg2 = Message(dp, "two")
+            
         #send a message that will match both bindings
-        channel.message_transfer(destination=exchange,
-                                 content=Content("one", 
properties={'routing_key':routing_key, 'application_headers':headers}))
+        session.message_transfer(destination=exchange, message=msg1)
         
         #unbind first queue
-        channel.queue_unbind(exchange=exchange, queue="queue-1", 
routing_key=routing_key, arguments=args)
+        session.exchange_unbind(exchange=exchange, queue="queue-1", 
binding_key=routing_key)
         
         #send another message
-        channel.message_transfer(destination=exchange,
-                                 content=Content("two", 
properties={'routing_key':routing_key, 'application_headers':headers}))
+        session.message_transfer(destination=exchange, message=msg2)
 
         #check one queue has both messages and the other has only one
-        self.assertEquals("one", queue1.get(timeout=1).content.body)
+        self.assertEquals("one", queue1.get(timeout=1).body)
         try:
             msg = queue1.get(timeout=1)
-            self.fail("Got extra message: %s" % msg.content.body)
+            self.fail("Got extra message: %s" % msg.body)
         except Empty: pass
 
-        self.assertEquals("one", queue2.get(timeout=1).content.body)
-        self.assertEquals("two", queue2.get(timeout=1).content.body)
+        self.assertEquals("one", queue2.get(timeout=1).body)
+        self.assertEquals("two", queue2.get(timeout=1).body)
         try:
             msg = queue2.get(timeout=1)
             self.fail("Got extra message: " + msg)
@@ -205,29 +215,32 @@
         """
         Test core queue deletion behaviour
         """
-        channel = self.channel
+        session = self.session
 
         #straight-forward case:
-        channel.queue_declare(queue="delete-me")
-        channel.message_transfer(content=Content("a", 
properties={'routing_key':"delete-me"}))
-        channel.message_transfer(content=Content("b", 
properties={'routing_key':"delete-me"}))
-        channel.message_transfer(content=Content("c", 
properties={'routing_key':"delete-me"}))
-        channel.queue_delete(queue="delete-me")
+        session.queue_declare(queue="delete-me")
+        
session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"),
 "a"))
+        
session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"),
 "b"))
+        
session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"),
 "c"))
+        session.queue_delete(queue="delete-me")
         #check that it has gone be declaring passively
         try:
-            channel.queue_declare(queue="delete-me", passive=True)
+            session.queue_declare(queue="delete-me", passive=True)
             self.fail("Queue has not been deleted")
-        except Closed, e:
-            self.assertChannelException(404, e.args[0])
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
 
+    def test_delete_queue_exists(self):
+        """
+        Test that deleting a non-existent queue fails with not-found (404)
+        """
         #check attempted deletion of non-existant queue is handled correctly:  
  
-        channel = self.client.channel(2)
-        channel.session_open()
+        session = self.session
         try:
-            channel.queue_delete(queue="i-dont-exist", if_empty=True)
+            session.queue_delete(queue="i-dont-exist", if_empty=True)
             self.fail("Expected delete of non-existant queue to fail")
-        except Closed, e:
-            self.assertChannelException(404, e.args[0])
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
 
         
 
@@ -235,104 +248,103 @@
         """
         Test that if_empty field of queue_delete is honoured
         """
-        channel = self.channel
+        session = self.session
 
         #create a queue and add a message to it (use default binding):
-        channel.queue_declare(queue="delete-me-2")
-        channel.queue_declare(queue="delete-me-2", passive=True)
-        channel.message_transfer(content=Content("message", 
properties={'routing_key':"delete-me-2"}))
+        session.queue_declare(queue="delete-me-2")
+        session.queue_declare(queue="delete-me-2", passive=True)
+        
session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me-2"),
 "message"))
 
         #try to delete, but only if empty:
         try:
-            channel.queue_delete(queue="delete-me-2", if_empty=True)
+            session.queue_delete(queue="delete-me-2", if_empty=True)
             self.fail("Expected delete if_empty to fail for non-empty queue")
-        except Closed, e:
-            self.assertChannelException(406, e.args[0])
+        except SessionException, e:
+            self.assertEquals(406, e.args[0].error_code)
 
-        #need new channel now:    
-        channel = self.client.channel(2)
-        channel.session_open()
+        #need new session now:    
+        session = self.conn.session("replacement", 2)
 
         #empty queue:
-        self.subscribe(channel, destination="consumer_tag", 
queue="delete-me-2")
-        queue = self.client.queue("consumer_tag")
+        session.message_subscribe(destination="consumer_tag", 
queue="delete-me-2")
+        session.message_flow(destination="consumer_tag", unit=0, 
value=0xFFFFFFFF)
+        session.message_flow(destination="consumer_tag", unit=1, 
value=0xFFFFFFFF)
+        queue = session.incoming("consumer_tag")
         msg = queue.get(timeout=1)
-        self.assertEqual("message", msg.content.body)
-        channel.message_cancel(destination="consumer_tag")
+        self.assertEqual("message", msg.body)
+        session.message_cancel(destination="consumer_tag")
 
         #retry deletion on empty queue:
-        channel.queue_delete(queue="delete-me-2", if_empty=True)
+        session.queue_delete(queue="delete-me-2", if_empty=True)
 
         #check that it has gone by declaring passively:
         try:
-            channel.queue_declare(queue="delete-me-2", passive=True)
+            session.queue_declare(queue="delete-me-2", passive=True)
             self.fail("Queue has not been deleted")
-        except Closed, e:
-            self.assertChannelException(404, e.args[0])
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
         
     def test_delete_ifunused(self):
         """
         Test that if_unused field of queue_delete is honoured
         """
-        channel = self.channel
+        session = self.session
 
         #create a queue and register a consumer:
-        channel.queue_declare(queue="delete-me-3")
-        channel.queue_declare(queue="delete-me-3", passive=True)
-        self.subscribe(destination="consumer_tag", queue="delete-me-3")
-
-        #need new channel now:    
-        channel2 = self.client.channel(2)
-        channel2.session_open()
+        session.queue_declare(queue="delete-me-3")
+        session.queue_declare(queue="delete-me-3", passive=True)
+        session.message_subscribe(destination="consumer_tag", 
queue="delete-me-3")
+
+        #need new session now:    
+        session2 = self.conn.session("replacement", 2)
+
         #try to delete, but only if empty:
         try:
-            channel2.queue_delete(queue="delete-me-3", if_unused=True)
+            session2.queue_delete(queue="delete-me-3", if_unused=True)
             self.fail("Expected delete if_unused to fail for queue with 
existing consumer")
-        except Closed, e:
-            self.assertChannelException(406, e.args[0])
-
+        except SessionException, e:
+            self.assertEquals(406, e.args[0].error_code)
 
-        channel.message_cancel(destination="consumer_tag")    
-        channel.queue_delete(queue="delete-me-3", if_unused=True)
+        session.message_cancel(destination="consumer_tag")    
+        session.queue_delete(queue="delete-me-3", if_unused=True)
         #check that it has gone by declaring passively:
         try:
-            channel.queue_declare(queue="delete-me-3", passive=True)
+            session.queue_declare(queue="delete-me-3", passive=True)
             self.fail("Queue has not been deleted")
-        except Closed, e:
-            self.assertChannelException(404, e.args[0])
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
 
 
     def test_autodelete_shared(self):
         """
         Test auto-deletion (of non-exclusive queues)
         """
-        channel = self.channel
-        other = self.connect()
-        channel2 = other.channel(1)
-        channel2.session_open()
-
-        channel.queue_declare(queue="auto-delete-me", auto_delete=True)
-
-        #consume from both channels
-        reply = channel.basic_consume(queue="auto-delete-me")
-        channel2.basic_consume(queue="auto-delete-me")
+        session = self.session
+        session2 =self.conn.session("other", 1)
+
+        session.queue_declare(queue="auto-delete-me", auto_delete=True)
+
+        #consume from both sessions
+        tag = "my-tag"
+        session.message_subscribe(queue="auto-delete-me", destination=tag)
+        session2.message_subscribe(queue="auto-delete-me", destination=tag)
 
         #implicit cancel
-        channel2.session_close()
+        session2.close()
 
         #check it is still there
-        channel.queue_declare(queue="auto-delete-me", passive=True)
+        session.queue_declare(queue="auto-delete-me", passive=True)
 
         #explicit cancel => queue is now unused again:
-        channel.basic_cancel(consumer_tag=reply.consumer_tag)
+        session.message_cancel(destination=tag)
 
         #NOTE: this assumes there is no timeout in use
 
-        #check that it has gone be declaring passively
+        #check that it has gone by declaring it passively
         try:
-            channel.queue_declare(queue="auto-delete-me", passive=True)
+            session.queue_declare(queue="auto-delete-me", passive=True)
             self.fail("Expected queue to have been deleted")
-        except Closed, e:
-            self.assertChannelException(404, e.args[0])
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
 
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/tx.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/tx.py?rev=648207&r1=648206&r2=648207&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/tx.py 
(original)
+++ incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10/tx.py Tue Apr 
15 03:28:06 2008
@@ -18,10 +18,10 @@
 #
 from qpid.client import Client, Closed
 from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.datatypes import Message, RangedSet
+from qpid.testlib import testrunner, TestBase010
 
-class TxTests(TestBase):
+class TxTests(TestBase010):
     """
     Tests for 'methods' on the amqp tx 'class'
     """
@@ -30,202 +30,236 @@
         """
         Test that commited publishes are delivered and commited acks are not 
re-delivered
         """
-        channel2 = self.client.channel(2)
-        channel2.session_open()
-        self.perform_txn_work(channel2, "tx-commit-a", "tx-commit-b", 
"tx-commit-c")
-        channel2.tx_commit()
-        channel2.session_close()
-
-        #use a different channel with new subscriptions to ensure
-        #there is no redelivery of acked messages:
-        channel = self.channel
-        channel.tx_select()
+        session = self.session
 
-        self.subscribe(channel, queue="tx-commit-a", destination="qa", 
confirm_mode=1)
-        queue_a = self.client.queue("qa")
+        #declare queues and create subscribers in the checking session
+        #to ensure that the queues are not auto-deleted too early:        
+        self.declare_queues(["tx-commit-a", "tx-commit-b", "tx-commit-c"])
+        session.message_subscribe(queue="tx-commit-a", destination="qa")
+        session.message_subscribe(queue="tx-commit-b", destination="qb")
+        session.message_subscribe(queue="tx-commit-c", destination="qc")
+
+        #use a separate session for actual work
+        session2 = self.conn.session("worker", 2)
+        self.perform_txn_work(session2, "tx-commit-a", "tx-commit-b", 
"tx-commit-c")
+        session2.tx_commit()
+        session2.close()
+
+        session.tx_select()
 
-        self.subscribe(channel, queue="tx-commit-b", destination="qb", 
confirm_mode=1)
-        queue_b = self.client.queue("qb")
+        self.enable_flow("qa")
+        queue_a = session.incoming("qa")
 
-        self.subscribe(channel, queue="tx-commit-c", destination="qc", 
confirm_mode=1)
-        queue_c = self.client.queue("qc")
+        self.enable_flow("qb")
+        queue_b = session.incoming("qb")
+
+        self.enable_flow("qc")
+        queue_c = session.incoming("qc")
 
         #check results
         for i in range(1, 5):
             msg = queue_c.get(timeout=1)
-            self.assertEqual("TxMessage %d" % i, msg.content.body)
-            msg.complete()
+            self.assertEqual("TxMessage %d" % i, msg.body)
+            session.message_accept(RangedSet(msg.id))
 
         msg = queue_b.get(timeout=1)
-        self.assertEqual("TxMessage 6", msg.content.body)
-        msg.complete()
+        self.assertEqual("TxMessage 6", msg.body)
+        session.message_accept(RangedSet(msg.id))
 
         msg = queue_a.get(timeout=1)
-        self.assertEqual("TxMessage 7", msg.content.body)
-        msg.complete()
+        self.assertEqual("TxMessage 7", msg.body)
+        session.message_accept(RangedSet(msg.id))
 
         for q in [queue_a, queue_b, queue_c]:
             try:
                 extra = q.get(timeout=1)
-                self.fail("Got unexpected message: " + extra.content.body)
+                self.fail("Got unexpected message: " + extra.body)
             except Empty: None
 
         #cleanup
-        channel.tx_commit()
+        session.tx_commit()
 
     def test_auto_rollback(self):
         """
-        Test that a channel closed with an open transaction is effectively 
rolled back
+        Test that a session closed with an open transaction is effectively 
rolled back
         """
-        channel2 = self.client.channel(2)
-        channel2.session_open()
-        queue_a, queue_b, queue_c = self.perform_txn_work(channel2, 
"tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
+        session = self.session
+        self.declare_queues(["tx-autorollback-a", "tx-autorollback-b", 
"tx-autorollback-c"])
+        session.message_subscribe(queue="tx-autorollback-a", destination="qa")
+        session.message_subscribe(queue="tx-autorollback-b", destination="qb")
+        session.message_subscribe(queue="tx-autorollback-c", destination="qc")
+
+        session2 = self.conn.session("worker", 2)
+        queue_a, queue_b, queue_c, ignore = self.perform_txn_work(session2, 
"tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
 
         for q in [queue_a, queue_b, queue_c]:
             try:
                 extra = q.get(timeout=1)
-                self.fail("Got unexpected message: " + extra.content.body)
+                self.fail("Got unexpected message: " + extra.body)
             except Empty: None
 
-        channel2.session_close()
-        channel = self.channel
-        channel.tx_select()
+        session2.close()
+
+        session.tx_select()
 
-        self.subscribe(channel, queue="tx-autorollback-a", destination="qa", 
confirm_mode=1)
-        queue_a = self.client.queue("qa")
+        self.enable_flow("qa")
+        queue_a = session.incoming("qa")
 
-        self.subscribe(channel, queue="tx-autorollback-b", destination="qb", 
confirm_mode=1)
-        queue_b = self.client.queue("qb")
+        self.enable_flow("qb")
+        queue_b = session.incoming("qb")
 
-        self.subscribe(channel, queue="tx-autorollback-c", destination="qc", 
confirm_mode=1)
-        queue_c = self.client.queue("qc")
+        self.enable_flow("qc")
+        queue_c = session.incoming("qc")
 
         #check results
         for i in range(1, 5):
             msg = queue_a.get(timeout=1)
-            self.assertEqual("Message %d" % i, msg.content.body)
-            msg.complete()
+            self.assertEqual("Message %d" % i, msg.body)
+            session.message_accept(RangedSet(msg.id))
 
         msg = queue_b.get(timeout=1)
-        self.assertEqual("Message 6", msg.content.body)
-        msg.complete()
+        self.assertEqual("Message 6", msg.body)
+        session.message_accept(RangedSet(msg.id))
 
         msg = queue_c.get(timeout=1)
-        self.assertEqual("Message 7", msg.content.body)
-        msg.complete()
+        self.assertEqual("Message 7", msg.body)
+        session.message_accept(RangedSet(msg.id))
 
         for q in [queue_a, queue_b, queue_c]:
             try:
                 extra = q.get(timeout=1)
-                self.fail("Got unexpected message: " + extra.content.body)
+                self.fail("Got unexpected message: " + extra.body)
             except Empty: None
 
         #cleanup
-        channel.tx_commit()
+        session.tx_commit()
 
     def test_rollback(self):
         """
         Test that rolled back publishes are not delivered and rolled back acks 
are re-delivered
         """
-        channel = self.channel
-        queue_a, queue_b, queue_c = self.perform_txn_work(channel, 
"tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
+        session = self.session
+        queue_a, queue_b, queue_c, consumed = self.perform_txn_work(session, 
"tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
 
         for q in [queue_a, queue_b, queue_c]:
             try:
                 extra = q.get(timeout=1)
-                self.fail("Got unexpected message: " + extra.content.body)
+                self.fail("Got unexpected message: " + extra.body)
             except Empty: None
 
-        #stop subscriptions (ensures no delivery occurs during rollback as 
messages are requeued)
-        for d in ["sub_a", "sub_b", "sub_c"]:
-            channel.message_stop(destination=d)
-
-        channel.tx_rollback()
-
-        #restart susbcriptions
-        for d in ["sub_a", "sub_b", "sub_c"]:
-            channel.message_flow(destination=d, unit=0, value=0xFFFFFFFF)
-            channel.message_flow(destination=d, unit=1, value=0xFFFFFFFF)
+        session.tx_rollback()
+
+        #need to release messages to get them redelivered now:
+        session.message_release(consumed)
 
         #check results
         for i in range(1, 5):
             msg = queue_a.get(timeout=1)
-            self.assertEqual("Message %d" % i, msg.content.body)
-            msg.complete()
+            self.assertEqual("Message %d" % i, msg.body)
+            session.message_accept(RangedSet(msg.id))
 
         msg = queue_b.get(timeout=1)
-        self.assertEqual("Message 6", msg.content.body)
-        msg.complete()
+        self.assertEqual("Message 6", msg.body)
+        session.message_accept(RangedSet(msg.id))
 
         msg = queue_c.get(timeout=1)
-        self.assertEqual("Message 7", msg.content.body)
-        msg.complete()
+        self.assertEqual("Message 7", msg.body)
+        session.message_accept(RangedSet(msg.id))
 
         for q in [queue_a, queue_b, queue_c]:
             try:
                 extra = q.get(timeout=1)
-                self.fail("Got unexpected message: " + extra.content.body)
+                self.fail("Got unexpected message: " + extra.body)
             except Empty: None
 
         #cleanup
-        channel.tx_commit()
+        session.tx_commit()
 
-    def perform_txn_work(self, channel, name_a, name_b, name_c):
+    def perform_txn_work(self, session, name_a, name_b, name_c):
         """
         Utility method that does some setup and some work under a transaction. Used for testing both
         commit and rollback
         """
         #setup:
-        channel.queue_declare(queue=name_a, exclusive=True, auto_delete=True)
-        channel.queue_declare(queue=name_b, exclusive=True, auto_delete=True)
-        channel.queue_declare(queue=name_c, exclusive=True, auto_delete=True)
+        self.declare_queues([name_a, name_b, name_c])
 
         key = "my_key_" + name_b
         topic = "my_topic_" + name_c 
     
-        channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key)
-        channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic)
+        session.exchange_bind(queue=name_b, exchange="amq.direct", binding_key=key)
+        session.exchange_bind(queue=name_c, exchange="amq.topic", binding_key=topic)
 
+        dp = session.delivery_properties(routing_key=name_a)
         for i in range(1, 5):
-            channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"msg%d" % i}, body="Message %d" % i))
+            mp = session.message_properties(message_id="msg%d" % i)
+            session.message_transfer(message=Message(dp, mp, "Message %d" % i))
+
+        dp = session.delivery_properties(routing_key=key)
+        mp = session.message_properties(message_id="msg6")
+        session.message_transfer(destination="amq.direct", message=Message(dp, mp, "Message 6"))
 
-        channel.message_transfer(destination="amq.direct",
-                                 content=Content(properties={'routing_key':key, 'message_id':"msg6"}, body="Message 6"))
-        channel.message_transfer(destination="amq.topic",
-                                 content=Content(properties={'routing_key':topic, 'message_id':"msg7"}, body="Message 7"))
+        dp = session.delivery_properties(routing_key=topic)
+        mp = session.message_properties(message_id="msg7")
+        session.message_transfer(destination="amq.topic", message=Message(dp, mp, "Message 7"))
 
-        channel.tx_select()
+        session.tx_select()
 
         #consume and ack messages
-        self.subscribe(channel, queue=name_a, destination="sub_a", confirm_mode=1)
-        queue_a = self.client.queue("sub_a")
+        acked = RangedSet()
+        self.subscribe(session, queue=name_a, destination="sub_a")
+        queue_a = session.incoming("sub_a")
         for i in range(1, 5):
             msg = queue_a.get(timeout=1)
-            self.assertEqual("Message %d" % i, msg.content.body)
+            acked.add(msg.id)
+            self.assertEqual("Message %d" % i, msg.body)
 
-        msg.complete()
-
-        self.subscribe(channel, queue=name_b, destination="sub_b", confirm_mode=1)
-        queue_b = self.client.queue("sub_b")
+        self.subscribe(session, queue=name_b, destination="sub_b")
+        queue_b = session.incoming("sub_b")
         msg = queue_b.get(timeout=1)
-        self.assertEqual("Message 6", msg.content.body)
-        msg.complete()
+        self.assertEqual("Message 6", msg.body)
+        acked.add(msg.id)
 
-        sub_c = self.subscribe(channel, queue=name_c, destination="sub_c", confirm_mode=1)
-        queue_c = self.client.queue("sub_c")
+        sub_c = self.subscribe(session, queue=name_c, destination="sub_c")
+        queue_c = session.incoming("sub_c")
         msg = queue_c.get(timeout=1)
-        self.assertEqual("Message 7", msg.content.body)
-        msg.complete()
+        self.assertEqual("Message 7", msg.body)
+        acked.add(msg.id)
+
+        session.message_accept(acked)
 
+        dp = session.delivery_properties(routing_key=topic)
         #publish messages
         for i in range(1, 5):
-            channel.message_transfer(destination="amq.topic",
-                                     content=Content(properties={'routing_key':topic, 'message_id':"tx-msg%d" % i},
-                                                     body="TxMessage %d" % i))
-
-        channel.message_transfer(destination="amq.direct",
-                                 content=Content(properties={'routing_key':key, 'message_id':"tx-msg6"},
-                                                 body="TxMessage 6"))
-        channel.message_transfer(content=Content(properties={'routing_key':name_a, 'message_id':"tx-msg7"},
-                                                 body="TxMessage 7"))
-        return queue_a, queue_b, queue_c
+            mp = session.message_properties(message_id="tx-msg%d" % i)
+            session.message_transfer(destination="amq.topic", message=Message(dp, mp, "TxMessage %d" % i))
+
+        dp = session.delivery_properties(routing_key=key)
+        mp = session.message_properties(message_id="tx-msg6")
+        session.message_transfer(destination="amq.direct", message=Message(dp, mp, "TxMessage 6"))
+        
+        dp = session.delivery_properties(routing_key=name_a)
+        mp = session.message_properties(message_id="tx-msg7")
+        session.message_transfer(message=Message(dp, mp, "TxMessage 7"))
+        return queue_a, queue_b, queue_c, acked
+
+    def declare_queues(self, names, session=None):
+        session = session or self.session
+        for n in names:
+            session.queue_declare(queue=n, auto_delete=True)
+
+    def subscribe(self, session=None, **keys):
+        session = session or self.session
+        consumer_tag = keys["destination"]
+        session.message_subscribe(**keys)
+        session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
+        session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
+
+    def enable_flow(self, tag, session=None):
+        session = session or self.session
+        session.message_flow(destination=tag, unit=0, value=0xFFFFFFFF)
+        session.message_flow(destination=tag, unit=1, value=0xFFFFFFFF)
+
+    def complete(self, session, msg):
+        session.receiver._completed.add(msg.id)#TODO: this may be done automatically
+        session.channel.session_completed(session.receiver._completed)
+

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/__init__.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/__init__.py?rev=648207&r1=647716&r2=648207&view=diff
==============================================================================
    (empty)

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/alternate_exchange.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/alternate_exchange.py?rev=648207&r1=647716&r2=648207&view=diff
==============================================================================
    (empty)

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/broker.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/broker.py?rev=648207&r1=647716&r2=648207&view=diff
==============================================================================
    (empty)

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/dtx.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/dtx.py?rev=648207&r1=647716&r2=648207&view=diff
==============================================================================
    (empty)

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/example.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/example.py?rev=648207&r1=647716&r2=648207&view=diff
==============================================================================
    (empty)

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/exchange.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/exchange.py?rev=648207&r1=647716&r2=648207&view=diff
==============================================================================
    (empty)

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/execution.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/execution.py?rev=648207&r1=647716&r2=648207&view=diff
==============================================================================
    (empty)

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/management.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/management.py?rev=648207&r1=647716&r2=648207&view=diff
==============================================================================
    (empty)

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/message.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/message.py?rev=648207&r1=647716&r2=648207&view=diff
==============================================================================
    (empty)

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/persistence.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/persistence.py?rev=648207&r1=647716&r2=648207&view=diff
==============================================================================
    (empty)

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/query.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/query.py?rev=648207&r1=647716&r2=648207&view=diff
==============================================================================
    (empty)

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/queue.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/queue.py?rev=648207&r1=647716&r2=648207&view=diff
==============================================================================
    (empty)

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/testlib.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/testlib.py?rev=648207&r1=647716&r2=648207&view=diff
==============================================================================
    (empty)

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/tx.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/python/tests_0-10_preview/tx.py?rev=648207&r1=647716&r2=648207&view=diff
==============================================================================
    (empty)


Reply via email to