Author: gsim
Date: Tue Oct 16 02:11:48 2007
New Revision: 585085

URL: http://svn.apache.org/viewvc?rev=585085&view=rev
Log:
* Revised allocation algorithm to ensure all consumers are given the 
opportunity to consume a message
* If already have infinite credit, don't try to add to it
* If get disconnected while processing close, just finish off the close and 
don't signal the disconnection


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=585085&r1=585084&r2=585085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Oct 16 02:11:48 2007
@@ -155,12 +155,14 @@
 bool Queue::dispatch(QueuedMessage& msg)
 {
     Consumer* c = allocate();
-    int start = next;
+    Consumer* first = c;
     while(c){
         if(c->deliver(msg)) {
             return true;            
+        } else {
+            c = allocate();
+            if (c == first) c = 0;
         }
-        c = next == start ? 0 : allocate();            
     }
     return false;
 }
@@ -170,7 +172,10 @@
      while(true){
         {
            Mutex::ScopedLock locker(messageLock);
-           if (messages.empty()) break; 
+           if (messages.empty()) { 
+                QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
+                break; 
+            }
            msg = messages.front();
        }
         if( msg.payload->isEnqueueComplete() && dispatch(msg) ) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=585085&r1=585084&r2=585085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Tue Oct 16 02:11:48 2007
@@ -267,7 +267,8 @@
 {
     Mutex::ScopedLock l(lock);
     if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
-        QPID_LOG(debug, "Not enough credit for '" << name << "', bytes: " << byteCredit << " msgs: " << msgCredit);
+        QPID_LOG(debug, "Not enough credit for '" << name  << "' on " << parent
+                 << ", bytes: " << byteCredit << " msgs: " << msgCredit);
         return false;
     } else {
         uint32_t originalMsgCredit = msgCredit;
@@ -279,8 +280,8 @@
         if (byteCredit != 0xFFFFFFFF) {
             byteCredit -= msg->getRequiredCredit();
         }
-        QPID_LOG(debug, "Credit available for '" << name 
-                 << "', was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
+        QPID_LOG(debug, "Credit available for '" << name << "' on " << parent
+                 << ", was " << " bytes: " << originalByteCredit << " msgs: " << originalMsgCredit
                  << " now bytes: " << byteCredit << " msgs: " << msgCredit);
         return true;
     }
@@ -519,7 +520,9 @@
 {
     {
         Mutex::ScopedLock l(lock);
-        byteCredit += value;
+        if (byteCredit != 0xFFFFFFFF) {
+            byteCredit += value;
+        }
     }
     requestDispatch();
 }
@@ -528,7 +531,9 @@
 {
     {
         Mutex::ScopedLock l(lock);
-        msgCredit += value;
+        if (msgCredit != 0xFFFFFFFF) {
+            msgCredit += value;
+        }
     }
     requestDispatch();
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=585085&r1=585084&r2=585085&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Tue Oct 16 02:11:48 2007
@@ -292,12 +292,10 @@
 }
         
 void AsynchIO::disconnected(DispatchHandle& h) {
-       // If we've already queued close do it before callback
-       if (queuedClose) {
-               close(h);
-       }
-       
-    if (disCallback) {
+    // If we've already queued close do it instead of disconnected callback
+    if (queuedClose) {
+        close(h);
+    } else if (disCallback) {
         disCallback(*this);
         h.unwatch();
     }


Reply via email to