Author: gsim
Date: Tue Oct 23 07:50:56 2007
New Revision: 587525
URL: http://svn.apache.org/viewvc?rev=587525&view=rev
Log:
Hack for no-local when used with JMS topics.
Fix for releasing exclusive ownership of queues the second time around.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?rev=587525&r1=587524&r2=587525&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Tue Oct 23 07:50:56 2007
@@ -202,8 +202,8 @@
getConnection().exclusiveQueues.push_back(queue);
}
} else {
- if (exclusive && !queue->hasExclusiveOwner()) {
- queue->setExclusiveOwner(&getConnection());
+ if (exclusive && queue->setExclusiveOwner(&getConnection())) {
+ getConnection().exclusiveQueues.push_back(queue);
}
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h?rev=587525&r1=587524&r2=587525&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Consumer.h Tue Oct 23 07:50:56 2007
@@ -46,6 +46,7 @@
Consumer(bool preAcquires = true) : acquires(preAcquires) {}
bool preAcquires() const { return acquires; }
virtual bool deliver(QueuedMessage& msg) = 0;
+ virtual bool filter(Message::shared_ptr) { return true; }
virtual ~Consumer(){}
};
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=587525&r1=587524&r2=587525&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Oct 23 07:50:56 2007
@@ -151,15 +151,33 @@
serializer.execute(f);
}
+/**
+ * Return true if the message can be excluded. This is currently the
+ * case if the queue has an exclusive consumer that will never want
+ * the message, or if the queue is exclusive to a single connection
+ * and has a single consumer (covers the JMS topic case).
+ */
+bool Queue::exclude(Message::shared_ptr msg)
+{
+ RWlock::ScopedWlock locker(consumerLock);
+ if (exclusive) {
+ return !exclusive->filter(msg);
+ } else if (hasExclusiveOwner() && acquirers.size() == 1) {
+ return !acquirers[0]->filter(msg);
+ } else {
+ return false;
+ }
+}
+
Consumer::ptr Queue::allocate()
{
RWlock::ScopedWlock locker(consumerLock);
- if(acquirers.empty()){
+ if (acquirers.empty()) {
return Consumer::ptr();
- }else if(exclusive){
+ } else if (exclusive){
return exclusive;
- }else{
+ } else {
next = next % acquirers.size();
return acquirers[next++];
}
@@ -171,9 +189,9 @@
//request, so won't result in anyone being missed
uint counter = getAcquirerCount();
Consumer::ptr c = allocate();
- while(c && counter--){
- if(c->deliver(msg)) {
- return true;
+ while (c && counter--){
+ if (c->deliver(msg)) {
+ return true;
} else {
c = allocate();
}
@@ -181,22 +199,31 @@
return false;
}
-void Queue::dispatch(){
+bool Queue::getNextMessage(QueuedMessage& msg)
+{
+ Mutex::ScopedLock locker(messageLock);
+ if (messages.empty()) {
+ QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+ return false;
+ } else {
+ msg = messages.front();
+ return true;
+ }
+}
+
+void Queue::dispatch()
+{
QueuedMessage msg;
- while(true){
- {
- Mutex::ScopedLock locker(messageLock);
- if (messages.empty()) {
- QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
- break;
- }
- msg = messages.front();
- }
- if( msg.payload->isEnqueueComplete() && dispatch(msg) ) {
- pop();
- } else {
- break;
- }
+ while (getNextMessage(msg) && msg.payload->isEnqueueComplete()){
+ if (dispatch(msg)) {
+ pop();
+ } else if (exclude(msg.payload)) {
+ pop();
+ dequeue(0, msg.payload);
+ QPID_LOG(debug, "Message " << msg.payload << " filtered out of " << name << "[" << this << "]");
+ } else {
+ break;
+ }
}
serviceAllBrowsers();
}
@@ -478,4 +505,38 @@
queue->destroy();
}
+}
+
+bool Queue::isExclusiveOwner(const ConnectionToken* const o) const
+{
+ Mutex::ScopedLock locker(ownershipLock);
+ return o == owner;
+}
+
+void Queue::releaseExclusiveOwnership()
+{
+ Mutex::ScopedLock locker(ownershipLock);
+ owner = 0;
+}
+
+bool Queue::setExclusiveOwner(const ConnectionToken* const o)
+{
+ Mutex::ScopedLock locker(ownershipLock);
+ if (owner) {
+ return false;
+ } else {
+ owner = o;
+ return true;
+ }
+}
+
+bool Queue::hasExclusiveOwner() const
+{
+ Mutex::ScopedLock locker(ownershipLock);
+ return owner != 0;
+}
+
+bool Queue::hasExclusiveConsumer() const
+{
+ return exclusive;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=587525&r1=587524&r2=587525&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Oct 23 07:50:56 2007
@@ -85,6 +85,7 @@
int next;
mutable qpid::sys::RWlock consumerLock;
mutable qpid::sys::Mutex messageLock;
+ mutable qpid::sys::Mutex ownershipLock;
Consumer::ptr exclusive;
mutable uint64_t persistenceId;
framing::FieldTable settings;
@@ -110,6 +111,8 @@
Consumer::ptr allocate();
bool seek(QueuedMessage& msg, const framing::SequenceNumber& position);
uint32_t getAcquirerCount() const;
+ bool getNextMessage(QueuedMessage& msg);
+ bool exclude(Message::shared_ptr msg);
protected:
/**
@@ -172,11 +175,11 @@
uint32_t getMessageCount() const;
uint32_t getConsumerCount() const;
inline const string& getName() const { return name; }
- inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; }
- inline void releaseExclusiveOwnership() { owner = 0; }
- inline void setExclusiveOwner(const ConnectionToken* const o) { owner = o; }
- inline bool hasExclusiveConsumer() const { return exclusive; }
- inline bool hasExclusiveOwner() const { return owner != 0; }
+ bool isExclusiveOwner(const ConnectionToken* const o) const;
+ void releaseExclusiveOwnership();
+ bool setExclusiveOwner(const ConnectionToken* const o);
+ bool hasExclusiveConsumer() const;
+ bool hasExclusiveOwner() const;
inline bool isDurable() const { return store != 0; }
inline const framing::FieldTable& getSettings() const { return settings; }
inline bool isAutoDelete() const { return autodelete; }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=587525&r1=587524&r2=587525&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Oct 23 07:50:56 2007
@@ -260,10 +260,18 @@
parent->deliveryAdapter.deliver(msg.payload, token);
if (windowing || ackExpected) {
parent->record(DeliveryRecord(msg, queue, name, token, deliveryTag, acquire, !ackExpected));
+ } else if (!ackExpected) {
+ queue->dequeue(0, msg.payload);
}
}
return !blocked;
}
+}
+
+bool SemanticState::ConsumerImpl::filter(Message::shared_ptr msg)
+{
+ return !(nolocal &&
+ &parent->getSession().getConnection() == msg->getPublisher());
}
bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=587525&r1=587524&r2=587525&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Tue Oct 23 07:50:56 2007
@@ -78,6 +78,7 @@
bool ack, bool nolocal, bool acquire);
~ConsumerImpl();
bool deliver(QueuedMessage& msg);
+ bool filter(Message::shared_ptr msg);
void setWindowMode();
void setCreditMode();