Author: gsim
Date: Mon Mar 17 05:17:55 2008
New Revision: 637854

URL: http://svn.apache.org/viewvc?rev=637854&view=rev
Log:
Scope exclusive queues to sessions.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
    incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
    incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py
    incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
    incubator/qpid/trunk/qpid/python/tests_0-10/tx.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h Mon Mar 17 05:17:55 2008
@@ -21,13 +21,14 @@
 #ifndef _ConnectionToken_
 #define _ConnectionToken_
 
+#include "OwnershipToken.h"
 namespace qpid {
     namespace broker {
         /**
          * An empty interface allowing opaque implementations of some
          * form of token to identify a connection.
          */
-        class ConnectionToken{
+        class ConnectionToken : public OwnershipToken {
         public:
             virtual ~ConnectionToken(){}
         };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Mar 17 05:17:55 2008
@@ -46,7 +46,7 @@
 
 Queue::Queue(const string& _name, bool _autodelete, 
              MessageStore* const _store,
-             const ConnectionToken* const _owner,
+             const OwnershipToken* const _owner,
              Manageable* parent) :
 
     name(_name), 
@@ -582,7 +582,7 @@
 
 }
 
-bool Queue::isExclusiveOwner(const ConnectionToken* const o) const 
+bool Queue::isExclusiveOwner(const OwnershipToken* const o) const 
 { 
     Mutex::ScopedLock locker(ownershipLock);
     return o == owner; 
@@ -594,7 +594,7 @@
     owner = 0; 
 }
 
-bool Queue::setExclusiveOwner(const ConnectionToken* const o) 
+bool Queue::setExclusiveOwner(const OwnershipToken* const o) 
 { 
     Mutex::ScopedLock locker(ownershipLock);
     if (owner) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Mar 17 05:17:55 2008
@@ -28,7 +28,7 @@
 #include <boost/shared_ptr.hpp>
 #include <boost/enable_shared_from_this.hpp>
 #include "qpid/framing/amqp_types.h"
-#include "ConnectionToken.h"
+#include "OwnershipToken.h"
 #include "Consumer.h"
 #include "Message.h"
 #include "qpid/framing/FieldTable.h"
@@ -63,7 +63,7 @@
             const string name;
             const bool autodelete;
             MessageStore* store;
-            const ConnectionToken* owner;
+            const OwnershipToken* owner;
             uint32_t consumerCount;
             bool exclusive;
             Listeners listeners;
@@ -100,7 +100,7 @@
 
             Queue(const string& name, bool autodelete = false, 
                   MessageStore* const store = 0, 
-                  const ConnectionToken* const owner = 0,
+                  const OwnershipToken* const owner = 0,
                   Manageable* parent = 0);
             ~Queue();
 
@@ -143,9 +143,9 @@
             uint32_t getMessageCount() const;
             uint32_t getConsumerCount() const;
             inline const string& getName() const { return name; }
-            bool isExclusiveOwner(const ConnectionToken* const o) const;
+            bool isExclusiveOwner(const OwnershipToken* const o) const;
             void releaseExclusiveOwnership();
-            bool setExclusiveOwner(const ConnectionToken* const o);
+            bool setExclusiveOwner(const OwnershipToken* const o);
             bool hasExclusiveConsumer() const;
             bool hasExclusiveOwner() const;
             inline bool isDurable() const { return store != 0; }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Mon Mar 17 05:17:55 2008
@@ -33,7 +33,7 @@
 
 std::pair<Queue::shared_ptr, bool>
 QueueRegistry::declare(const string& declareName, bool durable, 
-                       bool autoDelete, const ConnectionToken* owner)
+                       bool autoDelete, const OwnershipToken* owner)
 {
     RWlock::ScopedWlock locker(lock);
     string name = declareName.empty() ? generateName() : declareName;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Mon Mar 17 05:17:55 2008
@@ -48,7 +48,7 @@
      * was created by this declare call false if it already existed.
      */
     std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, bool autodelete = false, 
-                                               const ConnectionToken* const owner = 0);
+                                               const OwnershipToken* const owner = 0);
 
     /**
      * Destroy the named queue.
@@ -62,7 +62,6 @@
      * subsequent calls to find or declare with the same name.
      *
      */
-    void destroyLH (const string& name);
     void destroy   (const string& name);
     template <class Test> bool destroyIf(const string& name, Test test)
     {
@@ -107,6 +106,9 @@
     int counter;
     MessageStore* store;
     management::Manageable* parent;
+
+    //destroy impl that assumes lock is already held:
+    void destroyLH (const string& name);
 };
 
     

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Mar 17 05:17:55 2008
@@ -19,6 +19,7 @@
 #include "Connection.h"
 #include "DeliveryToken.h"
 #include "MessageDelivery.h"
+#include "Queue.h"
 #include "qpid/Exception.h"
 #include "qpid/framing/reply_exceptions.h"
 #include <boost/format.hpp>
@@ -180,6 +181,22 @@
     }
 }
 
+SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : HandlerHelper(session), broker(getBroker())
+{}
+
+
+SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl()
+{
+    while (!exclusiveQueues.empty()) {
+        Queue::shared_ptr q(exclusiveQueues.front());
+        q->releaseExclusiveOwnership();
+        if (q->canAutoDelete()) {
+            Queue::tryAutoDelete(broker, q);
+        }
+        exclusiveQueues.erase(exclusiveQueues.begin());
+    }
+}
+
 Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
 {
     Queue::shared_ptr queue = getQueue(name);
@@ -212,7 +229,7 @@
             getBroker().getQueues().declare(
                 name, durable,
                 autoDelete,
-                exclusive ? &getConnection() : 0);
+                exclusive ? this : 0);
        queue = queue_created.first;
        assert(queue);
        if (queue_created.second) { // This is a new queue
@@ -230,15 +247,15 @@
 
             //handle automatic cleanup:
            if (exclusive) {
-               getConnection().exclusiveQueues.push_back(queue);
+               exclusiveQueues.push_back(queue);
            }
        } else {
-            if (exclusive && queue->setExclusiveOwner(&getConnection())) {
-               getConnection().exclusiveQueues.push_back(queue);
+            if (exclusive && queue->setExclusiveOwner(this)) {
+               exclusiveQueues.push_back(queue);
             }
         }
     }
-    if (exclusive && !queue->isExclusiveOwner(&getConnection())) 
+    if (exclusive && !queue->isExclusiveOwner(this)) 
        throw ResourceLockedException(
             QPID_MSG("Cannot grant exclusive access to queue "
                      << queue->getName()));

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h Mon Mar 17 05:17:55 2008
@@ -25,8 +25,11 @@
 #include "qpid/framing/AMQP_ServerOperations.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/SequenceSet.h"
+#include "OwnershipToken.h"
 
+#include <vector>
 #include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
 
 namespace qpid {
 namespace broker {
@@ -34,6 +37,7 @@
 class Channel;
 class Connection;
 class Broker;
+class Queue;
 
 /**
  * Per-channel protocol adapter.
@@ -44,7 +48,7 @@
  * peer.
  * 
  */
-class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
+ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
 {
   public:
     SessionAdapter(SemanticState& session);
@@ -116,12 +120,15 @@
                             shared_ptr<Exchange> alternate);
     };
 
-    class QueueHandlerImpl :
-        public Queue010Handler,
-        public HandlerHelper
+    class QueueHandlerImpl : public Queue010Handler,
+            public HandlerHelper, public OwnershipToken
     {
+        Broker& broker;
+        std::vector< boost::shared_ptr<Queue> > exclusiveQueues;
+
       public:
-        QueueHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
+        QueueHandlerImpl(SemanticState& session);
+        ~QueueHandlerImpl();
         
         void declare(const std::string& queue,
                      const std::string& alternateExchange, 

Modified: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Mon Mar 17 05:17:55 2008
@@ -33,5 +33,3 @@
 tests_0-10.testlib.TestBaseTest.testAssertEmptyFail
 tests_0-10.testlib.TestBaseTest.testAssertEmptyPass
 tests_0-10.testlib.TestBaseTest.testMessageProperties
-tests_0-10.queue.QueueTests.test_autodelete_shared
-tests_0-10.queue.QueueTests.test_declare_exclusive

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py Mon Mar 17 05:17:55 2008
@@ -111,14 +111,13 @@
         session = self.session
         session.exchange_declare(exchange="alternate", type="fanout")
 
-        session = self.conn.session("alternate", 2)
-        session.queue_declare(queue="q", exclusive=True, auto_delete=True, alternate_exchange="alternate")
+        session2 = self.conn.session("alternate", 2)
+        session2.queue_declare(queue="q", exclusive=True, auto_delete=True, alternate_exchange="alternate")
         try:
-            session.exchange_delete(exchange="alternate")
+            session2.exchange_delete(exchange="alternate")
             self.fail("Expected deletion of in-use alternate-exchange to fail")
         except SessionException, e:
             session = self.session
-            session.queue_delete(queue="q")
             session.exchange_delete(exchange="alternate")
             self.assertEquals(530, e.args[0].error_code)            
 

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/queue.py?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/queue.py Mon Mar 17 05:17:55 2008
@@ -86,19 +86,17 @@
         Test that the exclusive field is honoured in queue.declare
         """
         # TestBase.setUp has already opened session(1)
-        c1 = self.session
+        s1 = self.session
         # Here we open a second separate connection:
-        other = self.connect()
-        c2 = other.session(1)
-        c2.session_open()
+        s2 = self.conn.session("other", 2)
 
         #declare an exclusive queue:
-        c1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
+        s1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
         try:
             #other connection should not be allowed to declare this:
-            c2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
+            s2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
             self.fail("Expected second exclusive queue_declare to raise a channel exception")
-        except Closed, e:
+        except SessionException, e:
             self.assertEquals(405, e.args[0].error_code)
 
 
@@ -322,24 +320,23 @@
         Test auto-deletion (of non-exclusive queues)
         """
         session = self.session
-        other = self.connect()
-        session2 = other.session(1)
-        session2.session_open()
+        session2 =self.conn.session("other", 1)
 
         session.queue_declare(queue="auto-delete-me", auto_delete=True)
 
         #consume from both sessions
-        reply = session.basic_consume(queue="auto-delete-me")
-        session2.basic_consume(queue="auto-delete-me")
+        tag = "my-tag"
+        session.message_subscribe(queue="auto-delete-me", destination=tag)
+        session2.message_subscribe(queue="auto-delete-me", destination=tag)
 
         #implicit cancel
-        session2.session_close()
+        session2.close()
 
         #check it is still there
         session.queue_declare(queue="auto-delete-me", passive=True)
 
         #explicit cancel => queue is now unused again:
-        session.basic_cancel(consumer_tag=reply.consumer_tag)
+        session.message_cancel(destination=tag)
 
         #NOTE: this assumes there is no timeout in use
 

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/tx.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/tx.py?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/tx.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/tx.py Mon Mar 17 05:17:55 2008
@@ -30,23 +30,30 @@
         """
         Test that commited publishes are delivered and commited acks are not re-delivered
         """
+        session = self.session
+
+        #declare queues and create subscribers in the checking session
+        #to ensure that the queues are not auto-deleted too early:        
+        self.declare_queues(["tx-commit-a", "tx-commit-b", "tx-commit-c"])
+        session.message_subscribe(queue="tx-commit-a", destination="qa")
+        session.message_subscribe(queue="tx-commit-b", destination="qb")
+        session.message_subscribe(queue="tx-commit-c", destination="qc")
+
+        #use a separate session for actual work
         session2 = self.conn.session("worker", 2)
         self.perform_txn_work(session2, "tx-commit-a", "tx-commit-b", "tx-commit-c")
         session2.tx_commit()
         session2.close()
 
-        #use a different session with new subscriptions to ensure
-        #there is no redelivery of acked messages:
-        session = self.session
         session.tx_select()
 
-        self.subscribe(session, queue="tx-commit-a", destination="qa")
+        self.enable_flow("qa")
         queue_a = session.incoming("qa")
 
-        self.subscribe(session, queue="tx-commit-b", destination="qb")
+        self.enable_flow("qb")
         queue_b = session.incoming("qb")
 
-        self.subscribe(session, queue="tx-commit-c", destination="qc")
+        self.enable_flow("qc")
         queue_c = session.incoming("qc")
 
         #check results
@@ -76,6 +83,12 @@
         """
         Test that a session closed with an open transaction is effectively rolled back
         """
+        session = self.session
+        self.declare_queues(["tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c"])
+        session.message_subscribe(queue="tx-autorollback-a", destination="qa")
+        session.message_subscribe(queue="tx-autorollback-b", destination="qb")
+        session.message_subscribe(queue="tx-autorollback-c", destination="qc")
+
         session2 = self.conn.session("worker", 2)
         queue_a, queue_b, queue_c, ignore = self.perform_txn_work(session2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
 
@@ -87,16 +100,15 @@
 
         session2.close()
 
-        session = self.session
         session.tx_select()
 
-        self.subscribe(session, queue="tx-autorollback-a", destination="qa")
+        self.enable_flow("qa")
         queue_a = session.incoming("qa")
 
-        self.subscribe(session, queue="tx-autorollback-b", destination="qb")
+        self.enable_flow("qb")
         queue_b = session.incoming("qb")
 
-        self.subscribe(session, queue="tx-autorollback-c", destination="qc")
+        self.enable_flow("qc")
         queue_c = session.incoming("qc")
 
         #check results
@@ -169,9 +181,7 @@
         commit and rollback
         """
         #setup:
-        session.queue_declare(queue=name_a, exclusive=True, auto_delete=True)
-        session.queue_declare(queue=name_b, exclusive=True, auto_delete=True)
-        session.queue_declare(queue=name_c, exclusive=True, auto_delete=True)
+        self.declare_queues([name_a, name_b, name_c])
 
         key = "my_key_" + name_b
         topic = "my_topic_" + name_c 
@@ -232,12 +242,22 @@
         session.message_transfer(message=Message(dp, mp, "TxMessage 7"))
         return queue_a, queue_b, queue_c, acked
 
+    def declare_queues(self, names, session=None):
+        session = session or self.session
+        for n in names:
+            session.queue_declare(queue=n, auto_delete=True)
+
     def subscribe(self, session=None, **keys):
         session = session or self.session
         consumer_tag = keys["destination"]
         session.message_subscribe(**keys)
         session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
         session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
+
+    def enable_flow(self, tag, session=None):
+        session = session or self.session
+        session.message_flow(destination=tag, unit=0, value=0xFFFFFFFF)
+        session.message_flow(destination=tag, unit=1, value=0xFFFFFFFF)
 
     def complete(self, session, msg):
         session.receiver._completed.add(msg.id)#TODO: this may be done automatically


Reply via email to