Author: gsim
Date: Tue Apr 22 05:05:52 2008
New Revision: 650450

URL: http://svn.apache.org/viewvc?rev=650450&view=rev
Log:
QPID-944: do no-local checking where requested when there is an exclusive 
subscription active


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
    incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
    incubator/qpid/trunk/qpid/python/tests_0-10/message.py
    incubator/qpid/trunk/qpid/python/tests_0-10/queue.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h?rev=650450&r1=650449&r2=650450&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h Tue Apr 22 
05:05:52 2008
@@ -27,6 +27,7 @@
 }}
 
 #include "Message.h"
+#include "OwnershipToken.h"
 
 namespace qpid {
     namespace broker {
@@ -56,6 +57,7 @@
             virtual void notify() = 0;
             virtual bool filter(boost::intrusive_ptr<Message>) { return true; }
             virtual bool accept(boost::intrusive_ptr<Message>) { return true; }
+            virtual OwnershipToken* getSession() = 0;
             virtual ~Consumer(){}
         };
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=650450&r1=650449&r2=650450&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Apr 22 05:05:52 
2008
@@ -58,7 +58,7 @@
     store(_store),
     owner(_owner), 
     consumerCount(0),
-    exclusive(false),
+    exclusive(0),
     noLocal(false),
     persistenceId(0)
 {
@@ -91,9 +91,18 @@
     notify();
 }
 
+bool isLocalTo(const OwnershipToken* token, boost::intrusive_ptr<Message>& msg)
+{
+    return token && token->isLocal(msg->getPublisher());
+}
+
 bool Queue::isLocal(boost::intrusive_ptr<Message>& msg)
 {
-    return noLocal && owner && owner->isLocal(msg->getPublisher());
+    //message is considered local if it was published on the same
+    //connection as that of the session which declared this queue
+    //exclusive (owner) or which has an exclusive subscription
+    //(exclusive)
+    return noLocal && (isLocalTo(owner, msg) || isLocalTo(exclusive, msg));
 }
 
 void Queue::deliver(boost::intrusive_ptr<Message>& msg){
@@ -328,7 +337,7 @@
     return false;
 }
 
-void Queue::consume(Consumer&, bool requestExclusive){
+void Queue::consume(Consumer& c, bool requestExclusive){
     Mutex::ScopedLock locker(consumerLock);
     if(exclusive) {
         throw AccessRefusedException(
@@ -338,7 +347,7 @@
             throw AccessRefusedException(
                 QPID_MSG("Queue " << getName() << " already has consumers. 
Exclusive access denied."));
         } else {
-            exclusive = true;
+            exclusive = c.getSession();
         }
     }
     consumerCount++;
@@ -352,7 +361,7 @@
     removeListener(c);
     Mutex::ScopedLock locker(consumerLock);
     consumerCount--;
-    if(exclusive) exclusive = false;
+    if(exclusive) exclusive = 0;
     if (mgmtObject.get() != 0){
         mgmtObject->dec_consumers ();
     }
@@ -485,10 +494,9 @@
     if (_policy->getMaxCount() || _policy->getMaxSize()) {
         setPolicy(_policy);
     }
-    if (owner) {
-        noLocal = _settings.get(qpidNoLocal);
-        QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
-    }
+    //set this regardless of owner to allow use of no-local with exclusive 
consumers also
+    noLocal = _settings.get(qpidNoLocal);
+    QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
     if (mgmtObject.get() != 0)
         mgmtObject->set_arguments (_settings);
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=650450&r1=650449&r2=650450&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Apr 22 05:05:52 
2008
@@ -70,7 +70,7 @@
             MessageStore* store;
             const OwnershipToken* owner;
             uint32_t consumerCount;
-            bool exclusive;
+            OwnershipToken* exclusive;
             bool noLocal;
             Listeners listeners;
             Messages messages;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=650450&r1=650449&r2=650450&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Apr 22 
05:05:52 2008
@@ -267,6 +267,11 @@
     msgCredit(0), 
     byteCredit(0) {}
 
+OwnershipToken* SemanticState::ConsumerImpl::getSession()
+{
+    return &(parent->session);
+}
+
 bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
 {
     allocateCredit(msg.payload);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=650450&r1=650449&r2=650450&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Tue Apr 22 
05:05:52 2008
@@ -80,6 +80,7 @@
                      const string& name, Queue::shared_ptr queue,
                      bool ack, bool nolocal, bool acquire);
         ~ConsumerImpl();
+        OwnershipToken* getSession();
         bool deliver(QueuedMessage& msg);            
         bool filter(boost::intrusive_ptr<Message> msg);            
         bool accept(boost::intrusive_ptr<Message> msg);            
@@ -93,7 +94,7 @@
         void stop();
         void complete(DeliveryRecord&);    
         Queue::shared_ptr getQueue() { return queue; }
-        bool isBlocked() const { return blocked; }
+        bool isBlocked() const { return blocked; }        
 
         bool doOutput();
     };

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h?rev=650450&r1=650449&r2=650450&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h Tue Apr 22 
05:05:52 2008
@@ -27,6 +27,7 @@
 #include "qpid/framing/amqp_types.h"
 #include "qpid/sys/OutputControl.h"
 #include "ConnectionState.h"
+#include "OwnershipToken.h"
 
 
 #include <boost/noncopyable.hpp>
@@ -34,7 +35,7 @@
 namespace qpid {
 namespace broker {
 
-class SessionContext : public sys::OutputControl
+class SessionContext : public OwnershipToken, public sys::OutputControl
 {
   public:
     virtual ~SessionContext(){}

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp?rev=650450&r1=650449&r2=650450&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/QueueTest.cpp Tue Apr 22 05:05:52 
2008
@@ -49,6 +49,7 @@
         return true;
     };
     void notify() {}
+    OwnershipToken* getSession() { return 0; }
 };
 
 class FailOnDeliver : public Deliverable

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=650450&r1=650449&r2=650450&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Tue Apr 22 05:05:52 
2008
@@ -92,6 +92,34 @@
         #check queue is empty
         self.assertEqual(0, 
session.queue_query(queue="test-queue").message_count)
 
+    def test_no_local_exclusive_subscribe(self):
+        """
+        Test that the no_local flag is honoured in the consume method
+        """
+        session = self.session
+
+        #setup, declare two queues one of which excludes delivery of
+        #locally sent messages but is not declared as exclusive
+        session.queue_declare(queue="test-queue-1a", exclusive=True, 
auto_delete=True)
+        session.queue_declare(queue="test-queue-1b", auto_delete=True, 
arguments={'no-local':'true'})
+        #establish two consumers 
+        self.subscribe(destination="local_included", queue="test-queue-1a")
+        self.subscribe(destination="local_excluded", queue="test-queue-1b", 
exclusive=True)
+
+        #send a message
+        
session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"),
 "deliver-me"))
+        
session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"),
 "dont-deliver-me"))
+
+        #check the queues of the two consumers
+        excluded = session.incoming("local_excluded")
+        included = session.incoming("local_included")
+        msg = included.get(timeout=1)
+        self.assertEqual("deliver-me", msg.body)
+        try:
+            excluded.get(timeout=1)
+            self.fail("Received locally published message though 
no_local=true")
+        except Empty: None
+
 
     def test_consume_exclusive(self):
         """

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/queue.py?rev=650450&r1=650449&r2=650450&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/queue.py Tue Apr 22 05:05:52 
2008
@@ -223,7 +223,7 @@
         
session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"),
 "b"))
         
session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"),
 "c"))
         session.queue_delete(queue="delete-me")
-        #check that it has gone be declaring passively
+        #check that it has gone by declaring passively
         try:
             session.queue_declare(queue="delete-me", passive=True)
             self.fail("Queue has not been deleted")


Reply via email to