Author: gsim
Date: Fri Mar  7 11:07:32 2008
New Revision: 634780

URL: http://svn.apache.org/viewvc?rev=634780&view=rev
Log:
Added acquire impl to final 0-10 codepath
Converted some more python tests


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
    incubator/qpid/trunk/qpid/cpp/xml/extra.xml
    incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
    incubator/qpid/trunk/qpid/python/qpid/testlib.py
    incubator/qpid/trunk/qpid/python/run-tests
    incubator/qpid/trunk/qpid/python/tests_0-10/message.py
    incubator/qpid/trunk/qpid/python/tests_0-10/queue.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=634780&r1=634779&r2=634780&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Fri Mar  7 11:07:32 2008
@@ -367,16 +367,20 @@
     commands.for_each(acceptOp);
 }
 
-/*
-void SessionAdapter::MessageHandlerImpl::acquire(const SequenceSet& transfers)
+framing::Message010AcquireResult SessionAdapter::MessageHandlerImpl::acquire(const framing::SequenceSet& transfers)
 {
+    //TODO: change this when SequenceNumberSet is deleted along with preview code
     SequenceNumberSet results;
-    RangedOperation op = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results));
-    transfers.processRanges(op);
+    RangedOperation f = boost::bind(&SemanticState::acquire, &state, _1, _2, boost::ref(results));
+    transfers.for_each(f);
+
     results = results.condense();
-    getProxy().getMessage().acquired(results);
+    SequenceSet acquisitions;
+    RangedOperation g = boost::bind(&SequenceSet::add, &acquisitions, _1, _2);
+    results.processRanges(g);
+
+    return Message010AcquireResult(acquisitions);
 }
-*/
 
 
 void SessionAdapter::ExecutionHandlerImpl::sync()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=634780&r1=634779&r2=634780&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h Fri Mar  7 11:07:32 2008
@@ -149,6 +149,8 @@
         void release(const framing::SequenceSet& commands,
                      bool setRedelivered);
         
+        framing::Message010AcquireResult acquire(const framing::SequenceSet&);
+
         void subscribe(const string& queue,
                        const string& destination,
                        uint8_t acceptMode,

Modified: incubator/qpid/trunk/qpid/cpp/xml/extra.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/extra.xml?rev=634780&r1=634779&r2=634780&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/extra.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/extra.xml Fri Mar  7 11:07:32 2008
@@ -670,7 +670,16 @@
         <field name="commands" domain="sequence-set"/>
         <field name="set-redelivered" domain="bit"/>
     </method>
-
+    <method name = "acquire" index="5">
+        <doc>blah, blah</doc>
+        <chassis name="server" implement="MUST" />
+        <field name="transfers" domain="sequence-set"/>
+        <result>
+            <struct size="long" type="4">
+                <field name="transfers" domain="sequence-set"/>
+            </struct>
+        </result>
+    </method>
 
     <method name = "subscribe" index="7">
         <doc>blah, blah</doc>

Modified: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?rev=634780&r1=634779&r2=634780&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Fri Mar  7 11:07:32 2008
@@ -39,8 +39,6 @@
 tests_0-10.dtx.DtxTests.test_start_join_and_resume
 tests_0-10.dtx.DtxTests.test_suspend_resume
 tests_0-10.dtx.DtxTests.test_suspend_start_end_resume
-tests_0-10.message.MessageTests.test_acquire
-tests_0-10.message.MessageTests.test_subscribe_not_acquired_3
 tests_0-10.message.MessageTests.test_consume_exclusive
 tests_0-10.message.MessageTests.test_consume_no_local
 tests_0-10.message.MessageTests.test_consume_no_local_awkward
@@ -60,6 +58,6 @@
 tests_0-10.queue.QueueTests.test_delete_ifempty
 tests_0-10.queue.QueueTests.test_delete_ifunused
 tests_0-10.queue.QueueTests.test_delete_simple
-tests_0-10.queue.QueueTests.test_purge
 tests_0-10.queue.QueueTests.test_bind
 tests_0-10.queue.QueueTests.test_unbind_headers
+tests_0-10.queue.QueueTests.test_purge_empty_name
\ No newline at end of file

Modified: incubator/qpid/trunk/qpid/python/qpid/testlib.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/testlib.py?rev=634780&r1=634779&r2=634780&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/testlib.py Fri Mar  7 11:07:32 2008
@@ -353,5 +353,5 @@
         self.session = self.conn.session("test-session", timeout=10)
 
     def tearDown(self):
-        self.session.close(timeout=10)
+        if not self.session.error(): self.session.close(timeout=10)
         self.conn.close(timeout=10)

Modified: incubator/qpid/trunk/qpid/python/run-tests
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/run-tests?rev=634780&r1=634779&r2=634780&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/run-tests (original)
+++ incubator/qpid/trunk/qpid/python/run-tests Fri Mar  7 11:07:32 2008
@@ -21,7 +21,7 @@
 import sys, logging
 from qpid.testlib import testrunner
 
-if "-v" in sys.argv:
+if "-vv" in sys.argv:
   level = logging.DEBUG
 else:
   level = logging.WARN

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=634780&r1=634779&r2=634780&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Fri Mar  7 11:07:32 2008
@@ -635,9 +635,7 @@
         session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
 
         #use fanout for now:
-        session.exchange_bind(exchange="amq.fanout", queue="q")
-        session.message_transfer(destination="amq.fanout", message=Message("acquire me"))
-        #session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
 
         session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
         session.message_flow(destination="a", unit=0, value=0xFFFFFFFF)
@@ -647,12 +645,13 @@
         #message should still be on the queue:
         self.assertEquals(1, session.queue_query(queue = "q").message_count)
 
-        response = session.message_acquire(RangedSet(msg.id))
+        transfers = RangedSet(msg.id)
+        response = session.message_acquire(transfers)
         #check that we get notification (i.e. message_acquired)
-        self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
+        self.assert_(msg.id in response.transfers)
         #message should have been removed from the queue:
         self.assertEquals(0, session.queue_query(queue = "q").message_count)
-        session.message_accept(RangedSet(msg.id))
+        session.message_accept(transfers)
 
 
     def test_release(self):
@@ -800,12 +799,12 @@
         session = self.session
 
         #publish some messages
-        self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
         for i in range(1, 11):
             session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
 
         #create a not-acquired subscriber
-        session.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
+        session.message_subscribe(queue = "q", destination = "a", acquire_mode=1)
         session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
         session.message_flow(unit = 0, value = 10, destination = "a")
 
@@ -816,19 +815,18 @@
             self.assertEquals("message-%d" % (i), msg.body)
             if (i % 2):
                 #try to acquire every second message
-                session.message_acquire([msg.command_id, msg.command_id])
+                response = session.message_acquire(RangedSet(msg.id))
                 #check that acquire succeeds
-                response = session.control_queue.get(timeout=1)
-                self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
-            session.message_release(RangedSet(msg.id))
-            session.channel._completed.add(msg.id)
-            session.channel.session_completed(session.channel._completed)
-
-            msg.complete()
+                self.assert_(msg.id in response.transfers)
+                session.message_accept(RangedSet(msg.id))
+            else:
+                session.message_release(RangedSet(msg.id))
+            session.receiver._completed.add(msg.id)
+            session.channel.session_completed(session.receiver._completed)
         self.assertEmpty(queue)
 
         #create a second not-acquired subscriber
-        session.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+        session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
         session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
         session.message_flow(unit = 0, value = 1, destination = "b")
         #check it gets those not consumed
@@ -836,7 +834,9 @@
         for i in [2,4,6,8,10]:
             msg = queue.get(timeout = 1)
             self.assertEquals("message-%d" % (i), msg.body)
-            msg.complete()
+            session.message_release(RangedSet(msg.id))
+            session.receiver._completed.add(msg.id)
+            session.channel.session_completed(session.receiver._completed)
         session.message_flow(unit = 0, value = 1, destination = "b")
         self.assertEmpty(queue)
 
@@ -898,6 +898,21 @@
         queue = session.incoming("d")
         msg = queue.get(timeout = 3)
         self.assertEquals("message-body", msg.body)
+
+    def test_empty_body(self):
+        session = self.session
+        session.queue_declare(queue="xyz", exclusive=True, auto_delete=True)
+        props = session.delivery_properties(routing_key="xyz")
+        session.message_transfer(message=Message(props, ""))
+
+        consumer_tag = "tag1"
+        session.message_subscribe(queue="xyz", destination=consumer_tag)
+        session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = consumer_tag)
+        session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = consumer_tag)
+        queue = session.incoming(consumer_tag)
+        msg = queue.get(timeout=1)
+        self.assertEquals("", msg.body)
+        session.message_accept(RangedSet(msg.id))
 
     def assertDataEquals(self, session, msg, expected):
         self.assertEquals(expected, msg.body)

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/queue.py?rev=634780&r1=634779&r2=634780&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/queue.py Fri Mar  7 11:07:32 2008
@@ -20,6 +20,7 @@
 from qpid.queue import Empty
 from qpid.testlib import TestBase010
 from qpid.datatypes import Message
+from qpid.session import SessionException
 
 class QueueTests(TestBase010):
     """Tests for 'methods' on the amqp queue 'class'"""
@@ -54,22 +55,31 @@
         msg = queue.get(timeout=1)
         self.assertEqual("four", msg.body)
 
-        #check error conditions (use new sessions): 
-        session = self.conn.session("error-checker")
+    def test_purge_queue_exists(self):
+        """        
+        Test that the correct exception is thrown if no queue exists
+        for the name specified in purge        
+        """        
+        session = self.session
         try:
             #queue specified but doesn't exist:
             session.queue_purge(queue="invalid-queue")            
             self.fail("Expected failure when purging non-existent queue")
-        except Closed, e:
-            self.assertChannelException(404, e.args[0])
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code) #not-found
 
-        session = self.conn.session("error-checker")
+    def test_purge_empty_name(self):        
+        """
+        Test that the correct exception is thrown if no queue name
+        is specified for purge
+        """        
+        session = self.session
         try:
             #queue not specified and none previously declared for channel:
             session.queue_purge()
             self.fail("Expected failure when purging unspecified queue")
-        except Closed, e:
-            self.assertConnectionException(530, e.args[0])
+        except SessionException, e:
+            self.assertEquals(531, e.args[0].error_code) #illegal-argument
 
     def test_declare_exclusive(self):
         """


Reply via email to