Author: astitcher
Date: Tue Apr 29 15:46:23 2008
New Revision: 652180

URL: http://svn.apache.org/viewvc?rev=652180&view=rev
Log:
More RDMA Work in Progress
    Changes to client buffering
    Buffering improvement to server
    Removed unused state machine from RdmaIO code
    Move the write throttling due to limited write buffers into the RdmaIO code

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp?rev=652180&r1=652179&r2=652180&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp Tue Apr 29 15:46:23 2008
@@ -30,8 +30,8 @@
 int64_t sbytes = 0;
 int64_t rmsgs = 0;
 int64_t rbytes = 0;
-
-int outstandingwrites = 0;
+int64_t cmsgs = 0;
+int writable = true;
 
 int target = 1000000;
 int msgsize = 200;
@@ -42,17 +42,18 @@
 vector<char> testString;
 
 void write(Rdma::AsynchIO& aio) {
-    //if ((smsgs - rmsgs) < Rdma::DEFAULT_WR_ENTRIES/2) {
-        while (smsgs < target && outstandingwrites < (3*Rdma::DEFAULT_WR_ENTRIES/4)) {
+    if ((cmsgs - rmsgs) < Rdma::DEFAULT_WR_ENTRIES/2) {
+        while (writable) {
+            if (smsgs >= target)
+                return;
             Rdma::Buffer* b = aio.getBuffer();
             std::copy(testString.begin(), testString.end(), b->bytes);
             b->dataCount = msgsize;
             aio.queueWrite(b);
-            ++outstandingwrites;
             ++smsgs;
             sbytes += b->byteCount;
         }
-    //}
+    }
 }
 
 void dataError(Rdma::AsynchIO&) {
@@ -66,24 +67,27 @@
     // When all messages have been recvd stop
     if (rmsgs < target) {
         write(aio);
-        return;
+    } else {
+        fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now()));
+        if (cmsgs >= target)
+            p->shutdown();
     }
+}
 
-    fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now()));
-    if (outstandingwrites == 0)
-        p->shutdown();
+void full(Rdma::AsynchIO&) {
+    writable = false;
 }
 
 void idle(Poller::shared_ptr p, Rdma::AsynchIO& aio) {
-    --outstandingwrites;
+    writable = true;
+    ++cmsgs;
     if (smsgs < target) {
         write(aio);
-        return;
+    } else {
+        sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now()));
+        if (rmsgs >= target && cmsgs >= target)
+            p->shutdown();
     }
-
-    sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now()));
-    if (smsgs >= target && rmsgs >= target && outstandingwrites == 0)
-        p->shutdown();
 }
 
 void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) {
@@ -93,6 +97,7 @@
     Rdma::AsynchIO* aio = new Rdma::AsynchIO(ci->getQueuePair(), msgsize,
         boost::bind(&data, poller, _1, _2),
         boost::bind(&idle, poller, _1),
+        &full,
         dataError);
 
     startTime = AbsTime::now();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp?rev=652180&r1=652179&r2=652180&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp Tue Apr 29 15:46:23 2008
@@ -9,14 +9,18 @@
             int s,
             ReadCallback rc,
             IdleCallback ic,
+            FullCallback fc,
             ErrorCallback ec
     ) :
         qp(q),
         dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this, _1), 0, 0),
         bufferSize(s),
         recvBufferCount(DEFAULT_WR_ENTRIES),
+        xmitBufferCount(DEFAULT_WR_ENTRIES),
+        outstandingWrites(0),
         readCallback(rc),
         idleCallback(ic),
+        fullCallback(fc),
         errorCallback(ec)
     {
         qp->nonblocking();
@@ -40,20 +44,28 @@
         dataHandle.startWatch(poller);
     }
 
-    void AsynchIO::queueReadBuffer(Buffer*) {
-    }
-
+    // TODO: Currently we don't prevent write buffer overrun we just advise
+    // when to stop writing.
     void AsynchIO::queueWrite(Buffer* buff) {
         qp->postSend(buff);
+        ++outstandingWrites;
+        if (outstandingWrites >= xmitBufferCount) {
+            fullCallback(*this);
+        }
     }
 
     void AsynchIO::notifyPendingWrite() {
+        // Just perform the idle callback (if possible)
+        if (outstandingWrites < xmitBufferCount) {
+            idleCallback(*this);
+        }
     }
 
     void AsynchIO::queueWriteClose() {
     }
 
     Buffer* AsynchIO::getBuffer() {
+        qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
         if (bufferQueue.empty()) {
             Buffer* b = qp->createBuffer(bufferSize);
             buffers.push_front(b);
@@ -103,7 +115,12 @@
                 // At this point the buffer has been consumed so put it back on the recv queue
                 qp->postRecv(b);
             } else {
+                {
+                qpid::sys::ScopedLock<qpid::sys::Mutex> l(bufferQueueLock);
                 bufferQueue.push_front(b);
+                }
+                --outstandingWrites;
+                // TODO: maybe don't call idle unless we're low on write buffers
                 idleCallback(*this);
             }
         } while (true);
@@ -122,8 +139,7 @@
         connectedCallback(cc),
         errorCallback(errc),
         disconnectedCallback(dc),
-        connectionRequestCallback(crc),
-        state(IDLE)
+        connectionRequestCallback(crc)
     {
         ci->nonblocking();
     }
@@ -131,7 +147,6 @@
     void Listener::start(Poller::shared_ptr poller) {
         ci->bind(src_addr);
         ci->listen();
-        state = LISTENING;
         handle.startWatch(poller);
     }
 
@@ -194,15 +209,13 @@
         connectedCallback(cc),
         errorCallback(errc),
         disconnectedCallback(dc),
-        rejectedCallback(rc),
-        state(IDLE)
+        rejectedCallback(rc)
     {
         ci->nonblocking();
     }
 
     void Connector::start(Poller::shared_ptr poller) {
         ci->resolve_addr(dst_addr);
-        state = RESOLVE_ADDR;
         handle.startWatch(poller);
     }
 
@@ -214,138 +227,45 @@
             return;
 
         ::rdma_cm_event_type eventType = e.getEventType();
-#if 1
         switch (eventType) {
         case RDMA_CM_EVENT_ADDR_RESOLVED:
             // RESOLVE_ADDR
-            state = RESOLVE_ROUTE;
             ci->resolve_route();
             break;
         case RDMA_CM_EVENT_ADDR_ERROR:
             // RESOLVE_ADDR
-            state = ERROR;
             errorCallback(ci);
             break;
         case RDMA_CM_EVENT_ROUTE_RESOLVED:
             // RESOLVE_ROUTE:
-            state = CONNECTING;
             ci->connect();
             break;
         case RDMA_CM_EVENT_ROUTE_ERROR:
             // RESOLVE_ROUTE:
-            state = ERROR;
             errorCallback(ci);
             break;
         case RDMA_CM_EVENT_CONNECT_ERROR:
             // CONNECTING
-            state = ERROR;
             errorCallback(ci);
             break;
         case RDMA_CM_EVENT_UNREACHABLE:
             // CONNECTING
-            state = ERROR;
             errorCallback(ci);
             break;
         case RDMA_CM_EVENT_REJECTED:
             // CONNECTING
-            state = REJECTED;
             rejectedCallback(ci);
             break;
         case RDMA_CM_EVENT_ESTABLISHED:
             // CONNECTING
-            state = ESTABLISHED;
             connectedCallback(ci);
             break;
         case RDMA_CM_EVENT_DISCONNECTED:
             // ESTABLISHED
-            state = DISCONNECTED;
             disconnectedCallback(ci);
             break;
         default:
-            std::cerr << "Warning: unexpected event in " << state << " state - " << eventType << "\n";
-            state = ERROR;
-        }
-#else
-        switch (state) {
-        case IDLE:
-            std::cerr << "Warning: event in IDLE state\n";
-            break;
-        case RESOLVE_ADDR:
-            switch (eventType) {
-            case RDMA_CM_EVENT_ADDR_RESOLVED:
-                state = RESOLVE_ROUTE;
-                ci->resolve_route();
-                break;
-            case RDMA_CM_EVENT_ADDR_ERROR:
-                state = ERROR;
-                errorCallback(ci);
-                break;
-            default:
-                state = ERROR;
-                std::cerr << "Warning: unexpected response to resolve_addr - " << eventType << "\n";
-            }
-            break;
-        case RESOLVE_ROUTE:
-            switch (eventType) {
-            case RDMA_CM_EVENT_ROUTE_RESOLVED:
-                state = CONNECTING;
-                ci->connect();
-                break;
-            case RDMA_CM_EVENT_ROUTE_ERROR:
-                state = ERROR;
-                errorCallback(ci);
-                break;
-            default:
-                state = ERROR;
-                std::cerr << "Warning: unexpected response to resolve_route - " << eventType << "\n";
-            }
-            break;
-        case CONNECTING:
-            switch (eventType) {
-            case RDMA_CM_EVENT_CONNECT_RESPONSE:
-                std::cerr << "connect_response\n";
-                break;
-            case RDMA_CM_EVENT_CONNECT_ERROR:
-                state = ERROR;
-                errorCallback(ci);
-                break;
-            case RDMA_CM_EVENT_UNREACHABLE:
-                state = ERROR;
-                errorCallback(ci);
-                break;
-            case RDMA_CM_EVENT_REJECTED:
-                state = REJECTED;
-                rejectedCallback(ci);
-                break;
-            case RDMA_CM_EVENT_ESTABLISHED:
-                state = ESTABLISHED;
-                connectedCallback(ci);
-                break;
-            default:
-                state = ERROR;
-                std::cerr << "Warning: unexpected response to connect - " << eventType << "\n";
-            }
-            break;
-        case ESTABLISHED:
-            switch (eventType) {
-            case RDMA_CM_EVENT_DISCONNECTED:
-                disconnectedCallback(ci);
-                break;
-            default:
-                std::cerr << "Warning: unexpected event in ESTABLISHED state - " << eventType << "\n";
-            }
-            break;
-        case REJECTED:
-            std::cerr << "Warning: event in REJECTED state - " << eventType << "\n";
-            break;
-        case ERROR:
-            std::cerr << "Warning: event in ERROR state - " << eventType << "\n";
-            break;
-        case LISTENING:
-        case ACCEPTING:
-            std::cerr << "Warning: in an illegal state (and received event!) - " << eventType << "\n";
-            break;
+            std::cerr << "Warning: unexpected event in connect: " << eventType << "\n";
         }
-#endif
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h?rev=652180&r1=652179&r2=652180&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h Tue Apr 29 15:46:23 2008
@@ -4,6 +4,7 @@
 #include "rdma_wrap.h"
 
 #include "qpid/sys/Dispatcher.h"
+#include "qpid/sys/Mutex.h"
 
 #include <netinet/in.h>
 
@@ -17,18 +18,6 @@
 namespace Rdma {
 
     class Connection;
-    enum ConnectionState {
-        IDLE,
-        RESOLVE_ADDR,
-        RESOLVE_ROUTE,
-        LISTENING,
-        CONNECTING,
-        ACCEPTING,
-        ESTABLISHED,
-        REJECTED,
-        DISCONNECTED,
-        ERROR
-    };
 
     typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ConnectedCallback;
     typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ErrorCallback;
@@ -41,16 +30,21 @@
         typedef boost::function1<void, AsynchIO&> ErrorCallback;
         typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
         typedef boost::function1<void, AsynchIO&>  IdleCallback;
+        typedef boost::function1<void, AsynchIO&>  FullCallback;
 
         QueuePair::intrusive_ptr qp;
         DispatchHandle dataHandle;
         int bufferSize;
         int recvBufferCount;
+        int xmitBufferCount;
+        int outstandingWrites;
         std::deque<Buffer*> bufferQueue;
+        qpid::sys::Mutex bufferQueueLock;
         boost::ptr_deque<Buffer> buffers;
 
         ReadCallback readCallback;
         IdleCallback idleCallback;
+        FullCallback fullCallback;
         ErrorCallback errorCallback;
 
     public:
@@ -59,12 +53,12 @@
             int s,
             ReadCallback rc,
             IdleCallback ic,
+            FullCallback fc,
             ErrorCallback ec
         );
         ~AsynchIO();
 
         void start(Poller::shared_ptr poller);
-        void queueReadBuffer(Buffer* buff);
         void queueWrite(Buffer* buff);
         void notifyPendingWrite();
         void queueWriteClose();
@@ -83,7 +77,6 @@
         ErrorCallback errorCallback;
         DisconnectedCallback disconnectedCallback;
         ConnectionRequestCallback connectionRequestCallback;
-        ConnectionState state;
 
     public:
         Listener(
@@ -108,7 +101,6 @@
         ErrorCallback errorCallback;
         DisconnectedCallback disconnectedCallback;
         RejectedCallback rejectedCallback;
-        ConnectionState state;
 
     public:
         Connector(

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp?rev=652180&r1=652179&r2=652180&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp Tue Apr 29 15:46:23 2008
@@ -22,12 +22,12 @@
 struct ConRec {
     Rdma::Connection::intrusive_ptr connection;
     Rdma::AsynchIO* data;
-    int outstandingWrites;
+    bool writable;
     queue<Rdma::Buffer*> queuedWrites;
 
     ConRec(Rdma::Connection::intrusive_ptr c) :
         connection(c),
-        outstandingWrites(0)
+        writable(true)
     {}
 };
 
@@ -40,23 +40,24 @@
     Rdma::Buffer* buf = a.getBuffer();
     std::copy(b->bytes+b->dataStart, b->bytes+b->dataStart+b->dataCount, buf->bytes);
     buf->dataCount = b->dataCount;
-    if (cr->outstandingWrites < 3*Rdma::DEFAULT_WR_ENTRIES/4) {
+    if (cr->queuedWrites.empty() && cr->writable) {
         a.queueWrite(buf);
-        ++(cr->outstandingWrites);
     } else {
         cr->queuedWrites.push(buf);
     }
 }
 
+void full(ConRec* cr, Rdma::AsynchIO&) {
+    cr->writable = false;
+}
+
 void idle(ConRec* cr, Rdma::AsynchIO& a) {
-    --(cr->outstandingWrites);
-    //if (cr->outstandingWrites < Rdma::DEFAULT_WR_ENTRIES/4)
-        while (!cr->queuedWrites.empty() && cr->outstandingWrites < 3*Rdma::DEFAULT_WR_ENTRIES/4) {
-            Rdma::Buffer* buf = cr->queuedWrites.front();
-            cr->queuedWrites.pop();
-            a.queueWrite(buf);
-            ++(cr->outstandingWrites);
-        }
+    cr->writable = true;
+    while (!cr->queuedWrites.empty() && cr->writable) {
+        Rdma::Buffer* buf = cr->queuedWrites.front();
+        cr->queuedWrites.pop();
+        a.queueWrite(buf);
+    }
 }
 
 void disconnected(Rdma::Connection::intrusive_ptr& ci) {
@@ -82,7 +83,7 @@
 
     // For fun reject alternate connection attempts
     static bool x = false;
-    x ^= 1;
+    x = true;
 
     // Must create aio here so as to prepost buffers *before* we accept connection
     if (x) {
@@ -91,6 +92,7 @@
             new Rdma::AsynchIO(ci->getQueuePair(), 8000,
                 boost::bind(data, cr, _1, _2),
                 boost::bind(idle, cr, _1),
+                boost::bind(full, cr, _1),
                 dataError);
         ci->addContext(cr);
         cr->data = aio;


Reply via email to