Author: astitcher
Date: Tue Feb 24 22:38:08 2009
New Revision: 747587

URL: http://svn.apache.org/viewvc?rev=747587&view=rev
Log:
Changed the producer rate limit timer callback
so that it generates a callback serialised with
the connection

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=747587&r1=747586&r2=747587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Feb 24 22:38:08 2009
@@ -80,7 +80,8 @@
 
 void Connection::requestIOProcessing(boost::function0<void> callback)
 {
-    ioCallback = callback;
+    ScopedLock<Mutex> l(ioCallbackLock);
+    ioCallbacks.push(callback);
     out.activateOutput();
 }
 
@@ -221,10 +222,13 @@
 
 bool Connection::doOutput() {    
     try{
-        if (ioCallback)
-            ioCallback(); // Lend the IO thread for management processing
-        ioCallback = 0;
-
+        {
+        ScopedLock<Mutex> l(ioCallbackLock);
+        while (!ioCallbacks.empty()) {
+            ioCallbacks.front()(); // Lend the IO thread for management processing
+            ioCallbacks.pop();
+        }
+        }
         if (mgmtClosing)
             close(connection::CLOSE_CODE_CONNECTION_FORCED, "Closed by Management Request");
         else

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=747587&r1=747586&r2=747587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Feb 24 22:38:08 2009
@@ -25,6 +25,7 @@
 #include <memory>
 #include <sstream>
 #include <vector>
+#include <queue>
 
 #include <boost/ptr_container/ptr_map.hpp>
 
@@ -47,6 +48,7 @@
 #include "qpid/sys/ConnectionOutputHandler.h"
 #include "qpid/sys/Socket.h"
 #include "qpid/sys/TimeoutHandler.h"
+#include "qpid/sys/Mutex.h"
 
 #include <boost/ptr_container/ptr_map.hpp>
 #include <boost/bind.hpp>
@@ -119,7 +121,8 @@
     const bool isLink;
     bool mgmtClosing;
     const std::string mgmtId;
-    boost::function0<void> ioCallback;
+    sys::Mutex ioCallbackLock;
+    std::queue<boost::function0<void> > ioCallbacks;
     qmf::org::apache::qpid::broker::Connection* mgmtObject;
     LinkRegistry& links;
     management::ManagementAgent* agent;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h?rev=747587&r1=747586&r2=747587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/ConnectionState.h Tue Feb 24 22:38:08 2009
@@ -100,6 +100,7 @@
     framing::FrameHandler* getClusterOrderOutput() { return clusterOrderOut; }
     void setClusterOrderOutput(framing::FrameHandler& fh) { clusterOrderOut = &fh; }
 
+    virtual void requestIOProcessing (boost::function0<void>) = 0;
 
   protected:
     framing::ProtocolVersion version;

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=747587&r1=747586&r2=747587&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Feb 24 22:38:08 2009
@@ -212,11 +212,15 @@
     void fire() {
         // This is the best we can currently do to avoid a destruction/fire race
         if (!isCancelled()) {
-            if ( !sessionState.processSendCredit(0) ) {
-                QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
-                reset();
-                timer.add(this);
-            }
+            sessionState.getConnection().requestIOProcessing(boost::bind(&ScheduledCreditTask::sendCredit, this));
+        }
+    }
+    
+    void sendCredit() {
+        if ( !sessionState.processSendCredit(0) ) {
+            QPID_LOG(warning, sessionState.getId() << ": Reschedule sending credit");
+            reset();
+            timer.add(this);
         }
     }
 };



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to