Author: gsim
Date: Mon Mar 17 05:17:55 2008
New Revision: 637854
URL: http://svn.apache.org/viewvc?rev=637854&view=rev
Log:
Scope exclusive queues to sessions.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py
incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
incubator/qpid/trunk/qpid/python/tests_0-10/tx.py
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h Mon Mar 17 05:17:55 2008
@@ -21,13 +21,14 @@
#ifndef _ConnectionToken_
#define _ConnectionToken_
+#include "OwnershipToken.h"
namespace qpid {
namespace broker {
/**
* An empty interface allowing opaque implementations of some
* form of token to identify a connection.
*/
- class ConnectionToken{
+ class ConnectionToken : public OwnershipToken {
public:
virtual ~ConnectionToken(){}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Mar 17 05:17:55 2008
@@ -46,7 +46,7 @@
Queue::Queue(const string& _name, bool _autodelete,
MessageStore* const _store,
- const ConnectionToken* const _owner,
+ const OwnershipToken* const _owner,
Manageable* parent) :
name(_name),
@@ -582,7 +582,7 @@
}
-bool Queue::isExclusiveOwner(const ConnectionToken* const o) const
+bool Queue::isExclusiveOwner(const OwnershipToken* const o) const
{
Mutex::ScopedLock locker(ownershipLock);
return o == owner;
@@ -594,7 +594,7 @@
owner = 0;
}
-bool Queue::setExclusiveOwner(const ConnectionToken* const o)
+bool Queue::setExclusiveOwner(const OwnershipToken* const o)
{
Mutex::ScopedLock locker(ownershipLock);
if (owner) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Mar 17 05:17:55 2008
@@ -28,7 +28,7 @@
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include "qpid/framing/amqp_types.h"
-#include "ConnectionToken.h"
+#include "OwnershipToken.h"
#include "Consumer.h"
#include "Message.h"
#include "qpid/framing/FieldTable.h"
@@ -63,7 +63,7 @@
const string name;
const bool autodelete;
MessageStore* store;
- const ConnectionToken* owner;
+ const OwnershipToken* owner;
uint32_t consumerCount;
bool exclusive;
Listeners listeners;
@@ -100,7 +100,7 @@
Queue(const string& name, bool autodelete = false,
MessageStore* const store = 0,
- const ConnectionToken* const owner = 0,
+ const OwnershipToken* const owner = 0,
Manageable* parent = 0);
~Queue();
@@ -143,9 +143,9 @@
uint32_t getMessageCount() const;
uint32_t getConsumerCount() const;
inline const string& getName() const { return name; }
- bool isExclusiveOwner(const ConnectionToken* const o) const;
+ bool isExclusiveOwner(const OwnershipToken* const o) const;
void releaseExclusiveOwnership();
- bool setExclusiveOwner(const ConnectionToken* const o);
+ bool setExclusiveOwner(const OwnershipToken* const o);
bool hasExclusiveConsumer() const;
bool hasExclusiveOwner() const;
inline bool isDurable() const { return store != 0; }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.cpp Mon Mar 17 05:17:55 2008
@@ -33,7 +33,7 @@
std::pair<Queue::shared_ptr, bool>
QueueRegistry::declare(const string& declareName, bool durable,
- bool autoDelete, const ConnectionToken* owner)
+ bool autoDelete, const OwnershipToken* owner)
{
RWlock::ScopedWlock locker(lock);
string name = declareName.empty() ? generateName() : declareName;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/QueueRegistry.h Mon Mar 17 05:17:55 2008
@@ -48,7 +48,7 @@
* was created by this declare call false if it already existed.
*/
std::pair<Queue::shared_ptr, bool> declare(const string& name, bool durable = false, bool autodelete = false,
- const ConnectionToken* const owner = 0);
+ const OwnershipToken* const owner = 0);
/**
* Destroy the named queue.
@@ -62,7 +62,6 @@
* subsequent calls to find or declare with the same name.
*
*/
- void destroyLH (const string& name);
void destroy (const string& name);
template <class Test> bool destroyIf(const string& name, Test test)
{
@@ -107,6 +106,9 @@
int counter;
MessageStore* store;
management::Manageable* parent;
+
+ //destroy impl that assumes lock is already held:
+ void destroyLH (const string& name);
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Mar 17 05:17:55 2008
@@ -19,6 +19,7 @@
#include "Connection.h"
#include "DeliveryToken.h"
#include "MessageDelivery.h"
+#include "Queue.h"
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
#include <boost/format.hpp>
@@ -180,6 +181,22 @@
}
}
+SessionAdapter::QueueHandlerImpl::QueueHandlerImpl(SemanticState& session) : HandlerHelper(session), broker(getBroker())
+{}
+
+
+SessionAdapter::QueueHandlerImpl::~QueueHandlerImpl()
+{
+ while (!exclusiveQueues.empty()) {
+ Queue::shared_ptr q(exclusiveQueues.front());
+ q->releaseExclusiveOwnership();
+ if (q->canAutoDelete()) {
+ Queue::tryAutoDelete(broker, q);
+ }
+ exclusiveQueues.erase(exclusiveQueues.begin());
+ }
+}
+
Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
{
Queue::shared_ptr queue = getQueue(name);
@@ -212,7 +229,7 @@
getBroker().getQueues().declare(
name, durable,
autoDelete,
- exclusive ? &getConnection() : 0);
+ exclusive ? this : 0);
queue = queue_created.first;
assert(queue);
if (queue_created.second) { // This is a new queue
@@ -230,15 +247,15 @@
//handle automatic cleanup:
if (exclusive) {
- getConnection().exclusiveQueues.push_back(queue);
+ exclusiveQueues.push_back(queue);
}
} else {
- if (exclusive && queue->setExclusiveOwner(&getConnection())) {
- getConnection().exclusiveQueues.push_back(queue);
+ if (exclusive && queue->setExclusiveOwner(this)) {
+ exclusiveQueues.push_back(queue);
}
}
}
- if (exclusive && !queue->isExclusiveOwner(&getConnection()))
+ if (exclusive && !queue->isExclusiveOwner(this))
throw ResourceLockedException(
QPID_MSG("Cannot grant exclusive access to queue "
<< queue->getName()));
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h Mon Mar 17 05:17:55 2008
@@ -25,8 +25,11 @@
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/SequenceSet.h"
+#include "OwnershipToken.h"
+#include <vector>
#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
@@ -34,6 +37,7 @@
class Channel;
class Connection;
class Broker;
+class Queue;
/**
* Per-channel protocol adapter.
@@ -44,7 +48,7 @@
* peer.
*
*/
-class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
+ class SessionAdapter : public HandlerImpl, public framing::AMQP_ServerOperations
{
public:
SessionAdapter(SemanticState& session);
@@ -116,12 +120,15 @@
shared_ptr<Exchange> alternate);
};
- class QueueHandlerImpl :
- public Queue010Handler,
- public HandlerHelper
+ class QueueHandlerImpl : public Queue010Handler,
+ public HandlerHelper, public OwnershipToken
{
+ Broker& broker;
+ std::vector< boost::shared_ptr<Queue> > exclusiveQueues;
+
public:
- QueueHandlerImpl(SemanticState& session) : HandlerHelper(session) {}
+ QueueHandlerImpl(SemanticState& session);
+ ~QueueHandlerImpl();
void declare(const std::string& queue,
const std::string& alternateExchange,
Modified: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Mon Mar 17 05:17:55 2008
@@ -33,5 +33,3 @@
tests_0-10.testlib.TestBaseTest.testAssertEmptyFail
tests_0-10.testlib.TestBaseTest.testAssertEmptyPass
tests_0-10.testlib.TestBaseTest.testMessageProperties
-tests_0-10.queue.QueueTests.test_autodelete_shared
-tests_0-10.queue.QueueTests.test_declare_exclusive
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py Mon Mar 17 05:17:55 2008
@@ -111,14 +111,13 @@
session = self.session
session.exchange_declare(exchange="alternate", type="fanout")
- session = self.conn.session("alternate", 2)
- session.queue_declare(queue="q", exclusive=True, auto_delete=True, alternate_exchange="alternate")
+ session2 = self.conn.session("alternate", 2)
+ session2.queue_declare(queue="q", exclusive=True, auto_delete=True, alternate_exchange="alternate")
try:
- session.exchange_delete(exchange="alternate")
+ session2.exchange_delete(exchange="alternate")
self.fail("Expected deletion of in-use alternate-exchange to fail")
except SessionException, e:
session = self.session
- session.queue_delete(queue="q")
session.exchange_delete(exchange="alternate")
self.assertEquals(530, e.args[0].error_code)
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/queue.py?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/queue.py Mon Mar 17 05:17:55 2008
@@ -86,19 +86,17 @@
Test that the exclusive field is honoured in queue.declare
"""
# TestBase.setUp has already opened session(1)
- c1 = self.session
+ s1 = self.session
# Here we open a second separate connection:
- other = self.connect()
- c2 = other.session(1)
- c2.session_open()
+ s2 = self.conn.session("other", 2)
#declare an exclusive queue:
- c1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
+ s1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
try:
#other connection should not be allowed to declare this:
- c2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
+ s2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
self.fail("Expected second exclusive queue_declare to raise a channel exception")
- except Closed, e:
+ except SessionException, e:
self.assertEquals(405, e.args[0].error_code)
@@ -322,24 +320,23 @@
Test auto-deletion (of non-exclusive queues)
"""
session = self.session
- other = self.connect()
- session2 = other.session(1)
- session2.session_open()
+ session2 =self.conn.session("other", 1)
session.queue_declare(queue="auto-delete-me", auto_delete=True)
#consume from both sessions
- reply = session.basic_consume(queue="auto-delete-me")
- session2.basic_consume(queue="auto-delete-me")
+ tag = "my-tag"
+ session.message_subscribe(queue="auto-delete-me", destination=tag)
+ session2.message_subscribe(queue="auto-delete-me", destination=tag)
#implicit cancel
- session2.session_close()
+ session2.close()
#check it is still there
session.queue_declare(queue="auto-delete-me", passive=True)
#explicit cancel => queue is now unused again:
- session.basic_cancel(consumer_tag=reply.consumer_tag)
+ session.message_cancel(destination=tag)
#NOTE: this assumes there is no timeout in use
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/tx.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/tx.py?rev=637854&r1=637853&r2=637854&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/tx.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/tx.py Mon Mar 17 05:17:55 2008
@@ -30,23 +30,30 @@
"""
Test that commited publishes are delivered and commited acks are not
re-delivered
"""
+ session = self.session
+
+ #declare queues and create subscribers in the checking session
+ #to ensure that the queues are not auto-deleted too early:
+ self.declare_queues(["tx-commit-a", "tx-commit-b", "tx-commit-c"])
+ session.message_subscribe(queue="tx-commit-a", destination="qa")
+ session.message_subscribe(queue="tx-commit-b", destination="qb")
+ session.message_subscribe(queue="tx-commit-c", destination="qc")
+
+ #use a separate session for actual work
session2 = self.conn.session("worker", 2)
self.perform_txn_work(session2, "tx-commit-a", "tx-commit-b",
"tx-commit-c")
session2.tx_commit()
session2.close()
- #use a different session with new subscriptions to ensure
- #there is no redelivery of acked messages:
- session = self.session
session.tx_select()
- self.subscribe(session, queue="tx-commit-a", destination="qa")
+ self.enable_flow("qa")
queue_a = session.incoming("qa")
- self.subscribe(session, queue="tx-commit-b", destination="qb")
+ self.enable_flow("qb")
queue_b = session.incoming("qb")
- self.subscribe(session, queue="tx-commit-c", destination="qc")
+ self.enable_flow("qc")
queue_c = session.incoming("qc")
#check results
@@ -76,6 +83,12 @@
"""
Test that a session closed with an open transaction is effectively
rolled back
"""
+ session = self.session
+ self.declare_queues(["tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c"])
+ session.message_subscribe(queue="tx-autorollback-a", destination="qa")
+ session.message_subscribe(queue="tx-autorollback-b", destination="qb")
+ session.message_subscribe(queue="tx-autorollback-c", destination="qc")
+
session2 = self.conn.session("worker", 2)
queue_a, queue_b, queue_c, ignore = self.perform_txn_work(session2,
"tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
@@ -87,16 +100,15 @@
session2.close()
- session = self.session
session.tx_select()
- self.subscribe(session, queue="tx-autorollback-a", destination="qa")
+ self.enable_flow("qa")
queue_a = session.incoming("qa")
- self.subscribe(session, queue="tx-autorollback-b", destination="qb")
+ self.enable_flow("qb")
queue_b = session.incoming("qb")
- self.subscribe(session, queue="tx-autorollback-c", destination="qc")
+ self.enable_flow("qc")
queue_c = session.incoming("qc")
#check results
@@ -169,9 +181,7 @@
commit and rollback
"""
#setup:
- session.queue_declare(queue=name_a, exclusive=True, auto_delete=True)
- session.queue_declare(queue=name_b, exclusive=True, auto_delete=True)
- session.queue_declare(queue=name_c, exclusive=True, auto_delete=True)
+ self.declare_queues([name_a, name_b, name_c])
key = "my_key_" + name_b
topic = "my_topic_" + name_c
@@ -232,12 +242,22 @@
session.message_transfer(message=Message(dp, mp, "TxMessage 7"))
return queue_a, queue_b, queue_c, acked
+ def declare_queues(self, names, session=None):
+ session = session or self.session
+ for n in names:
+ session.queue_declare(queue=n, auto_delete=True)
+
def subscribe(self, session=None, **keys):
session = session or self.session
consumer_tag = keys["destination"]
session.message_subscribe(**keys)
session.message_flow(destination=consumer_tag, unit=0,
value=0xFFFFFFFF)
session.message_flow(destination=consumer_tag, unit=1,
value=0xFFFFFFFF)
+
+ def enable_flow(self, tag, session=None):
+ session = session or self.session
+ session.message_flow(destination=tag, unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination=tag, unit=1, value=0xFFFFFFFF)
def complete(self, session, msg):
session.receiver._completed.add(msg.id)#TODO: this may be done automatically