Author: gsim
Date: Fri Oct 19 09:21:29 2007
New Revision: 586519
URL: http://svn.apache.org/viewvc?rev=586519&view=rev
Log:
Fix to allocation algorithm in queue: prevents infinite loop when first
consumer for allocation is cancelled after starting to dispatch a particular
method.
Removed alarming(!) log statement.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp?rev=586519&r1=586518&r2=586519&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/NullMessageStore.cpp Fri Oct
19 09:21:29 2007
@@ -49,10 +49,7 @@
NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){}
-void NullMessageStore::init(const std::string& /*dir*/, const bool /*async*/)
-{
- QPID_LOG(info, "Can't init, store not enabled");
-}
+void NullMessageStore::init(const std::string& /*dir*/, const bool /*async*/)
{}
void NullMessageStore::create(PersistableQueue& queue)
{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=586519&r1=586518&r2=586519&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Fri Oct 19 09:21:29
2007
@@ -154,16 +154,15 @@
bool Queue::dispatch(QueuedMessage& msg)
{
+ //additions to the acquirers will result in a separate dispatch
+ //request, so won't result in anyone being missed
+ uint counter = getAcquirerCount();
Consumer::ptr c = allocate();
- Consumer::ptr first = c;
- while(c){
+ while(c && counter--){
if(c->deliver(msg)) {
return true;
} else {
c = allocate();
- if (c == first) {
- break;
- }
}
}
return false;
@@ -194,6 +193,7 @@
Consumers copy;
{
RWlock::ScopedRlock locker(consumerLock);
+ if (browsers.empty()) return;//shortcut
copy = browsers;
}
for (Consumers::iterator i = copy.begin(); i != copy.end(); i++) {
@@ -310,6 +310,11 @@
uint32_t Queue::getConsumerCount() const{
RWlock::ScopedRlock locker(consumerLock);
return acquirers.size() + browsers.size();
+}
+
+uint32_t Queue::getAcquirerCount() const{
+ RWlock::ScopedRlock locker(consumerLock);
+ return acquirers.size();
}
bool Queue::canAutoDelete() const{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=586519&r1=586518&r2=586519&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Fri Oct 19 09:21:29
2007
@@ -107,6 +107,7 @@
void serviceBrowser(Consumer::ptr c);
Consumer::ptr allocate();
bool seek(QueuedMessage& msg, const framing::SequenceNumber&
position);
+ uint32_t getAcquirerCount() const;
protected:
/**