Author: gsim
Date: Tue Mar 11 08:34:20 2008
New Revision: 635976

URL: http://svn.apache.org/viewvc?rev=635976&view=rev
Log:
Fixed headers exchange to allow unbind by binding key when no args are supplied
Converted alternate exchange python tests to the 0-10 session API (TestBase010)


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
    incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
    incubator/qpid/trunk/qpid/python/qpid/datatypes.py
    incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py
    incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py
    incubator/qpid/trunk/qpid/python/tests_0-10/queue.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp?rev=635976&r1=635975&r2=635976&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/HeadersExchange.cpp Tue Mar 
11 08:34:20 2008
@@ -72,7 +72,7 @@
     return what->get<std::string>();
 }
 
-bool HeadersExchange::bind(Queue::shared_ptr queue, const string& 
/*routingKey*/, const FieldTable* args){
+bool HeadersExchange::bind(Queue::shared_ptr queue, const string& bindingKey, 
const FieldTable* args){
     RWlock::ScopedWlock locker(lock);
     std::string what = getMatch(args);
     if (what != all && what != any)
@@ -85,7 +85,7 @@
             break;
 
     if (i == bindings.end()) {
-        Binding::shared_ptr binding (new Binding ("", queue, this));
+        Binding::shared_ptr binding (new Binding (bindingKey, queue, this));
         HeaderMap headerMap(*args, binding);
 
         bindings.push_back(headerMap);
@@ -98,12 +98,18 @@
     }
 }
 
-bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& 
/*routingKey*/, const FieldTable* args){
+bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& 
bindingKey, const FieldTable* args){
     RWlock::ScopedWlock locker(lock);
     Bindings::iterator i;
-    for (i = bindings.begin(); i != bindings.end(); i++)
-        if (i->first == *args && i->second->queue == queue)
-            break;
+    for (i = bindings.begin(); i != bindings.end(); i++) {
+        if (args) {
+            if (i->first == *args && i->second->queue == queue)
+                break;
+        } else {
+            if (i->second->key == bindingKey && i->second->queue == queue)
+                break;
+        }
+    }
 
     if (i != bindings.end()) {
         bindings.erase(i);

Modified: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?rev=635976&r1=635975&r2=635976&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Tue Mar 11 08:34:20 
2008
@@ -1,15 +1,10 @@
 tests.codec.FieldTableTestCase.test_field_table_decode
 tests.codec.FieldTableTestCase.test_field_table_multiple_name_value_pair
 tests.codec.FieldTableTestCase.test_field_table_name_value_pair
-tests_0-10.query.QueryTests.test_exchange_bound_header
 tests_0-10.tx.TxTests.test_auto_rollback
 tests_0-10.tx.TxTests.test_commit
 tests_0-10.tx.TxTests.test_rollback
 tests_0-10.execution.ExecutionTests.test_flush
-tests_0-10.alternate_exchange.AlternateExchangeTests.test_delete_while_used_by_exchange
-tests_0-10.alternate_exchange.AlternateExchangeTests.test_delete_while_used_by_queue
-tests_0-10.alternate_exchange.AlternateExchangeTests.test_queue_delete
-tests_0-10.alternate_exchange.AlternateExchangeTests.test_unroutable
 tests_0-10.dtx.DtxTests.test_bad_resume
 tests_0-10.dtx.DtxTests.test_end
 tests_0-10.dtx.DtxTests.test_end_suspend_and_fail
@@ -46,5 +41,3 @@
 tests_0-10.testlib.TestBaseTest.testMessageProperties
 tests_0-10.queue.QueueTests.test_autodelete_shared
 tests_0-10.queue.QueueTests.test_declare_exclusive
-tests_0-10.queue.QueueTests.test_bind
-tests_0-10.queue.QueueTests.test_unbind_headers

Modified: incubator/qpid/trunk/qpid/python/qpid/datatypes.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/datatypes.py?rev=635976&r1=635975&r2=635976&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/datatypes.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/datatypes.py Tue Mar 11 08:34:20 2008
@@ -44,7 +44,7 @@
 
     if kwargs:
       unexpected = kwargs.keys()[0]
-      raise TypeError("%s() got an unexpected keywoard argument '%s'" %
+      raise TypeError("%s() got an unexpected keyword argument '%s'" %
                       (_type.name, unexpected))
 
   def __getitem__(self, name):

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py?rev=635976&r1=635975&r2=635976&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py Tue Mar 
11 08:34:20 2008
@@ -16,12 +16,13 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-from qpid.client import Client, Closed
+import traceback
 from qpid.queue import Empty
-from qpid.content import Content
-from qpid.testlib import testrunner, TestBase
+from qpid.datatypes import Message
+from qpid.testlib import TestBase010
+from qpid.session import SessionException
 
-class AlternateExchangeTests(TestBase):
+class AlternateExchangeTests(TestBase010):
     """
     Tests for the new mechanism for message returns introduced in 0-10
     and available in 0-9 for preview
@@ -31,36 +32,42 @@
         """
         Test that unroutable messages are delivered to the alternate-exchange 
if specified
         """
-        channel = self.channel
+        session = self.session
         #create an exchange with an alternate defined
-        channel.exchange_declare(exchange="secondary", type="fanout")
-        channel.exchange_declare(exchange="primary", type="direct", 
alternate_exchange="secondary")
+        session.exchange_declare(exchange="secondary", type="fanout")
+        session.exchange_declare(exchange="primary", type="direct", 
alternate_exchange="secondary")
 
         #declare, bind (to the alternate exchange) and consume from a queue 
for 'returned' messages
-        channel.queue_declare(queue="returns", exclusive=True, 
auto_delete=True)
-        channel.queue_bind(queue="returns", exchange="secondary")
-        self.subscribe(destination="a", queue="returns")
-        returned = self.client.queue("a")
+        session.queue_declare(queue="returns", exclusive=True, 
auto_delete=True)
+        session.exchange_bind(queue="returns", exchange="secondary")
+        session.message_subscribe(destination="a", queue="returns")
+        session.message_flow(destination="a", unit=0, value=0xFFFFFFFF)
+        session.message_flow(destination="a", unit=1, value=0xFFFFFFFF)
+        returned = session.incoming("a")
 
         #declare, bind (to the primary exchange) and consume from a queue for 
'processed' messages
-        channel.queue_declare(queue="processed", exclusive=True, 
auto_delete=True)
-        channel.queue_bind(queue="processed", exchange="primary", 
routing_key="my-key")
-        self.subscribe(destination="b", queue="processed")
-        processed = self.client.queue("b")
+        session.queue_declare(queue="processed", exclusive=True, 
auto_delete=True)
+        session.exchange_bind(queue="processed", exchange="primary", 
binding_key="my-key")
+        session.message_subscribe(destination="b", queue="processed")
+        session.message_flow(destination="b", unit=0, value=0xFFFFFFFF)
+        session.message_flow(destination="b", unit=1, value=0xFFFFFFFF)
+        processed = session.incoming("b")
 
         #publish to the primary exchange
         #...one message that makes it to the 'processed' queue:
-        channel.message_transfer(destination="primary", 
content=Content("Good", properties={'routing_key':"my-key"}))
+        dp=self.session.delivery_properties(routing_key="my-key")
+        session.message_transfer(destination="primary", message=Message(dp, 
"Good"))
         #...and one that does not:
-        channel.message_transfer(destination="primary", content=Content("Bad", 
properties={'routing_key':"unused-key"}))
+        dp=self.session.delivery_properties(routing_key="unused-key")
+        session.message_transfer(destination="primary", message=Message(dp, 
"Bad"))
 
         #delete the exchanges
-        channel.exchange_delete(exchange="primary")
-        channel.exchange_delete(exchange="secondary")
+        session.exchange_delete(exchange="primary")
+        session.exchange_delete(exchange="secondary")
 
         #verify behaviour
-        self.assertEqual("Good", processed.get(timeout=1).content.body)
-        self.assertEqual("Bad", returned.get(timeout=1).content.body)
+        self.assertEqual("Good", processed.get(timeout=1).body)
+        self.assertEqual("Bad", returned.get(timeout=1).body)
         self.assertEmpty(processed)
         self.assertEmpty(returned)
 
@@ -68,29 +75,32 @@
         """
         Test that messages in a queue being deleted are delivered to the 
alternate-exchange if specified
         """
-        channel = self.channel
+        session = self.session
         #set up a 'dead letter queue':
-        channel.exchange_declare(exchange="dlq", type="fanout")
-        channel.queue_declare(queue="deleted", exclusive=True, 
auto_delete=True)
-        channel.queue_bind(exchange="dlq", queue="deleted")
-        self.subscribe(destination="dlq", queue="deleted")
-        dlq = self.client.queue("dlq")
+        session.exchange_declare(exchange="dlq", type="fanout")
+        session.queue_declare(queue="deleted", exclusive=True, 
auto_delete=True)
+        session.exchange_bind(exchange="dlq", queue="deleted")
+        session.message_subscribe(destination="dlq", queue="deleted")
+        session.message_flow(destination="dlq", unit=0, value=0xFFFFFFFF)
+        session.message_flow(destination="dlq", unit=1, value=0xFFFFFFFF)
+        dlq = session.incoming("dlq")
 
         #create a queue using the dlq as its alternate exchange:
-        channel.queue_declare(queue="delete-me", alternate_exchange="dlq")
+        session.queue_declare(queue="delete-me", alternate_exchange="dlq")
         #send it some messages:
-        channel.message_transfer(content=Content("One", 
properties={'routing_key':"delete-me"}))
-        channel.message_transfer(content=Content("Two", 
properties={'routing_key':"delete-me"}))
-        channel.message_transfer(content=Content("Three", 
properties={'routing_key':"delete-me"}))
+        dp=self.session.delivery_properties(routing_key="delete-me")
+        session.message_transfer(message=Message(dp, "One"))
+        session.message_transfer(message=Message(dp, "Two"))
+        session.message_transfer(message=Message(dp, "Three"))
         #delete it:
-        channel.queue_delete(queue="delete-me")
+        session.queue_delete(queue="delete-me")
         #delete the dlq exchange:
-        channel.exchange_delete(exchange="dlq")
+        session.exchange_delete(exchange="dlq")
 
         #check the messages were delivered to the dlq:
-        self.assertEqual("One", dlq.get(timeout=1).content.body)
-        self.assertEqual("Two", dlq.get(timeout=1).content.body)
-        self.assertEqual("Three", dlq.get(timeout=1).content.body)
+        self.assertEqual("One", dlq.get(timeout=1).body)
+        self.assertEqual("Two", dlq.get(timeout=1).body)
+        self.assertEqual("Three", dlq.get(timeout=1).body)
         self.assertEmpty(dlq)
 
     def test_delete_while_used_by_queue(self):
@@ -98,23 +108,19 @@
         Ensure an exchange still in use as an alternate-exchange for a
         queue can't be deleted
         """
-        channel = self.channel
-        channel.exchange_declare(exchange="alternate", type="fanout")
-        channel.queue_declare(queue="q", exclusive=True, auto_delete=True, 
alternate_exchange="alternate")
+        session = self.session
+        session.exchange_declare(exchange="alternate", type="fanout")
+
+        session = self.conn.session("alternate", 2)
+        session.queue_declare(queue="q", exclusive=True, auto_delete=True, 
alternate_exchange="alternate")
         try:
-            channel.exchange_delete(exchange="alternate")
+            session.exchange_delete(exchange="alternate")
             self.fail("Expected deletion of in-use alternate-exchange to fail")
-        except Closed, e:
-            #cleanup:
-            other = self.connect()
-            channel = other.channel(1)
-            channel.session_open()
-            channel.exchange_delete(exchange="alternate")
-            channel.session_close()
-            other.close()
-            
-            self.assertConnectionException(530, e.args[0])            
-
+        except SessionException, e:
+            session = self.session
+            session.queue_delete(queue="q")
+            session.exchange_delete(exchange="alternate")
+            self.assertEquals(530, e.args[0].error_code)            
 
 
     def test_delete_while_used_by_exchange(self):
@@ -122,25 +128,19 @@
         Ensure an exchange still in use as an alternate-exchange for 
         another exchange can't be deleted
         """
-        channel = self.channel
-        channel.exchange_declare(exchange="alternate", type="fanout")
-        channel.exchange_declare(exchange="e", type="fanout", 
alternate_exchange="alternate")
+        session = self.session
+        session.exchange_declare(exchange="alternate", type="fanout")
+
+        session = self.conn.session("alternate", 2)
+        session.exchange_declare(exchange="e", type="fanout", 
alternate_exchange="alternate")
         try:
-            channel.exchange_delete(exchange="alternate")
-            #cleanup:
-            channel.exchange_delete(exchange="e")
+            session.exchange_delete(exchange="alternate")
             self.fail("Expected deletion of in-use alternate-exchange to fail")
-        except Closed, e:
-            #cleanup:
-            other = self.connect()
-            channel = other.channel(1)
-            channel.session_open()
-            channel.exchange_delete(exchange="e")
-            channel.exchange_delete(exchange="alternate")
-            channel.session_close()
-            other.close()
-
-            self.assertConnectionException(530, e.args[0])
+        except SessionException, e:
+            session = self.session
+            session.exchange_delete(exchange="e")
+            session.exchange_delete(exchange="alternate")
+            self.assertEquals(530, e.args[0].error_code)
             
 
     def assertEmpty(self, queue):
@@ -148,4 +148,3 @@
             msg = queue.get(timeout=1) 
             self.fail("Queue not empty: " + msg)
         except Empty: None
-

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py?rev=635976&r1=635975&r2=635976&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py Tue Mar 11 08:34:20 
2008
@@ -45,6 +45,7 @@
         except:
             print "Error on tearDown:"
             print traceback.print_exc()
+        TestBase010.tearDown(self)
 
     def createMessage(self, key="", body=""):
         return Message(self.session.delivery_properties(routing_key=key), body)

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/queue.py?rev=635976&r1=635975&r2=635976&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/queue.py Tue Mar 11 08:34:20 
2008
@@ -126,27 +126,26 @@
         session.queue_declare(queue="queue-1", exclusive=True, 
auto_delete=True)
 
         #straightforward case, both exchange & queue exist so no errors 
expected:
-        session.queue_bind(queue="queue-1", exchange="amq.direct", 
routing_key="key1")
+        session.exchange_bind(queue="queue-1", exchange="amq.direct", 
binding_key="key1")
 
         #use the queue name where the routing key is not specified:
         session.exchange_bind(queue="queue-1", exchange="amq.direct")
 
         #try and bind to non-existant exchange
         try:
-            session.queue_bind(queue="queue-1", 
exchange="an-invalid-exchange", routing_key="key1")
+            session.exchange_bind(queue="queue-1", 
exchange="an-invalid-exchange", binding_key="key1")
             self.fail("Expected bind to non-existant exchange to fail")
-        except Closed, e:
+        except SessionException, e:
             self.assertEquals(404, e.args[0].error_code)
 
-        #need to reopen a session:    
-        session = self.client.session(2)
-        session.session_open()
 
+    def test_bind_queue_existence(self):
+        session = self.session
         #try and bind non-existant queue:
         try:
-            session.queue_bind(queue="queue-2", exchange="amq.direct", 
routing_key="key1")
+            session.exchange_bind(queue="queue-2", exchange="amq.direct", 
binding_key="key1")
             self.fail("Expected bind of non-existant queue to fail")
-        except Closed, e:
+        except SessionException, e:
             self.assertEquals(404, e.args[0].error_code)
 
     def test_unbind_direct(self):


Reply via email to