Author: gsim
Date: Mon Mar 31 05:51:36 2008
New Revision: 642981

URL: http://svn.apache.org/viewvc?rev=642981&view=rev
Log:
Re-introduced the old 'no-local' behaviour for exclusive queues via a proprietary
'no-local' argument to queue.declare.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/OwnershipToken.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
    incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
    incubator/qpid/trunk/qpid/python/qpid/testlib.py
    incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py
    incubator/qpid/trunk/qpid/python/tests_0-10/message.py

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h Mon Mar 31 
05:51:36 2008
@@ -30,6 +30,7 @@
          */
         class ConnectionToken : public OwnershipToken {
         public:
+            virtual bool isLocal(const ConnectionToken* t) const { return this 
== t; }
             virtual ~ConnectionToken(){}
         };
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/OwnershipToken.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/OwnershipToken.h?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/OwnershipToken.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/OwnershipToken.h Mon Mar 31 
05:51:36 2008
@@ -24,8 +24,11 @@
 namespace qpid {
 namespace broker {
 
+class ConnectionToken;
+
 class OwnershipToken{
 public:
+    virtual bool isLocal(const ConnectionToken* t) const = 0;
     virtual ~OwnershipToken(){}
 };
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp Mon 
Mar 31 05:51:36 2008
@@ -90,6 +90,11 @@
     return getHandler()->getConnection();
 }
 
+bool PreviewSessionState::isLocal(const ConnectionToken* t) const
+{
+    return isAttached() && &(handler->getConnection()) == t;
+}
+
 void PreviewSessionState::detach() {
     
getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
     Mutex::ScopedLock l(lock);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h 
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h Mon Mar 
31 05:51:36 2008
@@ -64,7 +64,7 @@
 {
   public:
     ~PreviewSessionState();
-    bool isAttached() { return handler; }
+    bool isAttached() const { return handler; }
 
     void detach();
     void attach(PreviewSessionHandler& handler);
@@ -77,6 +77,7 @@
     
     /** @pre isAttached() */
     ConnectionState& getConnection();
+    bool isLocal(const ConnectionToken* t) const;
 
     uint32_t getTimeout() const { return timeout; }
     Broker& getBroker() { return broker; }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Mar 31 05:51:36 
2008
@@ -59,6 +59,7 @@
     owner(_owner), 
     consumerCount(0),
     exclusive(false),
+    noLocal(false),
     persistenceId(0)
 {
     if (parent != 0)
@@ -90,6 +91,10 @@
     notify();
 }
 
+bool Queue::isLocal(boost::intrusive_ptr<Message>& msg)
+{
+    return noLocal && owner && owner->isLocal(msg->getPublisher());
+}
 
 void Queue::deliver(boost::intrusive_ptr<Message>& msg){
     if (msg->isImmediate() && getConsumerCount() == 0) {
@@ -97,9 +102,10 @@
             DeliverableMessage deliverable(msg);
             alternateExchange->route(deliverable, msg->getRoutingKey(), 
msg->getApplicationHeaders());
         }
+    } else if (isLocal(msg)) {
+        //drop message
+        QPID_LOG(debug, "Dropping 'local' message from " << getName());
     } else {
-
-
         // if no store then mark as enqueued
         if (!enqueue(0, msg)){
             push(msg);
@@ -468,6 +474,7 @@
 {
     const std::string qpidMaxSize("qpid.max_size");
     const std::string qpidMaxCount("qpid.max_count");
+    const std::string qpidNoLocal("no-local");
 }
 
 void Queue::create(const FieldTable& _settings)
@@ -484,8 +491,13 @@
 void Queue::configure(const FieldTable& _settings)
 {
     std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(_settings));
-    if (_policy->getMaxCount() || _policy->getMaxSize()) 
+    if (_policy->getMaxCount() || _policy->getMaxSize()) {
         setPolicy(_policy);
+    }    
+    if (owner) {
+        noLocal = _settings.get(qpidNoLocal);
+        QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
+    }
 }
 
 void Queue::destroy()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Mar 31 05:51:36 
2008
@@ -70,6 +70,7 @@
             const OwnershipToken* owner;
             uint32_t consumerCount;
             bool exclusive;
+            bool noLocal;
             Listeners listeners;
             Messages messages;
             mutable qpid::sys::Mutex consumerLock;
@@ -118,6 +119,7 @@
 
             bool acquire(const QueuedMessage& msg);
 
+            bool isLocal(boost::intrusive_ptr<Message>& msg);
             /**
              * Delivers a message to the queue. Will record it as
              * enqueued if persistent then process it.

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Mar 31 
05:51:36 2008
@@ -23,6 +23,7 @@
 #include "qpid/Exception.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/constants.h"
+#include "qpid/log/Statement.h"
 #include <boost/format.hpp>
 #include <boost/cast.hpp>
 #include <boost/bind.hpp>
@@ -198,6 +199,12 @@
         exclusiveQueues.erase(exclusiveQueues.begin());
     }
 }
+    
+bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const 
+{ 
+    return session.isLocal(t); 
+}
+
 
 Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
 {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h Mon Mar 31 
05:51:36 2008
@@ -21,11 +21,12 @@
 
 #include "HandlerImpl.h"
 
+#include "ConnectionToken.h"
+#include "OwnershipToken.h"
 #include "qpid/Exception.h"
 #include "qpid/framing/AMQP_ServerOperations.h"
 #include "qpid/framing/reply_exceptions.h"
 #include "qpid/framing/SequenceSet.h"
-#include "OwnershipToken.h"
 
 #include <vector>
 #include <boost/function.hpp>
@@ -140,6 +141,7 @@
                      bool ifUnused, bool ifEmpty);
         void purge(const std::string& queue); 
         framing::Queue010QueryResult query(const std::string& queue);
+        bool isLocal(const ConnectionToken* t) const; 
     };
 
     class MessageHandlerImpl :

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h Mon Mar 31 
05:51:36 2008
@@ -38,6 +38,7 @@
 {
   public:
     virtual ~SessionContext(){}
+    virtual bool isLocal(const ConnectionToken* t) const = 0;
     virtual ConnectionState& getConnection() = 0;
     virtual framing::AMQP_ClientProxy& getProxy() = 0;
     virtual Broker& getBroker() = 0;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Mar 31 
05:51:36 2008
@@ -96,6 +96,11 @@
     return getHandler()->getConnection();
 }
 
+bool SessionState::isLocal(const ConnectionToken* t) const
+{
+    return isAttached() && &(handler->getConnection()) == t;
+}
+
 void SessionState::detach() {
     getConnection().outputTasks.removeOutputTask(&semanticState);
     Mutex::ScopedLock l(lock);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Mon Mar 31 
05:51:36 2008
@@ -70,7 +70,7 @@
 {
   public:
     ~SessionState();
-    bool isAttached() { return handler; }
+    bool isAttached() const { return handler; }
 
     void detach();
     void attach(SessionHandler& handler);
@@ -83,6 +83,7 @@
     
     /** @pre isAttached() */
     ConnectionState& getConnection();
+    bool isLocal(const ConnectionToken* t) const;
 
     uint32_t getTimeout() const { return timeout; }
     void setTimeout(uint32_t t) { timeout = t; }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp Mon Mar 31 
05:51:36 2008
@@ -44,8 +44,12 @@
 }
 
 void TxPublish::deliverTo(Queue::shared_ptr& queue){
-    queues.push_back(queue);
-    delivered = true;
+    if (!queue->isLocal(msg)) {
+        queues.push_back(queue);
+        delivered = true;
+    } else {
+        QPID_LOG(debug, "Won't enqueue local message for " << 
queue->getName());
+    }
 }
 
 TxPublish::Prepare::Prepare(TransactionContext* _ctxt, intrusive_ptr<Message>& 
_msg) 

Modified: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Mon Mar 31 05:51:36 
2008
@@ -3,8 +3,6 @@
 tests.codec.FieldTableTestCase.test_field_table_name_value_pair
 tests_0-10.execution.ExecutionTests.test_flush
 tests_0-10.dtx.DtxTests.test_recover
-tests_0-10.message.MessageTests.test_consume_no_local
-tests_0-10.message.MessageTests.test_consume_no_local_awkward
 tests_0-10.message.MessageTests.test_no_size
 tests_0-10.message.MessageTests.test_qos_prefetch_count
 tests_0-10.message.MessageTests.test_qos_prefetch_size

Modified: incubator/qpid/trunk/qpid/python/qpid/testlib.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/testlib.py?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/testlib.py Mon Mar 31 05:51:36 2008
@@ -361,3 +361,10 @@
     def tearDown(self):
         if not self.session.error(): self.session.close(timeout=10)
         self.conn.close(timeout=10)
+
+    def subscribe(self, session=None, **keys):
+        session = session or self.session
+        consumer_tag = keys["destination"]
+        session.message_subscribe(**keys)
+        session.message_flow(destination=consumer_tag, unit=0, 
value=0xFFFFFFFF)
+        session.message_flow(destination=consumer_tag, unit=1, 
value=0xFFFFFFFF)

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py Mon Mar 31 05:51:36 2008
@@ -667,10 +667,3 @@
         dp=session.delivery_properties(routing_key=key)
         mp=session.message_properties(correlation_id=id)
         session.message_transfer(message=Message(dp, mp, body))
-
-    def subscribe(self, session=None, **keys):
-        session = session or self.session
-        consumer_tag = keys["destination"]
-        session.message_subscribe(**keys)
-        session.message_flow(destination=consumer_tag, unit=0, 
value=0xFFFFFFFF)
-        session.message_flow(destination=consumer_tag, unit=1, 
value=0xFFFFFFFF)

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Mon Mar 31 05:51:36 
2008
@@ -28,33 +28,33 @@
 class MessageTests(TestBase010):
     """Tests for 'methods' on the amqp message 'class'"""
 
-    def test_consume_no_local(self):
+    def test_no_local(self):
         """
         Test that the no_local flag is honoured in the consume method
         """
         session = self.session
-        #setup, declare two queues:
+        #setup, declare two queues one of which excludes delivery of locally 
sent messages
         session.queue_declare(queue="test-queue-1a", exclusive=True, 
auto_delete=True)
-        session.queue_declare(queue="test-queue-1b", exclusive=True, 
auto_delete=True)
-        #establish two consumers one of which excludes delivery of locally 
sent messages
+        session.queue_declare(queue="test-queue-1b", exclusive=True, 
auto_delete=True, arguments={'no-local':'true'})
+        #establish two consumers 
         self.subscribe(destination="local_included", queue="test-queue-1a")
-        self.subscribe(destination="local_excluded", queue="test-queue-1b", 
no_local=True)
+        self.subscribe(destination="local_excluded", queue="test-queue-1b")
 
         #send a message
-        session.message_transfer(content=Content(properties={'routing_key' : 
"test-queue-1a"}, body="consume_no_local"))
-        session.message_transfer(content=Content(properties={'routing_key' : 
"test-queue-1b"}, body="consume_no_local"))
+        
session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"),
 "deliver-me"))
+        
session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"),
 "dont-deliver-me"))
 
         #check the queues of the two consumers
         excluded = session.incoming("local_excluded")
         included = session.incoming("local_included")
         msg = included.get(timeout=1)
-        self.assertEqual("consume_no_local", msg.body)
+        self.assertEqual("deliver-me", msg.body)
         try:
             excluded.get(timeout=1)
             self.fail("Received locally published message though 
no_local=true")
         except Empty: None
 
-    def test_consume_no_local_awkward(self):
+    def test_no_local_awkward(self):
 
         """
         If an exclusive queue gets a no-local delivered to it, that
@@ -67,19 +67,18 @@
 
         session = self.session
         #setup:
-        session.queue_declare(queue="test-queue", exclusive=True, 
auto_delete=True)
+        session.queue_declare(queue="test-queue", exclusive=True, 
auto_delete=True, arguments={'no-local':'true'})
         #establish consumer which excludes delivery of locally sent messages
-        self.subscribe(destination="local_excluded", queue="test-queue", 
no_local=True)
+        self.subscribe(destination="local_excluded", queue="test-queue")
 
         #send a 'local' message
-        session.message_transfer(content=Content(properties={'routing_key' : 
"test-queue"}, body="local"))
+        
session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"),
 "local"))
 
         #send a non local message
         other = self.connect()
-        session2 = other.session(1)
-        session2.session_open()
-        session2.message_transfer(content=Content(properties={'routing_key' : 
"test-queue"}, body="foreign"))
-        session2.session_close()
+        session2 = other.session("my-session", 1)
+        
session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue"),
 "foreign"))
+        session2.close()
         other.close()
 
         #check that the second message only is delivered


Reply via email to