Author: astitcher
Date: Sun Apr 27 21:41:46 2008
New Revision: 652053

URL: http://svn.apache.org/viewvc?rev=652053&view=rev
Log:
Work In Progress:
  Added initial rdma code including test server and client
  RDMA support is turned off by default, but autoconf now detects whether
  the necessary rdma/ibverbs libraries and headers are present

Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_exception.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h
Modified:
    incubator/qpid/trunk/qpid/cpp/configure.ac
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am

Modified: incubator/qpid/trunk/qpid/cpp/configure.ac
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/configure.ac?rev=652053&r1=652052&r2=652053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/configure.ac (original)
+++ incubator/qpid/trunk/qpid/cpp/configure.ac Sun Apr 27 21:41:46 2008
@@ -109,7 +109,7 @@
 AC_CHECK_PROG([VALGRIND], [valgrind], [valgrind])
 test "$enable_VALGRIND" = no && VALGRIND=
 
-# If rpmlint is availalbe we'll run it when building RPMs.
+# If rpmlint is available we'll run it when building RPMs.
 AC_CHECK_PROG([RPMLINT], [rpmlint], [rpmlint])
 AM_CONDITIONAL([HAS_RPMLINT], [test -n "$RPMLINT"])
 
@@ -193,6 +193,34 @@
      [AC_DEFINE([BROKER_SASL_NAME], ["qpidd"],
                 [The SASL app name for the qpid Broker])
       AC_DEFINE([HAVE_SASL], [1], [Enable if libsasl is present])])])
+
+# Setup --with-rdma/--without-rdma as arguments to configure.
+#
+# RDMA support is off unless --with-rdma is given.  The library probes pass an
+# explicit no-op action-if-found ([:]) because AC_CHECK_LIB's default action
+# prepends -llibrary to the global LIBS, which would link every program in the
+# build against libibverbs/librdmacm (even when RDMA is disabled).  Only
+# libqpidrdma needs them and src/Makefile.am lists them in its own _LIBADD.
+AC_ARG_WITH([rdma],
+  [AS_HELP_STRING([--with-rdma], [Build with support for Remote DMA protocols])],
+  [case ${withval} in
+   yes)
+     with_RDMA=yes
+     AC_CHECK_LIB([ibverbs],[ibv_create_qp],[:],[AC_MSG_ERROR([libibverbs not found])])
+     AC_CHECK_LIB([rdmacm],[rdma_create_id],[:],[AC_MSG_ERROR([librdmacm not found])])
+     AC_CHECK_HEADERS([infiniband/verbs.h],,[AC_MSG_ERROR([ibverbs header files not found])])
+     AC_CHECK_HEADERS([rdma/rdma_cma.h],,[AC_MSG_ERROR([rdma_cm header files not found])])
+     ;;
+   no)
+     with_RDMA=no
+     ;;
+   *)
+     AC_MSG_ERROR([Bad value for --with-rdma: ${withval}])
+     ;;
+   esac],
+  [
+   # Default: RDMA stays disabled for now.  These probes only report whether
+   # the prerequisites are present; none of them can turn RDMA on.
+   with_RDMA=no
+   AC_CHECK_LIB([ibverbs],[ibv_create_qp],[:],[with_RDMA=no])
+   AC_CHECK_LIB([rdmacm],[rdma_create_id],[:],[with_RDMA=no])
+   AC_CHECK_HEADERS([infiniband/verbs.h],,[with_RDMA=no])
+   AC_CHECK_HEADERS([rdma/rdma_cma.h],,[with_RDMA=no])
+  ]
+)
+AM_CONDITIONAL([RDMA], [test x$with_RDMA = xyes])
 
 # Files to generate    
 AC_CONFIG_FILES([

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=652053&r1=652052&r2=652053&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Sun Apr 27 21:41:46 2008
@@ -91,6 +91,39 @@
 libLogger_la_SOURCES=qpid/log/Logger.cpp qpid/log/Logger.h
 libLogger_la_CXXFLAGS=$(AM_CXXFLAGS) -Wno-unused-parameter
 
+if RDMA
+
+# RDMA (Infiniband) protocol code
+# Only built when configure enables the RDMA automake conditional
+# (./configure --with-rdma).  libqpidrdma is a non-installed convenience
+# library; it links librdmacm and libibverbs itself and is linked into qpidd.
+# NOTE(review): -Wno-missing-field-initializers is presumably there to silence
+# warnings from `= {}` initialisation of C structs in this code - confirm.
+libqpidrdma_la_SOURCES = \
+  qpid/sys/rdma/rdma_exception.h \
+  qpid/sys/rdma/rdma_factories.cpp \
+  qpid/sys/rdma/RdmaIO.cpp \
+  qpid/sys/rdma/RdmaIO.h \
+  qpid/sys/rdma/rdma_wrap.h
+libqpidrdma_la_LIBADD = \
+  -lrdmacm \
+  -libverbs
+libqpidrdma_la_CXXFLAGS = \
+  $(AM_CXXFLAGS) -Wno-missing-field-initializers
+noinst_LTLIBRARIES += \
+  libqpidrdma.la
+qpidd_LDADD += \
+  libqpidrdma.la
+
+# Test programs for libqpidrdma (not installed): RdmaServer echoes back what
+# it receives, RdmaClient sends messages to it and reports throughput.
+noinst_PROGRAMS += RdmaServer RdmaClient
+RdmaServer_SOURCES = qpid/sys/rdma/RdmaServer.cpp
+RdmaServer_CXXFLAGS = \
+  $(AM_CXXFLAGS) -Wno-missing-field-initializers
+RdmaServer_LDADD = \
+  libqpidrdma.la libqpidcommon.la
+RdmaClient_SOURCES = qpid/sys/rdma/RdmaClient.cpp
+RdmaClient_CXXFLAGS = \
+  $(AM_CXXFLAGS) -Wno-missing-field-initializers
+RdmaClient_LDADD = \
+  libqpidrdma.la libqpidcommon.la
+
+endif
+
 # New 0-10 codec, to be integrated in future.
 # libqpidamqp_0_10_la_SOURCES= 
 EXTRA_DIST+=\

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp?rev=652053&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaClient.cpp Sun Apr 27 
21:41:46 2008
@@ -0,0 +1,178 @@
+#include "RdmaIO.h"
+#include "qpid/sys/Time.h"
+
+#include <netdb.h>
+#include <arpa/inet.h>
+
+#include <vector>
+#include <string>
+#include <iostream>
+#include <algorithm>
+#include <cmath>
+#include <boost/bind.hpp>
+
+using std::vector;
+using std::string;
+using std::cout;
+using std::cerr;
+using std::copy;
+using std::rand;
+
+using qpid::sys::Poller;
+using qpid::sys::Dispatcher;
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::TIME_SEC;
+using qpid::sys::TIME_INFINITE;
+
+// Message and byte counters: s* = sent, r* = received back from the server
+int64_t smsgs = 0;
+int64_t sbytes = 0;
+int64_t rmsgs = 0;
+int64_t rbytes = 0;
+
+// Sends queued by write() whose completion idle() has not yet seen
+int outstandingwrites = 0;
+
+// Number of messages to send (and expect back)
+int target = 1000000;
+// Payload size in bytes; main() overrides it from the optional 3rd argument
+int msgsize = 200;
+// Set in connected(), just before the first messages are queued
+AbsTime startTime;
+// Elapsed times recorded once all messages are sent (idle()) / received
+// (data()); they stay TIME_INFINITE if the test never gets that far
+Duration sendingDuration(TIME_INFINITE);
+Duration fullTestDuration(TIME_INFINITE);
+
+// Random payload copied into every outgoing buffer (filled in by main())
+vector<char> testString;
+
+// Queue test messages until either `target` messages have been sent or
+// 3/4 of the send work-request entries are in flight.
+void write(Rdma::AsynchIO& aio) {
+    for (; smsgs < target && outstandingwrites < (3*Rdma::DEFAULT_WR_ENTRIES/4); ++smsgs) {
+        Rdma::Buffer* buf = aio.getBuffer();
+        std::copy(testString.begin(), testString.end(), buf->bytes);
+        buf->dataCount = msgsize;
+        aio.queueWrite(buf);
+        ++outstandingwrites;
+        sbytes += buf->byteCount;
+    }
+}
+
+// A work completion reported an error.
+void dataError(Rdma::AsynchIO&) {
+    cout << "Data error:\n";
+}
+
+// Receive completion: count it and keep the send pipeline topped up until
+// every message has come back; then record the round-trip time and stop
+// the poller once no sends remain outstanding.
+void data(Poller::shared_ptr p, Rdma::AsynchIO& aio, Rdma::Buffer* b) {
+    ++rmsgs;
+    rbytes += b->byteCount;
+
+    if (rmsgs >= target) {
+        fullTestDuration = std::min(fullTestDuration, Duration(startTime, AbsTime::now()));
+        if (outstandingwrites == 0)
+            p->shutdown();
+    } else {
+        write(aio);
+    }
+}
+
+// Send completion: one fewer write in flight.  Queue more while messages
+// remain; otherwise record the sending time and stop the poller once
+// everything has been both sent and received.
+void idle(Poller::shared_ptr p, Rdma::AsynchIO& aio) {
+    --outstandingwrites;
+    if (smsgs >= target) {
+        sendingDuration = std::min(sendingDuration, Duration(startTime, AbsTime::now()));
+        if (rmsgs >= target && outstandingwrites == 0)
+            p->shutdown();
+    } else {
+        write(aio);
+    }
+}
+
+// Connection established: wrap its queue pair in an AsynchIO, start the
+// clock, prime the send pipeline and begin polling for completions.
+// NOTE(review): the AsynchIO is never deleted anywhere in this program.
+void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) {
+    cout << "Connected\n";
+    Rdma::QueuePair::intrusive_ptr q = ci->getQueuePair();
+
+    Rdma::AsynchIO* aio = new Rdma::AsynchIO(q, msgsize,
+        boost::bind(&data, poller, _1, _2),
+        boost::bind(&idle, poller, _1),
+        dataError);
+
+    startTime = AbsTime::now();
+    write(*aio);
+
+    aio->start(poller);
+}
+
+namespace {
+    // Print `what` and stop the dispatcher loop so main() can report results.
+    void stopWith(boost::shared_ptr<Poller>& p, const char* what) {
+        cout << what;
+        p->shutdown();
+    }
+}
+
+// Connection-manager callbacks: each reports the event and ends the run.
+void disconnected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) {
+    stopWith(p, "Disconnected\n");
+}
+
+void connectionError(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) {
+    stopWith(p, "Connection error\n");
+}
+
+void rejected(boost::shared_ptr<Poller> p, Rdma::Connection::intrusive_ptr&) {
+    stopWith(p, "Connection rejected\n");
+}
+
+// Usage: RdmaClient <host> [<port> [<msgsize>]]
+// Connects to an RDMA echo server (default port 20079), sends `target`
+// messages of `msgsize` random printable bytes, waits for them to come back
+// and prints send and round-trip throughput.
+int main(int argc, char* argv[]) {
+    vector<string> args(&argv[0], &argv[argc]);
+
+    if (args.size() < 2) {
+        cerr << "Usage: RdmaClient <host> [<port> [<msgsize>]]\n";
+        return 1;
+    }
+
+    ::addrinfo *res;
+    ::addrinfo hints = {};
+    hints.ai_family = AF_INET;
+    hints.ai_socktype = SOCK_STREAM;
+    string port = (args.size() < 3) ? "20079" : args[2];
+    // getaddrinfo signals failure with any non-zero EAI_* code (which is not
+    // guaranteed to be negative); res is only valid on success.
+    int n = ::getaddrinfo(args[1].c_str(), port.c_str(), &hints, &res);
+    if (n != 0) {
+        cerr << "Can't find information for: " << args[1] << " (" << ::gai_strerror(n) << ")\n";
+        return 1;
+    }
+    cout << "Connecting to: " << args[1] << ":" << port <<"\n";
+
+    if (args.size() > 3)
+        msgsize = atoi(args[3].c_str());
+    if (msgsize <= 0) {
+        // Only reachable from a bad 3rd argument: the default is 200
+        cerr << "Invalid message size: " << args[3] << "\n";
+        ::freeaddrinfo(res);
+        return 1;
+    }
+    cout << "Message size: " << msgsize << "\n";
+
+    // Make a random message of that size from printable characters 32..95.
+    // The mask must be applied to rand() before adding the offset.
+    testString.resize(msgsize);
+    for (int i = 0; i < msgsize; ++i) {
+        testString[i] = 32 + (rand() & 0x3f);
+    }
+
+    try {
+        boost::shared_ptr<Poller> p(new Poller());
+        Dispatcher d(p);
+
+        Rdma::Connector c(
+            *res->ai_addr,
+            boost::bind(&connected, p, _1),
+            boost::bind(&connectionError, p, _1),
+            boost::bind(&disconnected, p, _1),
+            boost::bind(&rejected, p, _1));
+
+        c.start(p);
+        d.run();
+    } catch (Rdma::Exception& e) {
+        int err = e.getError();
+        cerr << "Error: " << e.what() << "(" << err << ")\n";
+    }
+    ::freeaddrinfo(res);
+
+    cout
+        << "Sent: " << smsgs
+        << "msgs (" << sbytes
+        << "bytes) in: " << double(sendingDuration)/TIME_SEC
+        << "s: " << double(smsgs)*TIME_SEC/sendingDuration
+        << "msgs/s(" << double(sbytes)*TIME_SEC/sendingDuration
+        << "bytes/s)\n";
+    cout
+        << "Recd: " << rmsgs
+        << "msgs (" << rbytes
+        << "bytes) in: " << double(fullTestDuration)/TIME_SEC
+        << "s: " << double(rmsgs)*TIME_SEC/fullTestDuration
+        << "msgs/s(" << double(rbytes)*TIME_SEC/fullTestDuration
+        << "bytes/s)\n";
+
+}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp?rev=652053&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.cpp Sun Apr 27 
21:41:46 2008
@@ -0,0 +1,351 @@
+#include "RdmaIO.h"
+
+#include <iostream>
+#include <boost/bind.hpp>
+
+namespace Rdma {
+    // Wrap queue pair `q` for asynchronous buffer I/O.
+    //   s  - size in bytes of every buffer this object allocates
+    //   rc - called with each received buffer
+    //   ic - called after each send completion
+    //   ec - called when a work completion has a non-success status
+    // Calls nonblocking()/notifyRecv()/notifySend() on the queue pair and
+    // preposts DEFAULT_WR_ENTRIES receive buffers, so receives can land as
+    // soon as the connection is up.
+    AsynchIO::AsynchIO(
+            QueuePair::intrusive_ptr q,
+            int s,
+            ReadCallback rc,
+            IdleCallback ic,
+            ErrorCallback ec
+    ) :
+        qp(q),
+        // Uses *qp, so relies on qp being declared (and initialised) first
+        dataHandle(*qp, boost::bind(&AsynchIO::dataEvent, this, _1), 0, 0),
+        bufferSize(s),
+        recvBufferCount(DEFAULT_WR_ENTRIES),
+        readCallback(rc),
+        idleCallback(ic),
+        errorCallback(ec)
+    {
+        qp->nonblocking();
+        qp->notifyRecv();
+        qp->notifySend();
+
+        // Prepost some recv buffers before we go any further
+        for (int i = 0; i<recvBufferCount; ++i) {
+            Buffer* b = qp->createBuffer(bufferSize);
+            // `buffers` takes ownership and deletes it with this object
+            buffers.push_front(b);
+            // A receive may fill the whole buffer
+            b->dataCount = b->byteCount;
+            qp->postRecv(b);
+        }
+    }
+
+    AsynchIO::~AsynchIO() {
+        // The buffers ptr_deque automatically deletes all the buffers we've allocated
+    }
+
+    // Begin watching the queue pair's completion events on `poller`.
+    void AsynchIO::start(Poller::shared_ptr poller) {
+        dataHandle.startWatch(poller);
+    }
+
+    // Not implemented yet (no-op).
+    void AsynchIO::queueReadBuffer(Buffer*) {
+    }
+
+    // Post `buff` for sending immediately.  When its send completes,
+    // dataEvent() puts it back on the free list used by getBuffer().
+    void AsynchIO::queueWrite(Buffer* buff) {
+        qp->postSend(buff);
+    }
+
+    // Not implemented yet (no-op).
+    void AsynchIO::notifyPendingWrite() {
+    }
+
+    // Not implemented yet (no-op).
+    void AsynchIO::queueWriteClose() {
+    }
+
+    // Return an empty buffer for the caller to fill and queueWrite().
+    // A buffer whose send has completed is reused if available; otherwise a
+    // new one is allocated (owned by this object).  Either way it is handed
+    // out with dataStart and dataCount reset to 0.
+    Buffer* AsynchIO::getBuffer() {
+        Buffer* b;
+        if (bufferQueue.empty()) {
+            b = qp->createBuffer(bufferSize);
+            buffers.push_front(b);
+        } else {
+            b = bufferQueue.front();
+            bufferQueue.pop_front();
+        }
+        b->dataStart = 0;
+        b->dataCount = 0;
+        return b;
+    }
+
+    // Completion-channel handler: drains every pending work completion on
+    // the queue pair.  Receives go to readCallback and are then reposted;
+    // completed sends go back on the free list and trigger idleCallback.
+    // The first failed completion calls errorCallback and stops draining.
+    void AsynchIO::dataEvent(DispatchHandle&) {
+        QueuePair::intrusive_ptr q = qp->getNextChannelEvent();
+
+        // If no event do nothing
+        if (!q)
+            return;
+
+        assert(q == qp);
+
+        // Re-enable notification for queue.  Done before draining, presumably
+        // so completions arriving during the loop raise a new channel event.
+        qp->notifySend();
+        qp->notifyRecv();
+
+        // Repeat until no more events
+        do {
+            QueuePairEvent e(qp->getNextEvent());
+            if (!e)
+                return;
+
+            ::ibv_wc_status status = e.getEventStatus();
+            if (status != IBV_WC_SUCCESS) {
+                errorCallback(*this);
+                return;
+            }
+
+            // Test if recv (or recv with imm)
+            //::ibv_wc_opcode eventType = e.getEventType();
+            Buffer* b = e.getBuffer();
+            QueueDirection dir = e.getDirection();
+            if (dir == RECV) {
+                readCallback(*this, b);
+                // At this point the buffer has been consumed so put it back on the recv queue
+                // NOTE(review): the constructor sets dataCount = byteCount before
+                // postRecv, this repost does not - confirm that dataCount cannot be
+                // left at a received length smaller than the buffer.
+                qp->postRecv(b);
+            } else {
+                // Send finished: make the buffer available to getBuffer()
+                bufferQueue.push_front(b);
+                idleCallback(*this);
+            }
+        } while (true);
+    }
+
+    // Passive (server) side of rdma_cm connection setup.
+    //   src  - local address to bind and listen on (copied)
+    //   cc   - called when an accepted connection is established
+    //   errc - called on RDMA_CM_EVENT_CONNECT_ERROR
+    //   dc   - called when a connection is disconnected
+    //   crc  - optional; decides whether to accept each connection request
+    //          (if empty, every request is accepted)
+    Listener::Listener(
+        const sockaddr& src,
+        ConnectedCallback cc,
+        ErrorCallback errc,
+        DisconnectedCallback dc,
+        ConnectionRequestCallback crc
+    ) :
+        src_addr(src),
+        ci(Connection::make()),
+        // Uses *ci, so relies on ci being declared (and initialised) first
+        handle(*ci, boost::bind(&Listener::connectionEvent, this, _1), 0, 0),
+        connectedCallback(cc),
+        errorCallback(errc),
+        disconnectedCallback(dc),
+        connectionRequestCallback(crc),
+        state(IDLE)
+    {
+        ci->nonblocking();
+    }
+
+    // Bind to src_addr, start listening and watch for connection events.
+    void Listener::start(Poller::shared_ptr poller) {
+        ci->bind(src_addr);
+        ci->listen();
+        state = LISTENING;
+        handle.startWatch(poller);
+    }
+
+    // Handles one connection-manager event on the listening id and routes it
+    // to the matching callback.
+    void Listener::connectionEvent(DispatchHandle&) {
+        ConnectionEvent e(ci->getNextEvent());
+
+        // If (for whatever reason) there was no event do nothing
+        if (!e)
+            return;
+
+        // Important documentation omission: the new rdma_cm_id
+        // you get from CONNECT_REQUEST has the same context info
+        // as its parent listening rdma_cm_id
+        ::rdma_cm_event_type eventType = e.getEventType();
+        Rdma::Connection::intrusive_ptr id = e.getConnection();
+
+        switch (eventType) {
+        case RDMA_CM_EVENT_CONNECT_REQUEST: {
+            bool accept = true;
+            // Extract connection parameters and private data from event
+            ::rdma_conn_param conn_param = e.getConnectionParam();
+
+            if (connectionRequestCallback)
+                //TODO: pass private data to callback (and accept new private data for accept somehow)
+                accept = connectionRequestCallback(id);
+            if (accept) {
+                // Accept connection (echoing back the requester's parameters)
+                id->accept(conn_param);
+            } else {
+                //Reject connection
+                id->reject();
+            }
+
+            break;
+        }
+        case RDMA_CM_EVENT_ESTABLISHED:
+            connectedCallback(id);
+            break;
+        case RDMA_CM_EVENT_DISCONNECTED:
+            disconnectedCallback(id);
+            break;
+        case RDMA_CM_EVENT_CONNECT_ERROR:
+            errorCallback(id);
+            break;
+        default:
+            std::cerr << "Warning: unexpected response to listen - " << eventType << "\n";
+        }
+    }
+
+    // Active (client) side of rdma_cm connection setup.
+    //   dst  - remote address to connect to (copied)
+    //   cc   - called when the connection is established
+    //   errc - called on address/route resolution or connect errors
+    //   dc   - called when the connection is disconnected
+    //   rc   - optional; called if the peer rejects the connection
+    Connector::Connector(
+        const sockaddr& dst,
+        ConnectedCallback cc,
+        ErrorCallback errc,
+        DisconnectedCallback dc,
+        RejectedCallback rc
+    ) :
+        dst_addr(dst),
+        ci(Connection::make()),
+        // Uses *ci, so relies on ci being declared (and initialised) first
+        handle(*ci, boost::bind(&Connector::connectionEvent, this, _1), 0, 0),
+        connectedCallback(cc),
+        errorCallback(errc),
+        disconnectedCallback(dc),
+        rejectedCallback(rc),
+        state(IDLE)
+    {
+        ci->nonblocking();
+    }
+
+    // Start resolving dst_addr.  The remaining steps (route resolution and
+    // connect) are driven by connectionEvent() as rdma_cm events arrive.
+    void Connector::start(Poller::shared_ptr poller) {
+        ci->resolve_addr(dst_addr);
+        state = RESOLVE_ADDR;
+        handle.startWatch(poller);
+    }
+
+    // Advances the client-side connection state machine by one
+    // connection-manager event: addr resolved -> resolve route -> connect ->
+    // established/rejected/error, and later disconnected.
+    void Connector::connectionEvent(DispatchHandle&) {
+        ConnectionEvent e(ci->getNextEvent());
+
+        // If (for whatever reason) there was no event do nothing
+        if (!e)
+            return;
+
+        ::rdma_cm_event_type eventType = e.getEventType();
+#if 1
+        // Dispatch on the event type alone; the comment on each case names
+        // the state in which that event is expected.
+        switch (eventType) {
+        case RDMA_CM_EVENT_ADDR_RESOLVED:
+            // RESOLVE_ADDR
+            state = RESOLVE_ROUTE;
+            ci->resolve_route();
+            break;
+        case RDMA_CM_EVENT_ROUTE_RESOLVED:
+            // RESOLVE_ROUTE:
+            state = CONNECTING;
+            ci->connect();
+            break;
+        case RDMA_CM_EVENT_ADDR_ERROR:      // RESOLVE_ADDR
+        case RDMA_CM_EVENT_ROUTE_ERROR:     // RESOLVE_ROUTE
+        case RDMA_CM_EVENT_CONNECT_ERROR:   // CONNECTING
+        case RDMA_CM_EVENT_UNREACHABLE:     // CONNECTING
+            state = ERROR;
+            errorCallback(ci);
+            break;
+        case RDMA_CM_EVENT_REJECTED:
+            // CONNECTING
+            state = REJECTED;
+            // rejectedCallback is optional (its constructor argument defaults
+            // to 0); invoking an empty boost::function throws
+            // boost::bad_function_call, so only call it when set.
+            if (rejectedCallback)
+                rejectedCallback(ci);
+            break;
+        case RDMA_CM_EVENT_ESTABLISHED:
+            // CONNECTING
+            state = ESTABLISHED;
+            connectedCallback(ci);
+            break;
+        case RDMA_CM_EVENT_DISCONNECTED:
+            // ESTABLISHED
+            state = DISCONNECTED;
+            disconnectedCallback(ci);
+            break;
+        default:
+            std::cerr << "Warning: unexpected event in " << state << " state - " << eventType << "\n";
+            state = ERROR;
+        }
+#else
+        // Alternative (disabled) version that dispatches on state first.
+        switch (state) {
+        case IDLE:
+            std::cerr << "Warning: event in IDLE state\n";
+            break;
+        case RESOLVE_ADDR:
+            switch (eventType) {
+            case RDMA_CM_EVENT_ADDR_RESOLVED:
+                state = RESOLVE_ROUTE;
+                ci->resolve_route();
+                break;
+            case RDMA_CM_EVENT_ADDR_ERROR:
+                state = ERROR;
+                errorCallback(ci);
+                break;
+            default:
+                state = ERROR;
+                std::cerr << "Warning: unexpected response to resolve_addr - " << eventType << "\n";
+            }
+            break;
+        case RESOLVE_ROUTE:
+            switch (eventType) {
+            case RDMA_CM_EVENT_ROUTE_RESOLVED:
+                state = CONNECTING;
+                ci->connect();
+                break;
+            case RDMA_CM_EVENT_ROUTE_ERROR:
+                state = ERROR;
+                errorCallback(ci);
+                break;
+            default:
+                state = ERROR;
+                std::cerr << "Warning: unexpected response to resolve_route - " << eventType << "\n";
+            }
+            break;
+        case CONNECTING:
+            switch (eventType) {
+            case RDMA_CM_EVENT_CONNECT_RESPONSE:
+                std::cerr << "connect_response\n";
+                break;
+            case RDMA_CM_EVENT_CONNECT_ERROR:
+                state = ERROR;
+                errorCallback(ci);
+                break;
+            case RDMA_CM_EVENT_UNREACHABLE:
+                state = ERROR;
+                errorCallback(ci);
+                break;
+            case RDMA_CM_EVENT_REJECTED:
+                state = REJECTED;
+                if (rejectedCallback)
+                    rejectedCallback(ci);
+                break;
+            case RDMA_CM_EVENT_ESTABLISHED:
+                state = ESTABLISHED;
+                connectedCallback(ci);
+                break;
+            default:
+                state = ERROR;
+                std::cerr << "Warning: unexpected response to connect - " << eventType << "\n";
+            }
+            break;
+        case ESTABLISHED:
+            switch (eventType) {
+            case RDMA_CM_EVENT_DISCONNECTED:
+                disconnectedCallback(ci);
+                break;
+            default:
+                std::cerr << "Warning: unexpected event in ESTABLISHED state - " << eventType << "\n";
+            }
+            break;
+        case REJECTED:
+            std::cerr << "Warning: event in REJECTED state - " << eventType << "\n";
+            break;
+        case ERROR:
+            std::cerr << "Warning: event in ERROR state - " << eventType << "\n";
+            break;
+        case LISTENING:
+        case ACCEPTING:
+            std::cerr << "Warning: in an illegal state (and received event!) - " << eventType << "\n";
+            break;
+        }
+#endif
+    }
+}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h?rev=652053&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaIO.h Sun Apr 27 
21:41:46 2008
@@ -0,0 +1,128 @@
+#ifndef Rdma_RdmaIO_h
+#define Rdma_RdmaIO_h
+
+#include "rdma_wrap.h"
+
+#include "qpid/sys/Dispatcher.h"
+
+#include <netinet/in.h>
+
+#include <boost/function.hpp>
+#include <boost/ptr_container/ptr_deque.hpp>
+#include <deque>
+
+using qpid::sys::DispatchHandle;
+using qpid::sys::Poller;
+
+namespace Rdma {
+
+    class Connection;
+
+    // Where a Listener or Connector is in rdma_cm connection setup/teardown
+    enum ConnectionState {
+        IDLE,
+        RESOLVE_ADDR,
+        RESOLVE_ROUTE,
+        LISTENING,
+        CONNECTING,
+        ACCEPTING,
+        ESTABLISHED,
+        REJECTED,
+        DISCONNECTED,
+        ERROR
+    };
+
+    // Connection-manager event callbacks used by Listener and Connector.
+    // A ConnectionRequestCallback returns true to accept the request.
+    typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ConnectedCallback;
+    typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> ErrorCallback;
+    typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> DisconnectedCallback;
+    typedef boost::function1<bool, Rdma::Connection::intrusive_ptr&> ConnectionRequestCallback;
+    typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> RejectedCallback;
+
+    // Asynchronous buffer I/O over one QueuePair.  Completion events arrive
+    // through a DispatchHandle that start() registers with a Poller.
+    // Not copyable: the handle's callback is bound to `this`.
+    class AsynchIO
+    {
+    public:
+        // Called when a work completion reports an error
+        typedef boost::function1<void, AsynchIO&> ErrorCallback;
+        // Called with each received buffer
+        typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
+        // Called after each send completes
+        typedef boost::function1<void, AsynchIO&>  IdleCallback;
+
+    private:
+        QueuePair::intrusive_ptr qp;
+        DispatchHandle dataHandle;          // constructed from *qp: keep after qp
+        int bufferSize;
+        int recvBufferCount;
+        std::deque<Buffer*> bufferQueue;    // completed send buffers for reuse
+        boost::ptr_deque<Buffer> buffers;   // owns every buffer allocated
+
+        ReadCallback readCallback;
+        IdleCallback idleCallback;
+        ErrorCallback errorCallback;
+
+        // Declared but never defined: copying is not allowed
+        AsynchIO(const AsynchIO&);
+        AsynchIO& operator=(const AsynchIO&);
+
+    public:
+        AsynchIO(
+            QueuePair::intrusive_ptr q,
+            int s,
+            ReadCallback rc,
+            IdleCallback ic,
+            ErrorCallback ec
+        );
+        ~AsynchIO();
+
+        void start(Poller::shared_ptr poller);
+        void queueReadBuffer(Buffer* buff);
+        void queueWrite(Buffer* buff);
+        void notifyPendingWrite();
+        void queueWriteClose();
+        Buffer* getBuffer();
+
+    private:
+        void dataEvent(DispatchHandle& handle);
+    };
+
+    // Passive side: listens on an address and reports connection events.
+    // Not copyable: the handle's callback is bound to `this`.
+    // NOTE(review): the address is stored as a plain sockaddr, so it only
+    // holds IPv4 (sockaddr_in-sized) addresses.
+    class Listener
+    {
+        sockaddr src_addr;
+        Connection::intrusive_ptr ci;
+        DispatchHandle handle;              // constructed from *ci: keep after ci
+        ConnectedCallback connectedCallback;
+        ErrorCallback errorCallback;
+        DisconnectedCallback disconnectedCallback;
+        ConnectionRequestCallback connectionRequestCallback;
+        ConnectionState state;
+
+        // Declared but never defined: copying is not allowed
+        Listener(const Listener&);
+        Listener& operator=(const Listener&);
+
+    public:
+        Listener(
+            const sockaddr& src,
+            ConnectedCallback cc,
+            ErrorCallback errc,
+            DisconnectedCallback dc,
+            ConnectionRequestCallback crc = 0
+        );
+        void start(Poller::shared_ptr poller);
+
+    private:
+        void connectionEvent(DispatchHandle& handle);
+    };
+
+    // Active side: connects to an address and reports connection events.
+    // Not copyable: the handle's callback is bound to `this`.
+    class Connector
+    {
+        sockaddr dst_addr;
+        Connection::intrusive_ptr ci;
+        DispatchHandle handle;              // constructed from *ci: keep after ci
+        ConnectedCallback connectedCallback;
+        ErrorCallback errorCallback;
+        DisconnectedCallback disconnectedCallback;
+        RejectedCallback rejectedCallback;
+        ConnectionState state;
+
+        // Declared but never defined: copying is not allowed
+        Connector(const Connector&);
+        Connector& operator=(const Connector&);
+
+    public:
+        Connector(
+            const sockaddr& dst,
+            ConnectedCallback cc,
+            ErrorCallback errc,
+            DisconnectedCallback dc,
+            RejectedCallback rc = 0
+        );
+        void start(Poller::shared_ptr poller);
+
+    private:
+        void connectionEvent(DispatchHandle& handle);
+    };
+}
+
+#endif // Rdma_RdmaIO_h

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp?rev=652053&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/RdmaServer.cpp Sun Apr 27 
21:41:46 2008
@@ -0,0 +1,142 @@
+#include "RdmaIO.h"
+
+#include <arpa/inet.h>
+
+#include <vector>
+#include <queue>
+#include <string>
+#include <iostream>
+
+#include <boost/bind.hpp>
+
+using std::vector;
+using std::queue;
+using std::string;
+using std::cout;
+using std::cerr;
+
+using qpid::sys::Poller;
+using qpid::sys::Dispatcher;
+
+// State kept for one accepted connection.  connectionRequest() attaches it
+// to the connection as its context.
+struct ConRec {
+    Rdma::Connection::intrusive_ptr connection;
+    Rdma::AsynchIO* data;                  // owned; 0 until set
+    int outstandingWrites;                 // sends posted but not completed
+    queue<Rdma::Buffer*> queuedWrites;     // echoes waiting for a send slot
+
+    ConRec(Rdma::Connection::intrusive_ptr c) :
+        connection(c),
+        data(0),
+        outstandingWrites(0)
+    {}
+};
+
+// A work completion on a connection's AsynchIO reported an error.
+void dataError(Rdma::AsynchIO&) {
+    cout << "Data error:\n";
+}
+
+// Echo each received message back on the same connection.  The echo is
+// posted at once while fewer than 3/4 of the send work requests are in
+// flight; otherwise it waits in queuedWrites until idle() frees a slot.
+void data(ConRec* cr, Rdma::AsynchIO& a, Rdma::Buffer* b) {
+    Rdma::Buffer* echo = a.getBuffer();
+    std::copy(b->bytes + b->dataStart,
+              b->bytes + b->dataStart + b->dataCount,
+              echo->bytes);
+    echo->dataCount = b->dataCount;
+
+    if (cr->outstandingWrites >= 3*Rdma::DEFAULT_WR_ENTRIES/4) {
+        cr->queuedWrites.push(echo);
+        return;
+    }
+    a.queueWrite(echo);
+    cr->outstandingWrites++;
+}
+
+// A send completed: release its slot and post as many queued echoes as the
+// in-flight limit now allows.
+void idle(ConRec* cr, Rdma::AsynchIO& a) {
+    cr->outstandingWrites--;
+    while (cr->outstandingWrites < 3*Rdma::DEFAULT_WR_ENTRIES/4) {
+        if (cr->queuedWrites.empty())
+            break;
+        Rdma::Buffer* next = cr->queuedWrites.front();
+        cr->queuedWrites.pop();
+        a.queueWrite(next);
+        cr->outstandingWrites++;
+    }
+}
+
+// Report `what` followed by the connection's ConRec address, then disconnect
+// and free that ConRec and its AsynchIO.  The context may be null (e.g. a
+// connection that was never given a ConRec), in which case only the report
+// is printed.  The report comes first because using a pointer's value after
+// `delete` is not portable.
+// NOTE(review): the connection's context still refers to the freed ConRec
+// afterwards; nothing visible here clears it.
+static void releaseConnection(Rdma::Connection::intrusive_ptr& ci, const char* what) {
+    ConRec* cr = ci->getContext<ConRec>();
+    cout << what << cr << "\n";
+    if (!cr)
+        return;
+    cr->connection->disconnect();
+    delete cr->data;
+    delete cr;
+}
+
+void disconnected(Rdma::Connection::intrusive_ptr& ci) {
+    releaseConnection(ci, "Disconnected: ");
+}
+
+void connectionError(Rdma::Connection::intrusive_ptr& ci) {
+    releaseConnection(ci, "Connection error: ");
+}
+
+// Decide whether to accept an incoming connection.  Alternate requests are
+// rejected, to exercise the reject path.  For an accepted request the
+// AsynchIO is created here, so its receive buffers are preposted *before*
+// the connection is accepted.
+bool connectionRequest(Rdma::Connection::intrusive_ptr& ci) {
+    cout << "Incoming connection: ";
+
+    static bool acceptThisOne = false;
+    acceptThisOne = !acceptThisOne;
+
+    if (!acceptThisOne) {
+        cout << "Reject\n";
+        return false;
+    }
+
+    ConRec* cr = new ConRec(ci);
+    Rdma::AsynchIO* aio =
+        new Rdma::AsynchIO(ci->getQueuePair(), 8000,
+            boost::bind(data, cr, _1, _2),
+            boost::bind(idle, cr, _1),
+            dataError);
+    ci->addContext(cr);
+    cr->data = aio;
+    cout << "Accept=>" << cr << "\n";
+    return true;
+}
+
+// An accepted connection is up: report it and start polling its AsynchIO.
+void connected(Poller::shared_ptr poller, Rdma::Connection::intrusive_ptr& ci) {
+    static int connectionCount = 0;
+    ConRec* cr = ci->getContext<ConRec>();
+    ++connectionCount;
+    cout << "Connected: " << cr << "(" << connectionCount << ")\n";
+
+    cr->data->start(poller);
+}
+
+// Usage: RdmaServer [<port>]
+// Listens for RDMA connections on all local IPv4 addresses (default port
+// 20079) and echoes back every message it receives.
+int main(int argc, char* argv[]) {
+    vector<string> args(&argv[0], &argv[argc]);
+
+    // Zero the whole address (including sin_zero) before filling it in
+    ::sockaddr_in sin = {};
+
+    int port = (args.size() < 2) ? 20079 : atoi(args[1].c_str());
+    if (port <= 0 || port > 65535) {
+        // Only reachable from a bad argument: the default is valid
+        cerr << "Invalid port: " << args[1] << "\n";
+        return 1;
+    }
+    cout << "Listening on port: " << port << "\n";
+
+    sin.sin_family      = AF_INET;
+    sin.sin_port        = htons(port);
+    sin.sin_addr.s_addr = htonl(INADDR_ANY);
+
+    try {
+        boost::shared_ptr<Poller> p(new Poller());
+        Dispatcher d(p);
+
+        Rdma::Listener a(reinterpret_cast<const sockaddr&>(sin),
+            boost::bind(connected, p, _1),
+            connectionError,
+            disconnected,
+            connectionRequest);
+
+
+        a.start(p);
+        d.run();
+    } catch (Rdma::Exception& e) {
+        int err = e.getError();
+        cerr << "Error: " << e.what() << "(" << err << ")\n";
+    }
+}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_exception.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_exception.h?rev=652053&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_exception.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_exception.h Sun Apr 27 
21:41:46 2008
@@ -0,0 +1,45 @@
+#ifndef RDMA_EXCEPTION_H
+#define RDMA_EXCEPTION_H
+
+#include <exception>
+#include <string>
+
+#include <errno.h>
+#include <string.h>
+
+namespace Rdma {
+    namespace detail {
+        // strerror_r has two incompatible flavours: the GNU one returns the
+        // message (which need not be in `buf`), the XSI one returns 0 on
+        // success and writes into `buf`.  Overloading on the return type
+        // makes the call below compile and work with either.
+        inline const char* strerrorResult(const char* msg, const char*) {
+            return msg;
+        }
+        inline const char* strerrorResult(int rc, const char* buf) {
+            return rc == 0 ? buf : "Unknown error";
+        }
+    }
+
+    // Exception carrying an errno-style error code.  The message is captured
+    // when the exception is created, so what() stays valid for the lifetime
+    // of each exception object independently of any others.
+    class Exception : public std::exception {
+        int err;
+        std::string msg;
+
+    public:
+        Exception(int e) : err(e) {
+            char buf[128] = "";
+            msg = detail::strerrorResult(::strerror_r(err, buf, sizeof(buf)), buf);
+        }
+        // Must not be looser than std::exception's destructor
+        ~Exception() throw() {}
+        int getError() const { return err; }
+        const char* what() const throw() {
+            return msg.c_str();
+        }
+    };
+
+    // Throw the current errno.
+    inline void THROW_ERRNO() {
+        throw Rdma::Exception(errno);
+    }
+
+    // For calls returning 0 on success: -1 means "see errno", any other
+    // non-zero value is taken as an error code (sign ignored).
+    inline void CHECK(int rc) {
+        if (rc != 0)
+            throw Rdma::Exception((rc == -1) ? errno : rc >0 ? rc : -rc);
+    }
+
+    // For ibverbs calls, which return the error code directly.
+    inline void CHECK_IBV(int rc) {
+        if (rc != 0)
+            throw Rdma::Exception(rc);
+    }
+
+    // For calls returning a pointer that is null (with errno set) on failure.
+    template <typename T>
+    inline
+    T* CHECK_NULL(T* rc) {
+        if (rc == 0)
+            THROW_ERRNO();
+        return rc;
+    }
+}
+
+#endif // RDMA_EXCEPTION_H

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp?rev=652053&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.cpp Sun Apr 
27 21:41:46 2008
@@ -0,0 +1,39 @@
+#include "rdma_factories.h"
+
+namespace Rdma {
+    // Deleters used with the shared_ptr wrappers made in rdma_factories.h.
+    // Each accepts a null pointer, and each deliberately discards the
+    // result of the underlying release call: a deleter has no way to
+    // report a failure, and is declared not to throw.
+
+    void acker(::rdma_cm_event* e) throw () {
+        if (e)
+            // Intentionally ignore return value - we can't do anything about it here
+            (void) ::rdma_ack_cm_event(e);
+    }
+
+    void destroyEChannel(::rdma_event_channel* c) throw () {
+        if (c)
+            // Intentionally ignore return value - we can't do anything about it here
+            (void) ::rdma_destroy_event_channel(c);
+    }
+
+    void destroyId(::rdma_cm_id* i) throw () {
+        if (i)
+            // Intentionally ignore return value - we can't do anything about it here
+            (void) ::rdma_destroy_id(i);
+    }
+
+    void deallocPd(::ibv_pd* p) throw () {
+        if (p)
+            // Intentionally ignore return value - we can't do anything about it here
+            (void) ::ibv_dealloc_pd(p);
+    }
+
+    void destroyCChannel(::ibv_comp_channel* c) throw () {
+        if (c)
+            // Intentionally ignore return value - we can't do anything about it here
+            (void) ::ibv_destroy_comp_channel(c);
+    }
+
+    void destroyCq(::ibv_cq* cq) throw () {
+        if (cq)
+            // Intentionally ignore return value - we can't do anything about it here
+            (void) ::ibv_destroy_cq(cq);
+    }
+
+}

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h?rev=652053&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_factories.h Sun Apr 27 21:41:46 2008
@@ -0,0 +1,48 @@
+#ifndef RDMA_FACTORIES_H
+#define RDMA_FACTORIES_H
+
+#include "rdma_exception.h"
+
+#include <rdma/rdma_cma.h>
+
+#include <boost/shared_ptr.hpp>
+
+namespace Rdma {
+    // These allow us to use simple shared_ptrs to do ref counting
+    void acker(::rdma_cm_event* e) throw ();
+    void destroyEChannel(::rdma_event_channel* c) throw ();
+    void destroyId(::rdma_cm_id* i) throw ();
+    void deallocPd(::ibv_pd* p) throw ();
+    void destroyCChannel(::ibv_comp_channel* c) throw ();
+    void destroyCq(::ibv_cq* cq) throw ();
+
+    // Each factory below creates one rdmacm/ibverbs object, throws
+    // Rdma::Exception if creation fails, and otherwise returns it wrapped
+    // in a shared_ptr whose deleter releases it.
+
+    // Event channel. Checked like the other factories so that a failure is
+    // reported here rather than as a null dereference later on.
+    inline boost::shared_ptr< ::rdma_event_channel > mkEChannel() {
+        ::rdma_event_channel* c = CHECK_NULL(::rdma_create_event_channel());
+        return boost::shared_ptr< ::rdma_event_channel >(c, destroyEChannel);
+    }
+
+    inline boost::shared_ptr< ::rdma_cm_id >
+    mkId(::rdma_event_channel* ec, void* context, ::rdma_port_space ps) {
+        ::rdma_cm_id* i = 0;
+        CHECK(::rdma_create_id(ec, &i, context, ps));
+        return boost::shared_ptr< ::rdma_cm_id >(i, destroyId);
+    }
+
+    inline boost::shared_ptr< ::ibv_pd > allocPd(::ibv_context* c) {
+        ::ibv_pd* pd = CHECK_NULL(::ibv_alloc_pd(c));
+        return boost::shared_ptr< ::ibv_pd >(pd, deallocPd);
+    }
+
+    inline boost::shared_ptr< ::ibv_comp_channel > mkCChannel(::ibv_context* c) {
+        ::ibv_comp_channel* cc = CHECK_NULL(::ibv_create_comp_channel(c));
+        return boost::shared_ptr< ::ibv_comp_channel >(cc, destroyCChannel);
+    }
+
+    // Completion queue of at least cqe entries, created on comp vector 0
+    inline boost::shared_ptr< ::ibv_cq >
+    mkCq(::ibv_context* c, int cqe, void* context, ::ibv_comp_channel* cc) {
+        ::ibv_cq* cq = CHECK_NULL(::ibv_create_cq(c, cqe, context, cc, 0));
+        return boost::shared_ptr< ::ibv_cq >(cq, destroyCq);
+    }
+}
+
+#endif // RDMA_FACTORIES_H

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h?rev=652053&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/rdma/rdma_wrap.h Sun Apr 27 21:41:46 2008
@@ -0,0 +1,550 @@
+#ifndef RDMA_WRAP_H
+#define RDMA_WRAP_H
+
+#include "rdma_factories.h"
+
+#include <rdma/rdma_cma.h>
+
+#include "qpid/RefCounted.h"
+#include "qpid/sys/IOHandle.h"
+#include "qpid/sys/posix/PrivatePosix.h"
+
+#include <fcntl.h>
+
+#include <vector>
+#include <algorithm>
+#include <iostream>
+#include <stdexcept>
+#include <boost/shared_ptr.hpp>
+#include <boost/intrusive_ptr.hpp>
+
+namespace Rdma {
+    // Timeout passed to address and route resolution (milliseconds)
+    const int DEFAULT_TIMEOUT = 2000; // 2 secs
+    // Backlog passed to listen()
+    const int DEFAULT_BACKLOG = 100;
+    // Size of each completion queue; also the basis of the batch size used
+    // when acknowledging completion channel events
+    const int DEFAULT_CQ_ENTRIES = 256;
+    // max_send_wr and max_recv_wr requested when creating a queue pair
+    const int DEFAULT_WR_ENTRIES = 64;
+    // Connection parameters used by Connection::connect(). This is a
+    // positional aggregate initialiser, so it depends on the member order of
+    // ::rdma_conn_param in <rdma/rdma_cma.h>; members not listed are zeroed.
+    // NOTE(review): an rnr_retry_count of 7 is usually treated as "retry
+    // forever" by InfiniBand hardware - confirm that is intended.
+    const ::rdma_conn_param DEFAULT_CONNECT_PARAM = {
+        0,    // .private_data
+        0,    // .private_data_len
+        4,    // .responder_resources
+        4,    // .initiator_depth
+        0,    // .flow_control
+        5,    // .retry_count
+        7     // .rnr_retry_count
+    };
+
+    // A block of memory registered (with local write access) with a
+    // protection domain, so it can be used as a send source or receive target.
+    //
+    // The Buffer owns both the byte array it is given (which must have been
+    // allocated with new[]) and the memory registration; both are released
+    // on destruction, and the bytes are also freed if registration fails.
+    struct Buffer {
+        friend class QueuePair;
+
+        char* const bytes;       // start of the whole buffer
+        const int32_t byteCount; // total size of the buffer
+        int32_t dataStart;       // offset of the data within the buffer
+        int32_t dataCount;       // number of bytes of data
+
+        Buffer(::ibv_pd* pd, char* const b, const int32_t s)
+        try :
+            bytes(b),
+            byteCount(s),
+            dataStart(0),
+            dataCount(0),
+            mr(CHECK_NULL(::ibv_reg_mr(
+                pd, bytes, byteCount,
+                ::IBV_ACCESS_LOCAL_WRITE)))
+        {
+        } catch (...) {
+            // The destructor does not run for a failed construction, so free
+            // the bytes we were given here; the exception is rethrown
+            // automatically at the end of this handler.
+            delete [] b;
+        }
+
+        ~Buffer() {
+            (void) ::ibv_dereg_mr(mr);
+            delete [] bytes;
+        }
+
+    private:
+        // Declared after bytes and byteCount so that they are initialised
+        // before the registration that uses them
+        ::ibv_mr* mr;
+
+        // Not copyable: a copy would deregister and free the memory twice
+        Buffer(const Buffer&);
+        Buffer& operator=(const Buffer&);
+    };
+
+    class Connection;
+
+    // Which completion queue a QueuePairEvent came from;
+    // NONE marks an empty event.
+    enum QueueDirection {
+        NONE = 0,
+        SEND = 1,
+        RECV = 2
+    };
+
+    // One work completion taken from a QueuePair's send or receive
+    // completion queue. An empty (default constructed) event is false.
+    class QueuePairEvent {
+        // Holding the cq keeps it alive for as long as the event exists
+        boost::shared_ptr< ::ibv_cq > cq;
+        ::ibv_wc wc;
+        QueueDirection dir;
+
+        friend class QueuePair;
+
+        QueuePairEvent() :
+            wc(),
+            dir(NONE)
+        {}
+
+        QueuePairEvent(
+            const ::ibv_wc& w,
+            boost::shared_ptr< ::ibv_cq > c,
+            QueueDirection d) :
+            cq(c),
+            wc(w),
+            dir(d)
+        {
+            assert(dir != NONE);
+        }
+
+    public:
+        // True unless this is an empty event
+        operator bool() const {
+            return dir != NONE;
+        }
+
+        QueueDirection getDirection() const {
+            return dir;
+        }
+
+        ::ibv_wc_opcode getEventType() const {
+            return wc.opcode;
+        }
+
+        ::ibv_wc_status getEventStatus() const {
+            return wc.status;
+        }
+
+        // The Buffer posted with the completed work request.
+        //
+        // For a successful receive, dataCount is set to the number of bytes
+        // received. For other completions wc.byte_len is not meaningful (it
+        // is not defined for plain send completions, and only wr_id/status
+        // are valid for failed ones), so dataCount is left untouched.
+        Buffer* getBuffer() const {
+            Buffer* b = reinterpret_cast<Buffer*>(static_cast<uintptr_t>(wc.wr_id));
+            if (dir == RECV && wc.status == ::IBV_WC_SUCCESS)
+                b->dataCount = wc.byte_len;
+            return b;
+        }
+    };
+
+    // Wrapper for a queue pair - this has the functionality for
+    // putting buffers on the receive queue and for sending buffers
+    // to the other end of the connection.
+    //
+    // Currently QueuePairs are contained inside Connections and have no
+    // separate lifetime
+    class QueuePair : public qpid::sys::IOHandle, public qpid::RefCounted {
+        boost::shared_ptr< ::ibv_pd > pd;
+        boost::shared_ptr< ::ibv_comp_channel > cchannel;
+        boost::shared_ptr< ::ibv_cq > scq;
+        boost::shared_ptr< ::ibv_cq > rcq;
+        boost::shared_ptr< ::rdma_cm_id > id;
+        // Completion channel events received but not yet acknowledged, per
+        // completion queue (they are acknowledged in batches)
+        int outstandingSendEvents;
+        int outstandingRecvEvents;
+
+        friend class Connection;
+
+        QueuePair(boost::shared_ptr< ::rdma_cm_id > id);
+        ~QueuePair();
+
+    public:
+        typedef boost::intrusive_ptr<QueuePair> intrusive_ptr;
+
+        // Create a buffer of s bytes registered with this queue pair's
+        // protection domain, for use with postSend()/postRecv()
+        Buffer* createBuffer(int s) {
+            return new Buffer(pd.get(), new char[s], s);
+        }
+
+        // Make the completion channel non-blocking by setting O_NONBLOCK on
+        // its fd, keeping any file status flags that are already set.
+        // Throws Rdma::Exception if fcntl fails.
+        void nonblocking() {
+            int fd = cchannel->fd;
+            int flags = ::fcntl(fd, F_GETFL);
+            if (flags == -1 || ::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
+                THROW_ERRNO();
+        }
+
+        // Return the QueuePair that the next completion channel event is for.
+        // If we get EAGAIN because the channel has been set non blocking
+        // and we'd have to wait then return a null pointer.
+        QueuePair::intrusive_ptr getNextChannelEvent() {
+            // First find out which cq has the event
+            ::ibv_cq* cq;
+            void* ctx;
+            int rc = ::ibv_get_cq_event(cchannel.get(), &cq, &ctx);
+            if (rc == -1 && errno == EAGAIN)
+                return 0;
+            CHECK(rc);
+
+            // Batch acknowledge the event; any remainder is acknowledged
+            // by the destructor
+            if (cq == scq.get()) {
+                if (++outstandingSendEvents > DEFAULT_CQ_ENTRIES / 2) {
+                    ::ibv_ack_cq_events(cq, outstandingSendEvents);
+                    outstandingSendEvents = 0;
+                }
+            } else if (cq == rcq.get()) {
+                if (++outstandingRecvEvents > DEFAULT_CQ_ENTRIES / 2) {
+                    ::ibv_ack_cq_events(cq, outstandingRecvEvents);
+                    outstandingRecvEvents = 0;
+                }
+            }
+
+            return static_cast<QueuePair*>(ctx);
+        }
+
+        // Take one completion, from the send queue in preference to the
+        // receive queue; returns an empty event if neither has one
+        QueuePairEvent getNextEvent() {
+            ::ibv_wc w;
+            if (::ibv_poll_cq(scq.get(), 1, &w) == 1)
+                return QueuePairEvent(w, scq, SEND);
+            else if (::ibv_poll_cq(rcq.get(), 1, &w) == 1)
+                return QueuePairEvent(w, rcq, RECV);
+            else
+                return QueuePairEvent();
+        }
+
+        void postRecv(Buffer* buf);
+        void postSend(Buffer* buf);
+        void notifyRecv();
+        void notifySend();
+    };
+
+    // An rdmacm connection manager event together with the Connection(s)
+    // it refers to. An empty (default constructed) event is false.
+    class ConnectionEvent {
+        friend class Connection;
+
+        // NB: members are destroyed in reverse declaration order, so the
+        // event is released (acknowledged) before the Connections owning the
+        // ids it refers to. Keep event declared last.
+        boost::intrusive_ptr<Connection> id;
+        boost::intrusive_ptr<Connection> listen_id;
+        boost::shared_ptr< ::rdma_cm_event > event;
+
+        ConnectionEvent() {}
+        ConnectionEvent(::rdma_cm_event* e);
+
+        // Compiler generated copy, assignment and destructor are correct
+    public:
+        operator bool() const {
+            return event.get() != 0;
+        }
+
+        ::rdma_cm_event_type getEventType() const {
+            return event->event;
+        }
+
+        // Connection parameters of a connect request; all zero for any
+        // other kind of event
+        ::rdma_conn_param getConnectionParam() const {
+            ::rdma_conn_param result = {};
+            if (event->event == RDMA_CM_EVENT_CONNECT_REQUEST)
+                result = event->param.conn;
+            return result;
+        }
+
+        boost::intrusive_ptr<Connection> getConnection () const {
+            return id;
+        }
+
+        boost::intrusive_ptr<Connection> getListenId() const {
+            return listen_id;
+        }
+    };
+
+    // For the moment this is a fairly simple wrapper for rdma_cm_id.
+    //
+    // NB: It allocates a protection domain (pd) per connection which means that
+    // registered buffers can't be shared between different connections
+    // (this can only happen between connections on the same controller in any case,
+    // so needs careful management if used)
+    class Connection : public qpid::sys::IOHandle, public qpid::RefCounted {
+        // Event channel created by the default constructor; left empty for
+        // connections wrapping an id from a connect request, which use the
+        // channel their id is already attached to
+        boost::shared_ptr< ::rdma_event_channel > channel;
+        boost::shared_ptr< ::rdma_cm_id > id;
+        // Created on demand by ensureQueuePair()
+        QueuePair::intrusive_ptr qp;
+
+        // Opaque user context, see addContext()/getContext()
+        void* context;
+
+        friend class ConnectionEvent;
+        friend class QueuePair;
+
+        // Wrap the passed in rdma_cm_id with a Connection
+        // this basically happens only on connection request
+        Connection(::rdma_cm_id* i) :
+            qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+            id(i, destroyId),
+            context(0)
+        {
+            // i is dereferenced below, so a null id is a programming error
+            assert(i);
+            impl->fd = id->channel->fd;
+
+            // Just overwrite the previous context as it will
+            // have come from the listening connection
+            i->context = this;
+        }
+
+        // Create a connection with its own event channel and id
+        Connection() :
+            qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+            channel(mkEChannel()),
+            id(mkId(channel.get(), this, RDMA_PS_TCP)),
+            context(0)
+        {
+            impl->fd = channel->fd;
+        }
+
+        // Default destructor fine
+
+        void ensureQueuePair() {
+            assert(id.get());
+
+            // Only allocate a queue pair if there isn't one already
+            if (qp)
+                return;
+
+            qp = new QueuePair(id);
+        }
+
+        // Private data lengths are passed as a uint8_t; refuse anything that
+        // would otherwise have its length silently truncated
+        template <typename T>
+        static uint8_t privateDataLen() {
+            if (sizeof(T) > 255)
+                throw std::logic_error("Rdma::Connection: private data too large");
+            return static_cast<uint8_t>(sizeof(T));
+        }
+
+    public:
+        typedef boost::intrusive_ptr<Connection> intrusive_ptr;
+
+        static intrusive_ptr make() {
+            return new Connection();
+        }
+
+        // The Connection wrapping i, or null if i is null.
+        // Throws std::logic_error if i has no Connection attached.
+        static intrusive_ptr find(::rdma_cm_id* i) {
+            if (!i)
+                return 0;
+            Connection* c = static_cast< Connection* >(i->context);
+            if (!c)
+                throw std::logic_error("Couldn't find existing Connection");
+            return c;
+        }
+
+        template <typename T>
+        void addContext(T* c) {
+            // Don't allow replacing context
+            if (!context)
+                context = c;
+        }
+
+        template <typename T>
+        T* getContext() {
+            return static_cast<T*>(context);
+        }
+
+        // Make channel non-blocking by making associated fd nonblocking,
+        // keeping any file status flags already set.
+        // Throws Rdma::Exception if fcntl fails.
+        void nonblocking() {
+            assert(id.get());
+            int fd = id->channel->fd;
+            int flags = ::fcntl(fd, F_GETFL);
+            if (flags == -1 || ::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
+                THROW_ERRNO();
+        }
+
+        // If we get EAGAIN because the channel has been set non blocking
+        // and we'd have to wait then return an empty event
+        ConnectionEvent getNextEvent() {
+            assert(id.get());
+            ::rdma_cm_event* e;
+            int rc = ::rdma_get_cm_event(id->channel, &e);
+            if (rc == -1 && errno == EAGAIN)
+                return ConnectionEvent();
+            CHECK(rc);
+            return ConnectionEvent(e);
+        }
+
+        void bind(sockaddr& src_addr) const {
+            assert(id.get());
+            CHECK(::rdma_bind_addr(id.get(), &src_addr));
+        }
+
+        void listen(int backlog = DEFAULT_BACKLOG) const {
+            assert(id.get());
+            CHECK(::rdma_listen(id.get(), backlog));
+        }
+
+        void resolve_addr(
+            sockaddr& dst_addr,
+            sockaddr* src_addr = 0,
+            int timeout_ms = DEFAULT_TIMEOUT) const
+        {
+            assert(id.get());
+            CHECK(::rdma_resolve_addr(id.get(), src_addr, &dst_addr, timeout_ms));
+        }
+
+        void resolve_route(int timeout_ms = DEFAULT_TIMEOUT) const {
+            assert(id.get());
+            CHECK(::rdma_resolve_route(id.get(), timeout_ms));
+        }
+
+        void disconnect() const {
+            assert(id.get());
+            CHECK(::rdma_disconnect(id.get()));
+        }
+
+        // TODO: Currently you can only connect with the default connection parameters
+        void connect() {
+            assert(id.get());
+
+            // Need to have a queue pair before we can connect
+            ensureQueuePair();
+
+            ::rdma_conn_param p = DEFAULT_CONNECT_PARAM;
+            CHECK(::rdma_connect(id.get(), &p));
+        }
+
+        template <typename T>
+        void connect(const T* data) {
+            assert(id.get());
+            // Validate before allocating anything
+            const uint8_t len = privateDataLen<T>();
+
+            // Need to have a queue pair before we can connect
+            ensureQueuePair();
+
+            ::rdma_conn_param p = DEFAULT_CONNECT_PARAM;
+            p.private_data = data;
+            p.private_data_len = len;
+            CHECK(::rdma_connect(id.get(), &p));
+        }
+
+        // TODO: Not sure how to default accept params - they come from the connection request
+        // event
+        template <typename T>
+        void accept(const ::rdma_conn_param& param, const T* data) {
+            assert(id.get());
+            // Validate before allocating anything
+            const uint8_t len = privateDataLen<T>();
+
+            // Need to have a queue pair before we can accept
+            ensureQueuePair();
+
+            ::rdma_conn_param p = param;
+            p.private_data = data;
+            p.private_data_len = len;
+            CHECK(::rdma_accept(id.get(), &p));
+        }
+
+        void accept(const ::rdma_conn_param& param) {
+            assert(id.get());
+            // Need to have a queue pair before we can accept
+            ensureQueuePair();
+
+            ::rdma_conn_param p = param;
+            p.private_data = 0;
+            p.private_data_len = 0;
+            CHECK(::rdma_accept(id.get(), &p));
+        }
+
+        template <typename T>
+        void reject(const T* data) const {
+            assert(id.get());
+            CHECK(::rdma_reject(id.get(), data, privateDataLen<T>()));
+        }
+
+        void reject() const {
+            assert(id.get());
+            CHECK(::rdma_reject(id.get(), 0, 0));
+        }
+
+        QueuePair::intrusive_ptr getQueuePair() {
+            assert(id.get());
+
+            ensureQueuePair();
+
+            return qp;
+        }
+    };
+
+    // Build the verbs resources for a connection: a protection domain, a
+    // completion channel, separate send and receive completion queues on
+    // that channel, and finally the RC queue pair itself.
+    inline QueuePair::QueuePair(boost::shared_ptr< ::rdma_cm_id > i) :
+        qpid::sys::IOHandle(new qpid::sys::IOHandlePrivate),
+        pd(allocPd(i->verbs)),
+        cchannel(mkCChannel(i->verbs)),
+        // Using this as the cq context lets getNextChannelEvent() map a
+        // completion channel event back to this QueuePair
+        scq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, this, cchannel.get())),
+        rcq(mkCq(i->verbs, DEFAULT_CQ_ENTRIES, this, cchannel.get())),
+        id(i),
+        outstandingSendEvents(0),
+        outstandingRecvEvents(0)
+    {
+        impl->fd = cchannel->fd;
+
+        // TODO: make a default struct for this
+        ::ibv_qp_init_attr attr = {};
+        attr.send_cq = scq.get();
+        attr.recv_cq = rcq.get();
+        attr.qp_type = IBV_QPT_RC;
+        attr.cap.max_send_wr  = DEFAULT_WR_ENTRIES;
+        attr.cap.max_recv_wr  = DEFAULT_WR_ENTRIES;
+        attr.cap.max_send_sge = 4;
+        attr.cap.max_recv_sge = 4;
+
+        CHECK(::rdma_create_qp(id.get(), pd.get(), &attr));
+
+        // Likewise tag the qp so it can be mapped back to this QueuePair
+        id->qp->qp_context = this;
+    }
+
+    // Tear down the queue pair.
+    //
+    // First acknowledge any completion channel events left over from the
+    // batching in getNextChannelEvent() (ibverbs expects every event from
+    // ibv_get_cq_event to be acknowledged before its cq is destroyed - see
+    // the ibv_ack_cq_events documentation). Then destroy the qp explicitly,
+    // before the members are released: after this body the shared_ptrs are
+    // destroyed in reverse declaration order (id, rcq, scq, cchannel, pd).
+    // The ordering of these statements and of the member declarations
+    // therefore matters.
+    inline QueuePair::~QueuePair() {
+        if (outstandingSendEvents > 0)
+            ::ibv_ack_cq_events(scq.get(), outstandingSendEvents);
+        if (outstandingRecvEvents > 0)
+            ::ibv_ack_cq_events(rcq.get(), outstandingRecvEvents);
+
+        ::rdma_destroy_qp(id.get());
+    }
+
+    // Ask for a completion channel event on the next completion added to
+    // the receive queue's cq (any completion, not only solicited ones)
+    inline void QueuePair::notifyRecv() {
+        const int solicitedOnly = 0;
+        CHECK_IBV(::ibv_req_notify_cq(rcq.get(), solicitedOnly));
+    }
+
+    // As notifyRecv(), but for the send queue's cq
+    inline void QueuePair::notifySend() {
+        const int solicitedOnly = 0;
+        CHECK_IBV(::ibv_req_notify_cq(scq.get(), solicitedOnly));
+    }
+
+    // Post buf on the receive queue. All of the buffer from dataStart to
+    // its end is offered for the incoming message; when the receive
+    // completes, the QueuePairEvent for it records how much arrived.
+    inline void QueuePair::postRecv(Buffer* buf) {
+        ::ibv_recv_wr rwr = {};
+        ::ibv_sge sge;
+
+        // Use the available space rather than dataCount: dataCount is 0 for
+        // a new buffer, or the size of the previous message for a reused
+        // one, and would otherwise shrink (or zero) the receive area.
+        sge.addr = reinterpret_cast<uintptr_t>(buf->bytes + buf->dataStart);
+        sge.length = buf->byteCount - buf->dataStart;
+        sge.lkey = buf->mr->lkey;
+
+        // The buffer pointer travels as the work request id so that the
+        // completion can be matched back to it
+        rwr.wr_id = reinterpret_cast<uint64_t>(buf);
+        rwr.sg_list = &sge;
+        rwr.num_sge = 1;
+
+        ::ibv_recv_wr* badrwr = 0;
+        CHECK_IBV(::ibv_post_recv(id->qp, &rwr, &badrwr));
+        if (badrwr)
+            throw std::logic_error("ibv_post_recv(): Bad rwr");
+    }
+
+    // Send the dataCount bytes starting at dataStart in buf as a signalled
+    // send. The buffer pointer travels as the work request id so that the
+    // completion can be matched back to it.
+    inline void QueuePair::postSend(Buffer* buf) {
+        ::ibv_sge segment;
+        segment.addr = reinterpret_cast<uintptr_t>(buf->bytes + buf->dataStart);
+        segment.length = buf->dataCount;
+        segment.lkey = buf->mr->lkey;
+
+        ::ibv_send_wr wr = {};
+        wr.wr_id = reinterpret_cast<uint64_t>(buf);
+        wr.opcode = IBV_WR_SEND;
+        wr.send_flags = IBV_SEND_SIGNALED;
+        wr.sg_list = &segment;
+        wr.num_sge = 1;
+
+        ::ibv_send_wr* rejected = 0;
+        CHECK_IBV(::ibv_post_send(id->qp, &wr, &rejected));
+        if (rejected)
+            throw std::logic_error("ibv_post_send(): Bad swr");
+    }
+
+    // Wrap an event obtained from rdma_get_cm_event().
+    //
+    // The event is placed in a local shared_ptr before anything else, so it
+    // is still acknowledged if finding or creating a Connection throws.
+    // During unwinding, locals of the constructor body are destroyed before
+    // the members already assigned, so the ack still happens before any
+    // newly wrapped id is destroyed.
+    inline ConnectionEvent::ConnectionEvent(::rdma_cm_event* e) {
+        boost::shared_ptr< ::rdma_cm_event > ev(e, acker);
+
+        // A connect request brings a new id which needs wrapping; any other
+        // event refers to an id that already has a Connection
+        if (e->event == RDMA_CM_EVENT_CONNECT_REQUEST)
+            id = new Connection(e->id);
+        else
+            id = Connection::find(e->id);
+        listen_id = Connection::find(e->listen_id);
+
+        event.swap(ev);
+    }
+}
+
+// Print an rdmacm event type by its enumerator name. Values not listed here
+// (for instance types added by a newer librdmacm) are printed numerically
+// instead of producing no output at all.
+inline std::ostream& operator<<(std::ostream& o, ::rdma_cm_event_type t) {
+#   define RDMA_EVENT_TYPE_CASE(t) case t: o << #t; break;
+    switch(t) {
+        RDMA_EVENT_TYPE_CASE(RDMA_CM_EVENT_ADDR_RESOLVED)
+        RDMA_EVENT_TYPE_CASE(RDMA_CM_EVENT_ADDR_ERROR)
+        RDMA_EVENT_TYPE_CASE(RDMA_CM_EVENT_ROUTE_RESOLVED)
+        RDMA_EVENT_TYPE_CASE(RDMA_CM_EVENT_ROUTE_ERROR)
+        RDMA_EVENT_TYPE_CASE(RDMA_CM_EVENT_CONNECT_REQUEST)
+        RDMA_EVENT_TYPE_CASE(RDMA_CM_EVENT_CONNECT_RESPONSE)
+        RDMA_EVENT_TYPE_CASE(RDMA_CM_EVENT_CONNECT_ERROR)
+        RDMA_EVENT_TYPE_CASE(RDMA_CM_EVENT_UNREACHABLE)
+        RDMA_EVENT_TYPE_CASE(RDMA_CM_EVENT_REJECTED)
+        RDMA_EVENT_TYPE_CASE(RDMA_CM_EVENT_ESTABLISHED)
+        RDMA_EVENT_TYPE_CASE(RDMA_CM_EVENT_DISCONNECTED)
+        RDMA_EVENT_TYPE_CASE(RDMA_CM_EVENT_DEVICE_REMOVAL)
+        RDMA_EVENT_TYPE_CASE(RDMA_CM_EVENT_MULTICAST_JOIN)
+        RDMA_EVENT_TYPE_CASE(RDMA_CM_EVENT_MULTICAST_ERROR)
+    default:
+        o << "UNKNOWN_RDMA_CM_EVENT(" << static_cast<int>(t) << ")";
+        break;
+    }
+#   undef RDMA_EVENT_TYPE_CASE
+    return o;
+}
+
+#endif // RDMA_WRAP_H


Reply via email to