Author: gsim
Date: Mon Mar 31 05:51:36 2008
New Revision: 642981
URL: http://svn.apache.org/viewvc?rev=642981&view=rev
Log:
Re-introduced old 'no-local' behaviour for exclusive queues via a proprietary
arg to queue.declare.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/OwnershipToken.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
incubator/qpid/trunk/qpid/python/qpid/testlib.py
incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py
incubator/qpid/trunk/qpid/python/tests_0-10/message.py
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionToken.h Mon Mar 31
05:51:36 2008
@@ -30,6 +30,7 @@
*/
class ConnectionToken : public OwnershipToken {
public:
+ virtual bool isLocal(const ConnectionToken* t) const { return this == t; }
virtual ~ConnectionToken(){}
};
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/OwnershipToken.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/OwnershipToken.h?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/OwnershipToken.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/OwnershipToken.h Mon Mar 31
05:51:36 2008
@@ -24,8 +24,11 @@
namespace qpid {
namespace broker {
+class ConnectionToken;
+
class OwnershipToken{
public:
+ virtual bool isLocal(const ConnectionToken* t) const = 0;
virtual ~OwnershipToken(){}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.cpp Mon
Mar 31 05:51:36 2008
@@ -90,6 +90,11 @@
return getHandler()->getConnection();
}
+bool PreviewSessionState::isLocal(const ConnectionToken* t) const
+{
+ return isAttached() && &(handler->getConnection()) == t;
+}
+
void PreviewSessionState::detach() {
getConnection().outputTasks.removeOutputTask(&semanticHandler->getSemanticState());
Mutex::ScopedLock l(lock);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/PreviewSessionState.h Mon Mar
31 05:51:36 2008
@@ -64,7 +64,7 @@
{
public:
~PreviewSessionState();
- bool isAttached() { return handler; }
+ bool isAttached() const { return handler; }
void detach();
void attach(PreviewSessionHandler& handler);
@@ -77,6 +77,7 @@
/** @pre isAttached() */
ConnectionState& getConnection();
+ bool isLocal(const ConnectionToken* t) const;
uint32_t getTimeout() const { return timeout; }
Broker& getBroker() { return broker; }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Mon Mar 31 05:51:36
2008
@@ -59,6 +59,7 @@
owner(_owner),
consumerCount(0),
exclusive(false),
+ noLocal(false),
persistenceId(0)
{
if (parent != 0)
@@ -90,6 +91,10 @@
notify();
}
+bool Queue::isLocal(boost::intrusive_ptr<Message>& msg)
+{
+ return noLocal && owner && owner->isLocal(msg->getPublisher());
+}
void Queue::deliver(boost::intrusive_ptr<Message>& msg){
if (msg->isImmediate() && getConsumerCount() == 0) {
@@ -97,9 +102,10 @@
DeliverableMessage deliverable(msg);
alternateExchange->route(deliverable, msg->getRoutingKey(),
msg->getApplicationHeaders());
}
+ } else if (isLocal(msg)) {
+ //drop message
+ QPID_LOG(debug, "Dropping 'local' message from " << getName());
} else {
-
-
// if no store then mark as enqueued
if (!enqueue(0, msg)){
push(msg);
@@ -468,6 +474,7 @@
{
const std::string qpidMaxSize("qpid.max_size");
const std::string qpidMaxCount("qpid.max_count");
+ const std::string qpidNoLocal("no-local");
}
void Queue::create(const FieldTable& _settings)
@@ -484,8 +491,13 @@
void Queue::configure(const FieldTable& _settings)
{
std::auto_ptr<QueuePolicy> _policy(new QueuePolicy(_settings));
- if (_policy->getMaxCount() || _policy->getMaxSize())
+ if (_policy->getMaxCount() || _policy->getMaxSize()) {
setPolicy(_policy);
+ }
+ if (owner) {
+ noLocal = _settings.get(qpidNoLocal);
+ QPID_LOG(debug, "Configured queue with no-local=" << noLocal);
+ }
}
void Queue::destroy()
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Mon Mar 31 05:51:36
2008
@@ -70,6 +70,7 @@
const OwnershipToken* owner;
uint32_t consumerCount;
bool exclusive;
+ bool noLocal;
Listeners listeners;
Messages messages;
mutable qpid::sys::Mutex consumerLock;
@@ -118,6 +119,7 @@
bool acquire(const QueuedMessage& msg);
+ bool isLocal(boost::intrusive_ptr<Message>& msg);
/**
* Delivers a message to the queue. Will record it as
* enqueued if persistent then process it.
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.cpp Mon Mar 31
05:51:36 2008
@@ -23,6 +23,7 @@
#include "qpid/Exception.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/constants.h"
+#include "qpid/log/Statement.h"
#include <boost/format.hpp>
#include <boost/cast.hpp>
#include <boost/bind.hpp>
@@ -198,6 +199,12 @@
exclusiveQueues.erase(exclusiveQueues.begin());
}
}
+
+bool SessionAdapter::QueueHandlerImpl::isLocal(const ConnectionToken* t) const
+{
+ return session.isLocal(t);
+}
+
Queue010QueryResult SessionAdapter::QueueHandlerImpl::query(const string& name)
{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionAdapter.h Mon Mar 31
05:51:36 2008
@@ -21,11 +21,12 @@
#include "HandlerImpl.h"
+#include "ConnectionToken.h"
+#include "OwnershipToken.h"
#include "qpid/Exception.h"
#include "qpid/framing/AMQP_ServerOperations.h"
#include "qpid/framing/reply_exceptions.h"
#include "qpid/framing/SequenceSet.h"
-#include "OwnershipToken.h"
#include <vector>
#include <boost/function.hpp>
@@ -140,6 +141,7 @@
bool ifUnused, bool ifEmpty);
void purge(const std::string& queue);
framing::Queue010QueryResult query(const std::string& queue);
+ bool isLocal(const ConnectionToken* t) const;
};
class MessageHandlerImpl :
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h Mon Mar 31
05:51:36 2008
@@ -38,6 +38,7 @@
{
public:
virtual ~SessionContext(){}
+ virtual bool isLocal(const ConnectionToken* t) const = 0;
virtual ConnectionState& getConnection() = 0;
virtual framing::AMQP_ClientProxy& getProxy() = 0;
virtual Broker& getBroker() = 0;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Mon Mar 31
05:51:36 2008
@@ -96,6 +96,11 @@
return getHandler()->getConnection();
}
+bool SessionState::isLocal(const ConnectionToken* t) const
+{
+ return isAttached() && &(handler->getConnection()) == t;
+}
+
void SessionState::detach() {
getConnection().outputTasks.removeOutputTask(&semanticState);
Mutex::ScopedLock l(lock);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Mon Mar 31
05:51:36 2008
@@ -70,7 +70,7 @@
{
public:
~SessionState();
- bool isAttached() { return handler; }
+ bool isAttached() const { return handler; }
void detach();
void attach(SessionHandler& handler);
@@ -83,6 +83,7 @@
/** @pre isAttached() */
ConnectionState& getConnection();
+ bool isLocal(const ConnectionToken* t) const;
uint32_t getTimeout() const { return timeout; }
void setTimeout(uint32_t t) { timeout = t; }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.cpp Mon Mar 31
05:51:36 2008
@@ -44,8 +44,12 @@
}
void TxPublish::deliverTo(Queue::shared_ptr& queue){
- queues.push_back(queue);
- delivered = true;
+ if (!queue->isLocal(msg)) {
+ queues.push_back(queue);
+ delivered = true;
+ } else {
+ QPID_LOG(debug, "Won't enqueue local message for " << queue->getName());
+ }
}
TxPublish::Prepare::Prepare(TransactionContext* _ctxt, intrusive_ptr<Message>& _msg)
Modified: incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing_0-10.txt Mon Mar 31 05:51:36
2008
@@ -3,8 +3,6 @@
tests.codec.FieldTableTestCase.test_field_table_name_value_pair
tests_0-10.execution.ExecutionTests.test_flush
tests_0-10.dtx.DtxTests.test_recover
-tests_0-10.message.MessageTests.test_consume_no_local
-tests_0-10.message.MessageTests.test_consume_no_local_awkward
tests_0-10.message.MessageTests.test_no_size
tests_0-10.message.MessageTests.test_qos_prefetch_count
tests_0-10.message.MessageTests.test_qos_prefetch_size
Modified: incubator/qpid/trunk/qpid/python/qpid/testlib.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/testlib.py?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/testlib.py Mon Mar 31 05:51:36 2008
@@ -361,3 +361,10 @@
def tearDown(self):
if not self.session.error(): self.session.close(timeout=10)
self.conn.close(timeout=10)
+
+ def subscribe(self, session=None, **keys):
+ session = session or self.session
+ consumer_tag = keys["destination"]
+ session.message_subscribe(**keys)
+ session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
+ session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py Mon Mar 31 05:51:36 2008
@@ -667,10 +667,3 @@
dp=session.delivery_properties(routing_key=key)
mp=session.message_properties(correlation_id=id)
session.message_transfer(message=Message(dp, mp, body))
-
- def subscribe(self, session=None, **keys):
- session = session or self.session
- consumer_tag = keys["destination"]
- session.message_subscribe(**keys)
- session.message_flow(destination=consumer_tag, unit=0, value=0xFFFFFFFF)
- session.message_flow(destination=consumer_tag, unit=1, value=0xFFFFFFFF)
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=642981&r1=642980&r2=642981&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Mon Mar 31 05:51:36
2008
@@ -28,33 +28,33 @@
class MessageTests(TestBase010):
"""Tests for 'methods' on the amqp message 'class'"""
- def test_consume_no_local(self):
+ def test_no_local(self):
"""
Test that the no_local flag is honoured in the consume method
"""
session = self.session
- #setup, declare two queues:
+ #setup, declare two queues one of which excludes delivery of locally sent messages
session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True)
- session.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True)
- #establish two consumers one of which excludes delivery of locally sent messages
+ session.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True, arguments={'no-local':'true'})
+ #establish two consumers
self.subscribe(destination="local_included", queue="test-queue-1a")
- self.subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True)
+ self.subscribe(destination="local_excluded", queue="test-queue-1b")
#send a message
- session.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local"))
- session.message_transfer(content=Content(properties={'routing_key' : "test-queue-1b"}, body="consume_no_local"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me"))
#check the queues of the two consumers
excluded = session.incoming("local_excluded")
included = session.incoming("local_included")
msg = included.get(timeout=1)
- self.assertEqual("consume_no_local", msg.body)
+ self.assertEqual("deliver-me", msg.body)
try:
excluded.get(timeout=1)
self.fail("Received locally published message though no_local=true")
except Empty: None
- def test_consume_no_local_awkward(self):
+ def test_no_local_awkward(self):
"""
If an exclusive queue gets a no-local delivered to it, that
@@ -67,19 +67,18 @@
session = self.session
#setup:
- session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True, arguments={'no-local':'true'})
#establish consumer which excludes delivery of locally sent messages
- self.subscribe(destination="local_excluded", queue="test-queue", no_local=True)
+ self.subscribe(destination="local_excluded", queue="test-queue")
#send a 'local' message
- session.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="local"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "local"))
#send a non local message
other = self.connect()
- session2 = other.session(1)
- session2.session_open()
- session2.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="foreign"))
- session2.session_close()
+ session2 = other.session("my-session", 1)
+ session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue"), "foreign"))
+ session2.close()
other.close()
#check that the second message only is delivered