Author: aconway
Date: Thu Aug 7 06:45:24 2008
New Revision: 683617
URL: http://svn.apache.org/viewvc?rev=683617&view=rev
Log:
Patch from Gordon Sim to fix issues with hasOutput implementation.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=683617&r1=683616&r2=683617&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Thu Aug 7 06:45:24 2008
@@ -212,9 +212,30 @@
}
}
-bool Queue::empty() const {
+bool Queue::checkForMessages(Consumer& c)
+{
Mutex::ScopedLock locker(messageLock);
- return messages.empty();
+ if (messages.empty()) {
+ //no message available, register consumer for notification
+ //when this changes
+ addListener(c);
+ return false;
+ } else {
+ QueuedMessage msg = messages.front();
+ if (store && !msg.payload->isEnqueueComplete()) {
+ //though a message is on the queue, it has not yet been
+ //enqueued and so is not available for consumption yet,
+ //register consumer for notification when this changes
+ addListener(c);
+ return false;
+ } else {
+ //check that consumer has sufficient credit for the
+ //message (if it does not, no need to register it for
+ //notification as the consumer itself will handle the
+ //credit allocation required to change this condition).
+ return c.accept(msg.payload);
+ }
+ }
}
bool Queue::consumeNextMessage(QueuedMessage& m, Consumer& c)
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=683617&r1=683616&r2=683617&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Thu Aug 7 06:45:24 2008
@@ -107,6 +107,7 @@
void notify();
void removeListener(Consumer&);
+ void addListener(Consumer&);
bool isExcluded(boost::intrusive_ptr<Message>& msg);
@@ -114,8 +115,6 @@
void popAndDequeue();
public:
- // FIXME aconway 2008-08-06: was private, verify if needed public.
- void addListener(Consumer&);
virtual void notifyDurableIOComplete();
typedef boost::shared_ptr<Queue> shared_ptr;
@@ -128,9 +127,14 @@
management::Manageable* parent = 0);
~Queue();
- bool empty() const;
-
bool dispatch(Consumer&);
+ /**
+ * Check whether there would be a message available for
+ * dispatch to this consumer. If not, the consumer will be
+ * notified of events that may have changed this
+ * situation.
+ */
+ bool checkForMessages(Consumer&);
void create(const qpid::framing::FieldTable& settings);
void configure(const qpid::framing::FieldTable& settings);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=683617&r1=683616&r2=683617&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Thu Aug 7 06:45:24 2008
@@ -591,8 +591,7 @@
}
bool SemanticState::ConsumerImpl::hasOutput() {
- queue->addListener(*this);
- return !queue->empty();
+ return queue->checkForMessages(*this);
}
bool SemanticState::ConsumerImpl::doOutput()