Author: gsim
Date: Tue Apr 22 05:05:52 2008
New Revision: 650450
URL: http://svn.apache.org/viewvc?rev=650450&view=rev
Log:
QPID-944: do no-local checking where requested when there is an exclusive
subscription active
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
incubator/qpid/trunk/qpid/python/tests_0-10/message.py
incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h?rev=650450&r1=650449&r2=650450&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h Tue Apr 22
05:05:52 2008
@@ -27,6 +27,7 @@
}}
#include "Message.h"
+#include "OwnershipToken.h"
namespace qpid {
namespace broker {
@@ -56,6 +57,7 @@
virtual void notify() = 0;
virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
+ virtual OwnershipToken* getSession() = 0;
virtual ~Consumer(){}
};
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=650450&r1=650449&r2=650450&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Apr 22 05:05:52
2008
@@ -58,7 +58,7 @@
store(_store),
owner(_owner),
consumerCount(0),
- exclusive(false),
+ exclusive(0),
noLocal(false),
persistenceId(0)
{
@@ -91,9 +91,18 @@
notify();
}
+bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg)
+{
+ return token && token->isLocal(msg->getPublisher());
+}
+
bool Queue::isLocal(boost::intrusive_ptr<Message>& msg)
{
- return noLocal && owner && owner->isLocal(msg->getPublisher());
+ //message is considered local if it was published on the same
+ //connection as that of the session which declared this queue
+ //exclusive (owner) or which has an exclusive subscription
+ //(exclusive)
+ return noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg));
}
void Queue::deliver(boost::intrusive_ptr<Message>& msg){
@@ -328,7 +337,7 @@
return false;
}
-void Queue::consume(Consumer&, bool requestExclusive){
+void Queue::consume(Consumer& c, bool requestExclusive){
Mutex::ScopedLock locker(consumerLock);
if(exclusive) {
throw AccessRefusedException(
@@ -338,7 +347,7 @@
throw AccessRefusedException(
QPID_MSG("Queue " << getName() << " already has consumers.
Exclusive access denied."));
} else {
- exclusive = true;
+ exclusive = c.getSession();
}
}
consumerCount++;
@@ -352,7 +361,7 @@
removeListener(c);
Mutex::ScopedLock locker(consumerLock);
consumerCount--;
- if(exclusive) exclusive = false;
+ if(exclusive) exclusive = 0;
if (mgmtObject.get() != 0){
mgmtObject->dec_consumers ();
}
@@ -485,10 +494,9 @@
if (_policy->getMaxCount() || _policy->getMaxSize()) {
setPolicy(_policy);
}
- if (owner) {
- noLocal = _settings.get(qpidNoLocal);
- QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
- }
+ //set this regardless of owner to allow use of no-local with exclusive
consumers also
+ noLocal = _settings.get(qpidNoLocal);
+ QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
if (mgmtObject.get() != 0)
mgmtObject->set_arguments (_settings);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=650450&r1=650449&r2=650450&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Apr 22 05:05:52
2008
@@ -70,7 +70,7 @@
MessageStore* store;
const OwnershipToken* owner;
uint32_t consumerCount;
- bool exclusive;
+ OwnershipToken* exclusive;
bool noLocal;
Listeners listeners;
Messages messages;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=650450&r1=650449&r2=650450&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Apr 22
05:05:52 2008
@@ -267,6 +267,11 @@
msgCredit(0),
byteCredit(0) {}
+OwnershipToken* SemanticState::ConsumerImpl::getSession()
+{
+ return &(parent->session);
+}
+
bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
{
allocateCredit(msg.payload);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=650450&r1=650449&r2=650450&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Tue Apr 22
05:05:52 2008
@@ -80,6 +80,7 @@
const string& name, Queue::shared_ptr queue,
bool ack, bool nolocal, bool acquire);
~ConsumerImpl();
+ OwnershipToken* getSession();
bool deliver(QueuedMessage& msg);
bool filter(boost::intrusive_ptr<Message> msg);
bool accept(boost::intrusive_ptr<Message> msg);
@@ -93,7 +94,7 @@
void stop();
void complete(DeliveryRecord&);
Queue::shared_ptr getQueue() { return queue; }
- bool isBlocked() const { return blocked; }
+ bool isBlocked() const { return blocked; }
bool doOutput();
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h?rev=650450&r1=650449&r2=650450&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h Tue Apr 22
05:05:52 2008
@@ -27,6 +27,7 @@
#include "qpid/framing/amqp_types.h"
#include "qpid/sys/OutputControl.h"
#include "ConnectionState.h"
+#include "OwnershipToken.h"
#include <boost/noncopyable.hpp>
@@ -34,7 +35,7 @@
namespace qpid {
namespace broker {
-class SessionContext : public sys::OutputControl
+class SessionContext : public OwnershipToken, public sys::OutputControl
{
public:
virtual ~SessionContext(){}
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=650450&r1=650449&r2=650450&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue Apr 22 05:05:52
2008
@@ -49,6 +49,7 @@
return true;
};
void notify() {}
+ OwnershipToken* getSession() { return 0; }
};
class FailOnDeliver : public Deliverable
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=650450&r1=650449&r2=650450&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Tue Apr 22 05:05:52
2008
@@ -92,6 +92,34 @@
#check queue is empty
self.assertEqual(0,
session.queue_query(queue="test-queue").message_count)
+ def test_no_local_exclusive_subscribe(self):
+ """
+ Test that the no_local flag is honoured in the consume method
+ """
+ session = self.session
+
+ #setup, declare two queues one of which excludes delivery of
+ #locally sent messages but is not declared as exclusive
+ session.queue_declare(queue="test-queue-1a", exclusive=True,
auto_delete=True)
+ session.queue_declare(queue="test-queue-1b", auto_delete=True,
arguments={'no-local':'true'})
+ #establish two consumers
+ self.subscribe(destination="local_included", queue="test-queue-1a")
+ self.subscribe(destination="local_excluded", queue="test-queue-1b",
exclusive=True)
+
+ #send a message
+
session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"),
"deliver-me"))
+
session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"),
"dont-deliver-me"))
+
+ #check the queues of the two consumers
+ excluded = session.incoming("local_excluded")
+ included = session.incoming("local_included")
+ msg = included.get(timeout=1)
+ self.assertEqual("deliver-me", msg.body)
+ try:
+ excluded.get(timeout=1)
+ self.fail("Received locally published message though
no_local=true")
+ except Empty: None
+
def test_consume_exclusive(self):
"""
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/queue.py?rev=650450&r1=650449&r2=650450&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/queue.py Tue Apr 22 05:05:52
2008
@@ -223,7 +223,7 @@
session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"),
"b"))
session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"),
"c"))
session.queue_delete(queue="delete-me")
- #check that it has gone be declaring passively
+ #check that it has gone by declaring passively
try:
session.queue_declare(queue="delete-me", passive=True)
self.fail("Queue has not been deleted")