Author: gsim
Date: Wed Dec 10 15:23:24 2008
New Revision: 725483

URL: http://svn.apache.org/viewvc?rev=725483&view=rev
Log:
QPID-1526: add checks for exclusive ownership plus tests to verify they are 
executed


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
    incubator/qpid/trunk/qpid/python/tests_0-10/queue.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=725483&r1=725482&r2=725483&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Wed Dec 10 
15:23:24 2008
@@ -346,7 +346,7 @@
         std::pair<Queue::shared_ptr, bool> queue_created =  
             getBroker().getQueues().declare(name, durable,
                                             autoDelete,
-                                            exclusive ? this : 0);
+                                            exclusive ? &session : 0);
         queue = queue_created.first;
         assert(queue);
         if (queue_created.second) { // This is a new queue
@@ -367,7 +367,7 @@
                 exclusiveQueues.push_back(queue);
             }
         } else {
-            if (exclusive && queue->setExclusiveOwner(this)) {
+            if (exclusive && queue->setExclusiveOwner(&session)) {
                 exclusiveQueues.push_back(queue);
             }
         }
@@ -379,7 +379,7 @@
                                                       queue_created.second ? 
"created" : "existing"));
     }
 
-    if (exclusive && !queue->isExclusiveOwner(this)) 
+    if (exclusive && !queue->isExclusiveOwner(&session)) 
         throw ResourceLockedException(QPID_MSG("Cannot grant exclusive access 
to queue "
                                                << queue->getName()));
 } 
@@ -405,6 +405,9 @@
     }
 
     Queue::shared_ptr q = getQueue(queue);
+    if (q->hasExclusiveOwner() && !q->isExclusiveOwner(&session)) 
+        throw ResourceLockedException(QPID_MSG("Cannot delete queue "
+                                               << queue << "; it is exclusive 
to another session"));
     if(ifEmpty && q->getMessageCount() > 0){
         throw PreconditionFailedException("Queue not empty.");
     }else if(ifUnused && q->getConsumerCount() > 0){
@@ -473,6 +476,9 @@
     Queue::shared_ptr queue = getQueue(queueName);
     if(!destination.empty() && state.exists(destination))
         throw NotAllowedException(QPID_MSG("Consumer tags must be unique"));
+    if (queue->hasExclusiveOwner() && !queue->isExclusiveOwner(&session)) 
+        throw ResourceLockedException(QPID_MSG("Cannot subscribe to exclusive 
queue "
+                                               << queue->getName()));
 
     state.consume(destination, queue, 
                   acceptMode == 0, acquireMode == 0, exclusive, 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=725483&r1=725482&r2=725483&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h Wed Dec 10 
15:23:24 2008
@@ -109,7 +109,7 @@
     };
 
     class QueueHandlerImpl : public QueueHandler,
-            public HandlerHelper, public OwnershipToken
+            public HandlerHelper
     {
         Broker& broker;
         std::vector< boost::shared_ptr<Queue> > exclusiveQueues;

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/federation.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/federation.py?rev=725483&r1=725482&r2=725483&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/federation.py (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/federation.py Wed Dec 10 15:23:24 
2008
@@ -177,7 +177,7 @@
         #setup queue on remote broker and add some messages
         r_conn = self.connect(host=remote_host(), port=remote_port())
         r_session = r_conn.session("test_pull_from_queue")
-        r_session.queue_declare(queue="my-bridge-queue", exclusive=True, 
auto_delete=True)
+        r_session.queue_declare(queue="my-bridge-queue", auto_delete=True)
         for i in range(1, 6):
             dp = r_session.delivery_properties(routing_key="my-bridge-queue")
             r_session.message_transfer(message=Message(dp, "Message %d" % i))

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/queue.py?rev=725483&r1=725482&r2=725483&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/queue.py Wed Dec 10 15:23:24 
2008
@@ -88,7 +88,7 @@
         # TestBase.setUp has already opened session(1)
         s1 = self.session
         # Here we open a second separate connection:
-        s2 = self.conn.session("other", 2)
+        s2 = self.conn.session("other")
 
         #declare an exclusive queue:
         s1.queue_declare(queue="exclusive-queue", exclusive=True, 
auto_delete=True)
@@ -98,6 +98,22 @@
             self.fail("Expected second exclusive queue_declare to raise a 
channel exception")
         except SessionException, e:
             self.assertEquals(405, e.args[0].error_code)
+            
+        s3 = self.conn.session("subscriber")
+        try:
+            #another session should not be allowed to subscribe to this exclusive queue:
+            s3.message_subscribe(queue="exclusive-queue")
+            self.fail("Expected message_subscribe on an exclusive queue to 
raise a channel exception")
+        except SessionException, e:
+            self.assertEquals(405, e.args[0].error_code)
+
+        s4 = self.conn.session("deleter")
+        try:
+            #another session should not be allowed to delete this exclusive queue:
+            s4.queue_delete(queue="exclusive-queue")
+            self.fail("Expected queue_delete on an exclusive queue to raise a 
channel exception")
+        except SessionException, e:
+            self.assertEquals(405, e.args[0].error_code)
 
 
     def test_declare_passive(self):


Reply via email to