Author: gsim
Date: Tue Oct 16 02:11:48 2007
New Revision: 585085
URL: http://svn.apache.org/viewvc?rev=585085&view=rev
Log:
* Revised allocation algorithm to ensure all consumers are given the
opportunity to consume a message
* If credit is already infinite, don't try to add to it
* If we get disconnected while processing a close, just finish off the close
and don't signal the disconnection
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=585085&r1=585084&r2=585085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Oct 16 02:11:48
2007
@@ -155,12 +155,14 @@
bool Queue::dispatch(QueuedMessage& msg)
{
Consumer* c = allocate();
- int start = next;
+ Consumer* first = c;
while(c){
if(c->deliver(msg)) {
return true;
+ } else {
+ c = allocate();
+ if (c == first) c = 0;
}
- c = next == start ? 0 : allocate();
}
return false;
}
@@ -170,7 +172,10 @@
while(true){
{
Mutex::ScopedLock locker(messageLock);
- if (messages.empty()) break;
+ if (messages.empty()) {
+ QPID_LOG(debug, "No messages to dispatch on queue '" << name
<< "'");
+ break;
+ }
msg = messages.front();
}
if( msg.payload->isEnqueueComplete() && dispatch(msg) ) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=585085&r1=585084&r2=585085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Oct 16
02:11:48 2007
@@ -267,7 +267,8 @@
{
Mutex::ScopedLock l(lock);
if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit <
msg->getRequiredCredit())) {
- QPID_LOG(debug, "Not enough credit for '" << name << "', bytes: " <<
byteCredit << " msgs: " << msgCredit);
+ QPID_LOG(debug, "Not enough credit for '" << name << "' on " <<
parent
+ << ", bytes: " << byteCredit << " msgs: " << msgCredit);
return false;
} else {
uint32_t originalMsgCredit = msgCredit;
@@ -279,8 +280,8 @@
if (byteCredit != 0xFFFFFFFF) {
byteCredit -= msg->getRequiredCredit();
}
- QPID_LOG(debug, "Credit available for '" << name
- << "', was " << " bytes: " << originalByteCredit << " msgs: "
<< originalMsgCredit
+ QPID_LOG(debug, "Credit available for '" << name << "' on " << parent
+ << ", was " << " bytes: " << originalByteCredit << " msgs: "
<< originalMsgCredit
<< " now bytes: " << byteCredit << " msgs: " << msgCredit);
return true;
}
@@ -519,7 +520,9 @@
{
{
Mutex::ScopedLock l(lock);
- byteCredit += value;
+ if (byteCredit != 0xFFFFFFFF) {
+ byteCredit += value;
+ }
}
requestDispatch();
}
@@ -528,7 +531,9 @@
{
{
Mutex::ScopedLock l(lock);
- msgCredit += value;
+ if (msgCredit != 0xFFFFFFFF) {
+ msgCredit += value;
+ }
}
requestDispatch();
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=585085&r1=585084&r2=585085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Tue Oct 16
02:11:48 2007
@@ -292,12 +292,10 @@
}
void AsynchIO::disconnected(DispatchHandle& h) {
- // If we've already queued close do it before callback
- if (queuedClose) {
- close(h);
- }
-
- if (disCallback) {
+ // If we've already queued close do it instead of disconnected callback
+ if (queuedClose) {
+ close(h);
+ } else if (disCallback) {
disCallback(*this);
h.unwatch();
}