All,

Attached is a user-mode program, called rping, that uses librdmacm and
libibverbs to implement a ping-pong test over an RC connection.  The
program uses SEND, RECV, RDMA READ, and RDMA WRITE operations, as well as
CQ completion channels to get CQ events and rdma_get_cm_event() to detect
CMA events.  It is multi-threaded.

I've built it as an example program in librdmacm/examples and tested it
with mthca.  It is useful to test CMA as well as all the major rdma
operations in a transport-neutral way.

If you all find it has utility, please pull it into librdmacm/examples.


Signed-off-by: Steve Wise <[EMAIL PROTECTED]>



Index: Makefile.am
===================================================================
--- Makefile.am (revision 5330)
+++ Makefile.am (working copy)
@@ -18,9 +18,11 @@
 src_librdmacm_la_SOURCES = src/cma.c
 src_librdmacm_la_LDFLAGS = -avoid-version $(rdmacm_version_script)
 
-bin_PROGRAMS = examples/ucmatose
+bin_PROGRAMS = examples/ucmatose examples/rping
 examples_ucmatose_SOURCES = examples/cmatose.c
 examples_ucmatose_LDADD = $(top_builddir)/src/librdmacm.la
+examples_rping_SOURCES = examples/rping.c
+examples_rping_LDADD = $(top_builddir)/src/librdmacm.la
 
 librdmacmincludedir = $(includedir)/rdma
 
Index: examples/rping.c
===================================================================
--- examples/rping.c    (revision 0)
+++ examples/rping.c    (revision 0)
@@ -0,0 +1,1175 @@
+/*
+ * Copyright (c) 2005 Ammasso, Inc. All rights reserved.
+ * Copyright (c) 2006 Open Grid Computing, Inc. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses.  You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ *     Redistribution and use in source and binary forms, with or
+ *     without modification, are permitted provided that the following
+ *     conditions are met:
+ *
+ *      - Redistributions of source code must retain the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer.
+ *
+ *      - Redistributions in binary form must reproduce the above
+ *        copyright notice, this list of conditions and the following
+ *        disclaimer in the documentation and/or other materials
+ *        provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include <getopt.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include <errno.h>
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <byteswap.h>
+#include <semaphore.h>
+#include <arpa/inet.h>
+#include <pthread.h>
+
+#include <rdma/rdma_cma.h>
+
+static int debug = 0;
+#define DEBUG_LOG if (debug) printf
+
+/*
+ * rping "ping/pong" loop:
+ *     client sends source rkey/addr/len
+ *     server receives source rkey/addr/len
+ *     server rdma reads "ping" data from source
+ *     server sends "go ahead" on rdma read completion
+ *     client sends sink rkey/addr/len
+ *     server receives sink rkey/addr/len
+ *     server rdma writes "pong" data to sink
+ *     server sends "go ahead" on rdma write completion
+ *     <repeat loop>
+ */
+
+/*
+ * Connection/ping states.  The CM event handler and the CQ completion
+ * handler store one of these in the control block's state field and then
+ * sem_post() its semaphore; the main client or server thread sem_wait()s
+ * and inspects the new value.
+ *
+ * Once CONNECTED, each ping cycles through RDMA_READ_ADV, (on the server)
+ * RDMA_READ_COMPLETE, RDMA_WRITE_ADV and RDMA_WRITE_COMPLETE.
+ */
+typedef enum {
+       IDLE                = 1,
+       CONNECT_REQUEST     = 2,        /* listener received a connect request */
+       CONNECTED           = 3,        /* connection established */
+       RDMA_READ_ADV       = 4,        /* read phase: source buffer advertised */
+       RDMA_READ_COMPLETE  = 5,        /* RDMA read completed */
+       RDMA_WRITE_ADV      = 6,        /* write phase: sink buffer advertised */
+       RDMA_WRITE_COMPLETE = 7,        /* write phase finished */
+       ERROR               = 8         /* CM error, disconnect or bad completion */
+} state_t;
+
+/*
+ * RPING_BUFSIZE: size in bytes of every registered buffer (recv, send,
+ * rdma and start).  Parenthesized so it behaves as a single operand in
+ * any expression (e.g. "x % RPING_BUFSIZE").
+ *
+ * RPING_SQ_DEPTH: send queue depth (max_send_wr); the CQ is created with
+ * twice this many entries.
+ */
+#define RPING_BUFSIZE (64 * 1024)
+#define RPING_SQ_DEPTH 16
+
+/*
+ * Control block struct: per-connection state shared by the main thread,
+ * the CM event handler (which finds it through the cm_id context) and the
+ * CQ event thread.
+ */
+struct rping_cb {
+       int server;                     /* 0 iff client */
+       pthread_t cqthread;             /* runs cq_thread() */
+       struct ibv_comp_channel *channel; /* completion channel for cq */
+       struct ibv_cq *cq;              /* shared by send and recv queues */
+       struct ibv_pd *pd;
+       struct ibv_qp *qp;              /* QP of cm_id (client) or */
+                                       /* child_cm_id (server) */
+
+       struct ibv_recv_wr rq_wr;       /* recv work request record; */
+                                       /* re-posted after each recv */
+       struct ibv_sge recv_sgl;        /* recv single SGE */
+       char *recv_buf;                 /* malloc'd buffer */
+       struct ibv_mr *recv_mr;         /* MR associated with this buffer */
+
+       struct ibv_send_wr sq_wr;       /* send work request record */
+       struct ibv_sge send_sgl;        /* length is set before each send */
+       char *send_buf;                 /* single send buf */
+       struct ibv_mr *send_mr;
+
+       struct ibv_send_wr rdma_sq_wr;  /* rdma work request record */
+       struct ibv_sge rdma_sgl;        /* rdma single SGE */
+       char *rdma_buf;                 /* used as rdma sink (server: read */
+                                       /* sink, then write source) */
+       struct ibv_mr *rdma_mr;
+
+
+       /* Peer buffer advertisement, copied out of recv_buf on the server. */
+       uint32_t remote_rkey;           /* remote guy's RKEY */
+       uint64_t remote_addr;           /* remote guy's TO (address) */
+       uint32_t remote_len;            /* remote guy's LEN */
+
+       char *start_buf;                /* rdma read src (client only) */
+       struct ibv_mr *start_mr;
+
+       state_t state;                  /* used for cond/signalling */
+       sem_t sem;                      /* posted by the handlers to wake */
+                                       /* the main thread */
+
+       uint16_t port;                  /* dst port in NBO */
+       uint32_t addr;                  /* dst addr in NBO */
+       char *addr_str;                 /* dst addr string */
+       int verbose;                    /* verbose logging */
+       int count;                      /* ping count; 0 = no limit */
+       int size;                       /* ping data size; 0 selects 64 */
+       int validate;                   /* validate ping data */
+
+       /* CM stuff */
+       pthread_t cmthread;             /* presumably runs cm_thread() */
+       struct rdma_cm_id *cm_id;       /* connection on client side,*/
+                                       /* listener on server side. */
+       struct rdma_cm_id *child_cm_id; /* connection on server side */
+};
+
+
+/*
+ * Handle one RDMA CM event.  cma_id is the id the event was reported on
+ * (the listening/connecting cm_id or a server-side child id); its context
+ * is the rping_cb for this connection.  Each event updates cbp->state
+ * where relevant and posts cbp->sem so the waiting main thread wakes up.
+ */
+static void rping_cma_event_handler(struct rdma_cm_id *cma_id,
+                                   struct rdma_cm_event *event)
+{
+       int rc = 0;
+       struct rping_cb *cbp = (struct rping_cb *) cma_id->context;
+
+       DEBUG_LOG("cma_event type %d cma_id %p (%s)\n",
+                 event->event, (void *) cma_id,
+                 (cma_id == cbp->cm_id) ? "parent" : "child");
+       switch (event->event) {
+
+       case RDMA_CM_EVENT_ADDR_RESOLVED:
+               /* Address resolution done; continue with route resolution. */
+               rc = rdma_resolve_route(cma_id, 2000);
+               if (rc) {
+                       fprintf(stderr, "rdma_resolve_route error %d\n", rc);
+                       cbp->state = ERROR;
+                       sem_post(&cbp->sem);
+               }
+               break;
+
+       case RDMA_CM_EVENT_ROUTE_RESOLVED:
+               /* No state change; just wake the main thread. */
+               sem_post(&cbp->sem);
+               break;
+
+       case RDMA_CM_EVENT_CONNECT_REQUEST:
+               /* Hand the new connection's id to the server thread. */
+               cbp->state = CONNECT_REQUEST;
+               cbp->child_cm_id = cma_id;
+               DEBUG_LOG("child cma %p\n", (void *) cbp->child_cm_id);
+               sem_post(&cbp->sem);
+               break;
+
+       case RDMA_CM_EVENT_ESTABLISHED:
+               DEBUG_LOG("ESTABLISHED\n");
+               cbp->state = CONNECTED;
+               sem_post(&cbp->sem);
+               break;
+
+       case RDMA_CM_EVENT_ADDR_ERROR:
+       case RDMA_CM_EVENT_ROUTE_ERROR:
+       case RDMA_CM_EVENT_CONNECT_ERROR:
+       case RDMA_CM_EVENT_UNREACHABLE:
+       case RDMA_CM_EVENT_REJECTED:
+               fprintf(stderr, "cma event %d, error %d\n", event->event,
+                      event->status);
+               cbp->state = ERROR;
+               sem_post(&cbp->sem);
+               break;
+
+       case RDMA_CM_EVENT_DISCONNECTED:
+               fprintf(stderr, "DISCONNECT EVENT...\n");
+               cbp->state = ERROR;
+               sem_post(&cbp->sem);
+               break;
+
+       case RDMA_CM_EVENT_DEVICE_REMOVAL:
+               /*
+                * Treat device removal as fatal for this connection and wake
+                * the main thread; otherwise it could stay blocked in
+                * sem_wait() with no other event left to post the semaphore.
+                */
+               fprintf(stderr, "cma detected device removal!!!!\n");
+               cbp->state = ERROR;
+               sem_post(&cbp->sem);
+               break;
+
+       default:
+               fprintf(stderr, "oof bad type!\n");
+               cbp->state = ERROR;
+               sem_post(&cbp->sem);
+               break;
+       }
+}
+
+/*
+ * Drain cbp->cq one completion at a time and turn each completion into a
+ * state change for the main thread, followed by sem_post(&cbp->sem).
+ *
+ * A RECV completion carries either a 16-byte buffer advertisement (server:
+ * rkey at offset 0, addr at offset 4, len at offset 12) or a 1-byte
+ * "go ahead" (client).  After a good RECV the receive buffer is re-posted.
+ *
+ * A failed completion or a malformed message sets ERROR, posts the
+ * semaphore and returns immediately.  An ibv_poll_cq() error exits the
+ * process.
+ */
+static void rping_cq_event_handler(struct rping_cb *cbp)
+{
+       struct ibv_wc wc;
+       struct ibv_recv_wr *bad_wr;
+       int rc;
+
+       while ((rc = ibv_poll_cq(cbp->cq, 1, &wc)) == 1) {
+               if (wc.status) {
+                       fprintf(stderr, "cq completion failed status %d\n",
+                              wc.status);
+                       cbp->state = ERROR;
+                       sem_post(&cbp->sem);
+                       return;
+               }
+               switch (wc.opcode) {
+               case IBV_WC_SEND:
+                       DEBUG_LOG("send completion\n");
+                       break;
+
+               case IBV_WC_RDMA_WRITE:
+                       DEBUG_LOG("rdma write completion\n");
+                       cbp->state = RDMA_WRITE_COMPLETE;
+                       sem_post(&cbp->sem);
+                       break;
+
+               case IBV_WC_RDMA_READ:
+                       cbp->state = RDMA_READ_COMPLETE;
+                       DEBUG_LOG("rdma read completion\n");
+                       sem_post(&cbp->sem);
+                       break;
+
+               case IBV_WC_RECV:
+                       DEBUG_LOG("recv completion\n");
+                       if (cbp->server) {
+                               if (wc.byte_len != 16) {
+                                       fprintf(stderr,
+                                              "Received bogus data, size %u\n",
+                                              wc.byte_len);
+                                       cbp->state = ERROR;
+                                       sem_post(&cbp->sem);
+                                       return;
+                               }
+                               /*
+                                * The 64-bit address sits at offset 4, which
+                                * is not 8-byte aligned.  Copy the fields out
+                                * with memcpy() rather than dereferencing cast
+                                * pointers (misaligned access and a strict
+                                * aliasing violation).
+                                */
+                               memcpy(&cbp->remote_rkey, &cbp->recv_buf[0],
+                                      sizeof cbp->remote_rkey);
+                               memcpy(&cbp->remote_addr, &cbp->recv_buf[4],
+                                      sizeof cbp->remote_addr);
+                               memcpy(&cbp->remote_len, &cbp->recv_buf[12],
+                                      sizeof cbp->remote_len);
+                               DEBUG_LOG(
+                                         "Received rkey %x addr %llx "
+                                         "len %u from peer\n",
+                                         cbp->remote_rkey,
+                                         (unsigned long long) cbp->remote_addr,
+                                         cbp->remote_len);
+                               /* First advert of a ping is the read source,
+                                * the second is the write sink. */
+                               if (cbp->state == CONNECTED
+                                   || cbp->state == RDMA_WRITE_COMPLETE)
+                                       cbp->state = RDMA_READ_ADV;
+                               else
+                                       cbp->state = RDMA_WRITE_ADV;
+                       } else {
+                               if (wc.byte_len != 1) {
+                                       fprintf(stderr,
+                                              "Received bogus data, size %u\n",
+                                              wc.byte_len);
+                                       cbp->state = ERROR;
+                                       sem_post(&cbp->sem);
+                                       return;
+                               }
+                               /* "go ahead" after the read, then after the
+                                * write. */
+                               if (cbp->state == RDMA_READ_ADV) {
+                                       cbp->state = RDMA_WRITE_ADV;
+                                       DEBUG_LOG("set state to WRITE_ADV\n");
+                               } else {
+                                       cbp->state = RDMA_WRITE_COMPLETE;
+                                       DEBUG_LOG("set state "
+                                                 "to WRITE_COMPLETE\n");
+                               }
+                       }
+
+                       /*
+                        * post recv buf again
+                        */
+                       rc = ibv_post_recv(cbp->qp, &cbp->rq_wr, &bad_wr);
+                       if (rc) {
+                               fprintf(stderr, "post recv error %d\n", rc);
+                               cbp->state = ERROR;
+                       }
+                       sem_post(&cbp->sem);
+                       break;
+
+               default:
+                       DEBUG_LOG("unknown!!!!! completion\n");
+                       break;
+               }
+       }
+       if (rc) {
+               fprintf(stderr, "poll error %d\n", rc);
+               exit(rc);
+       }
+}
+
+/*
+ * Accept the pending connection request on cbp->child_cm_id.
+ *
+ * priv, len: private data (and its length) returned to the client.
+ * responder_resources and initiator_depth are both set to 1; every other
+ * connection parameter is zero.  Returns the rdma_accept() result.
+ */
+static int rping_accept_cr(struct rping_cb *cbp, char *priv, int len)
+{
+       struct rdma_conn_param param;
+
+       DEBUG_LOG("accept_cr!\n");
+
+       memset(&param, 0, sizeof(param));
+       param.initiator_depth = 1;
+       param.responder_resources = 1;
+       param.private_data_len = len;
+       param.private_data = priv;
+
+       return rdma_accept(cbp->child_cm_id, &param);
+}
+
+/*
+ * Issue the client-side connect on cbp->cm_id.
+ *
+ * priv, len: private data (and its length) sent with the connect request.
+ *
+ * Returns 0 once the request is issued; establishment is reported later
+ * as an RDMA_CM_EVENT_ESTABLISHED event.  If rdma_connect() fails, returns
+ * its non-zero result, and also sets ERROR and posts cbp->sem so that any
+ * thread waiting for CONNECTED is woken.
+ */
+static int rping_connect(struct rping_cb *cbp, char *priv, int len)
+{
+       int rc;
+       struct rdma_conn_param conn_param;
+
+       memset(&conn_param, 0, sizeof conn_param);
+       conn_param.private_data = priv;
+       conn_param.private_data_len = len;
+       conn_param.responder_resources = 1;
+       conn_param.initiator_depth = 1;
+       conn_param.retry_count = 10;
+       rc = rdma_connect(cbp->cm_id, &conn_param);
+       if (rc) {
+               fprintf(stderr, "rdma_connect error %d\n", rc);
+               cbp->state = ERROR;
+               sem_post(&cbp->sem);
+       }
+       /* Propagate the failure so the caller's error check can fire. */
+       return rc;
+}
+
+/*
+ * Deregister and free the recv, send and rdma buffers of cbp, each MR
+ * before its memory.  The start buffer exists only on the client, so it is
+ * released last and only when cbp->server is zero.
+ */
+static void rping_free_buffers(struct rping_cb *cbp)
+{
+       DEBUG_LOG("rping_free_buffers called on cbp %p\n", cbp);
+
+       ibv_dereg_mr(cbp->recv_mr);
+       free(cbp->recv_buf);
+
+       ibv_dereg_mr(cbp->send_mr);
+       free(cbp->send_buf);
+
+       ibv_dereg_mr(cbp->rdma_mr);
+       free(cbp->rdma_buf);
+
+       if (cbp->server)
+               return;
+
+       ibv_dereg_mr(cbp->start_mr);
+       free(cbp->start_buf);
+}
+
+/*
+ * Allocate and register the recv, send and rdma buffers, plus the start
+ * buffer on the client (RPING_BUFSIZE bytes each).  Pre-build the fixed
+ * parts of the work requests that use them, then post the initial receive.
+ *
+ * Returns 0 on success.  On failure it returns one of:
+ *   - ENOMEM if an allocation fails;
+ *   - the errno left by ibv_reg_mr(), or ENOMEM if that errno is 0;
+ *   - the ibv_post_recv() result.
+ * Everything set up so far is then released in reverse order, each MR
+ * before its memory, and the released pointers are reset to NULL.
+ */
+static int rping_setup_buffers(struct rping_cb *cbp)
+{
+       struct ibv_recv_wr *bad_wr;
+       int rc;
+
+       DEBUG_LOG("rping_setup_buffers called on cbp %p\n", cbp);
+
+       cbp->recv_buf = malloc(RPING_BUFSIZE);
+       if (cbp->recv_buf == NULL)
+               return ENOMEM;
+
+       cbp->recv_mr = ibv_reg_mr(cbp->pd, cbp->recv_buf, RPING_BUFSIZE,
+                                 IBV_ACCESS_LOCAL_WRITE);
+       if (!cbp->recv_mr) {
+               /* Capture errno before any cleanup call can overwrite it. */
+               rc = errno ? errno : ENOMEM;
+               goto err_free_recv_buf;
+       }
+
+       /*
+        * these never change
+        */
+       cbp->recv_sgl.addr = (uint64_t) (unsigned long) cbp->recv_buf;
+       cbp->recv_sgl.length = RPING_BUFSIZE;
+       cbp->recv_sgl.lkey = cbp->recv_mr->lkey;
+
+       cbp->rq_wr.wr_id = (uint64_t) (unsigned long) &cbp->rq_wr;
+       cbp->rq_wr.sg_list = &cbp->recv_sgl;
+       cbp->rq_wr.num_sge = 1;
+
+       cbp->send_buf = malloc(RPING_BUFSIZE);
+       if (cbp->send_buf == NULL) {
+               rc = ENOMEM;
+               goto err_dereg_recv_mr;
+       }
+
+       cbp->send_mr = ibv_reg_mr(cbp->pd, cbp->send_buf, RPING_BUFSIZE, 0);
+       if (!cbp->send_mr) {
+               rc = errno ? errno : ENOMEM;
+               goto err_free_send_buf;
+       }
+
+       /*
+        * these never change; send_sgl.length is set before each send
+        */
+       cbp->send_sgl.addr = (uint64_t) (unsigned long) cbp->send_buf;
+       cbp->send_sgl.lkey = cbp->send_mr->lkey;
+
+       cbp->sq_wr.opcode = IBV_WR_SEND;
+       cbp->sq_wr.wr_id = (uint64_t) (unsigned long) &cbp->sq_wr;
+       cbp->sq_wr.num_sge = 1;
+       cbp->sq_wr.sg_list = &cbp->send_sgl;
+       cbp->sq_wr.send_flags = IBV_SEND_SIGNALED;
+
+       cbp->rdma_buf = malloc(RPING_BUFSIZE);
+       if (cbp->rdma_buf == NULL) {
+               rc = ENOMEM;
+               goto err_dereg_send_mr;
+       }
+
+       cbp->rdma_mr = ibv_reg_mr(cbp->pd, cbp->rdma_buf, RPING_BUFSIZE,
+                                 IBV_ACCESS_LOCAL_WRITE |
+                                 IBV_ACCESS_REMOTE_READ |
+                                 IBV_ACCESS_REMOTE_WRITE);
+       if (!cbp->rdma_mr) {
+               rc = errno ? errno : ENOMEM;
+               goto err_free_rdma_buf;
+       }
+
+       /*
+        * these never change; opcode, length and remote fields are filled
+        * in for each RDMA operation
+        */
+       cbp->rdma_sq_wr.wr_id = (uint64_t) (unsigned long) &cbp->rdma_sq_wr;
+       cbp->rdma_sq_wr.sg_list = &cbp->rdma_sgl;
+       cbp->rdma_sq_wr.num_sge = 1;
+       cbp->rdma_sgl.addr = (uint64_t) (unsigned long) cbp->rdma_buf;
+       cbp->rdma_sgl.lkey = cbp->rdma_mr->lkey;
+
+       if (!cbp->server) {
+               /* Client only: the rdma read source buffer. */
+               cbp->start_buf = malloc(RPING_BUFSIZE);
+               if (cbp->start_buf == NULL) {
+                       rc = ENOMEM;
+                       goto err_dereg_rdma_mr;
+               }
+
+               cbp->start_mr = ibv_reg_mr(cbp->pd, cbp->start_buf,
+                                          RPING_BUFSIZE,
+                                          IBV_ACCESS_LOCAL_WRITE |
+                                          IBV_ACCESS_REMOTE_READ |
+                                          IBV_ACCESS_REMOTE_WRITE);
+               if (!cbp->start_mr) {
+                       rc = errno ? errno : ENOMEM;
+                       goto err_free_start_buf;
+               }
+       }
+
+       rc = ibv_post_recv(cbp->qp, &cbp->rq_wr, &bad_wr);
+       if (rc)
+               goto err_dereg_start_mr;
+
+       DEBUG_LOG("allocated & registered buffers...\n");
+       return 0;
+
+       /*
+        * Error unwind, in reverse order of setup.  Each MR is deregistered
+        * before the memory it covers is freed.  The start buffer exists
+        * only on the client, hence the server checks.
+        */
+err_dereg_start_mr:
+       if (!cbp->server) {
+               ibv_dereg_mr(cbp->start_mr);
+               cbp->start_mr = NULL;
+       }
+err_free_start_buf:
+       if (!cbp->server) {
+               free(cbp->start_buf);
+               cbp->start_buf = NULL;
+       }
+err_dereg_rdma_mr:
+       ibv_dereg_mr(cbp->rdma_mr);
+       cbp->rdma_mr = NULL;
+err_free_rdma_buf:
+       free(cbp->rdma_buf);
+       cbp->rdma_buf = NULL;
+err_dereg_send_mr:
+       ibv_dereg_mr(cbp->send_mr);
+       cbp->send_mr = NULL;
+err_free_send_buf:
+       free(cbp->send_buf);
+       cbp->send_buf = NULL;
+err_dereg_recv_mr:
+       ibv_dereg_mr(cbp->recv_mr);
+       cbp->recv_mr = NULL;
+err_free_recv_buf:
+       free(cbp->recv_buf);
+       cbp->recv_buf = NULL;
+       return rc;
+}
+
+/*
+ * Create the RC QP for this connection and grant it remote read/write
+ * access.
+ *
+ * The QP is created on cbp->child_cm_id (server) or cbp->cm_id (client).
+ * It uses cbp->cq for both send and receive completions and is stored in
+ * cbp->qp, which is NULL if creation fails.  Returns 0 on success, or the
+ * non-zero rdma_create_qp() / ibv_modify_qp() result.
+ */
+static int rping_create_qp(struct rping_cb *cbp)
+{
+       struct ibv_qp_init_attr init_attr;
+       struct ibv_qp_attr qp_attr;
+       struct rdma_cm_id *id;
+       int rc;
+
+       memset(&init_attr, 0, sizeof(init_attr));
+       init_attr.cap.max_send_wr = RPING_SQ_DEPTH;
+       init_attr.cap.max_recv_wr = 2;
+       init_attr.cap.max_recv_sge = 1;
+       init_attr.cap.max_send_sge = 1;
+       init_attr.qp_type = IBV_QPT_RC;
+       init_attr.send_cq = cbp->cq;
+       init_attr.recv_cq = cbp->cq;
+
+       /* Server: the connection lives on the child id; client: on cm_id. */
+       id = cbp->server ? cbp->child_cm_id : cbp->cm_id;
+       rc = rdma_create_qp(id, cbp->pd, &init_attr);
+       if (rc) {
+               cbp->qp = NULL;
+               return rc;
+       }
+       cbp->qp = id->qp;
+
+       /*
+        * Set REMOTE access rights on the QP.  Zero the attribute struct
+        * first so no indeterminate bytes are handed to the library; the
+        * mask selects only the access-flags field.
+        */
+       memset(&qp_attr, 0, sizeof(qp_attr));
+       qp_attr.qp_access_flags = IBV_ACCESS_REMOTE_READ |
+                                 IBV_ACCESS_REMOTE_WRITE;
+       rc = ibv_modify_qp(cbp->qp, &qp_attr, IBV_QP_ACCESS_FLAGS);
+       if (rc)
+               fprintf(stderr, "ibv_modify_qp returned %d\n", rc);
+       return rc;
+}
+
+/*
+ * CM event thread.  Loops forever:
+ *   1. block in rdma_get_cm_event();
+ *   2. dispatch the event to rping_cma_event_handler();
+ *   3. ack it.
+ * If fetching an event fails, the process exits.  arg is unused.
+ */
+static void *cm_thread(void *arg)
+{
+       struct rdma_cm_event *ev;
+       int err;
+
+       for (;;) {
+               err = rdma_get_cm_event(&ev);
+               if (err != 0) {
+                       fprintf(stderr, "rdma_get_cm_event err %d\n", err);
+                       exit(err);
+               }
+               rping_cma_event_handler(ev->id, ev);
+               rdma_ack_cm_event(ev);
+       }
+}
+
+/*
+ * CQ event thread; arg is the rping_cb.  Loops forever:
+ *   1. wait on the completion channel;
+ *   2. check that the event is for cbp->cq;
+ *   3. re-arm notification;
+ *   4. drain the CQ via rping_cq_event_handler();
+ *   5. ack the event.
+ * Any failure terminates the process.
+ */
+static void *cq_thread(void *arg)
+{
+       struct rping_cb *cbp = arg;
+
+       DEBUG_LOG("cq_thread started.\n");
+
+       for (;;) {
+               struct ibv_cq *got_cq;
+               void *got_ctx;
+               int err;
+
+               err = ibv_get_cq_event(cbp->channel, &got_cq, &got_ctx);
+               if (err) {
+                       fprintf(stderr, "Failed to get cq event!\n");
+                       exit(err);
+               }
+               if (got_cq != cbp->cq) {
+                       fprintf(stderr, "Unkown CQ!\n");
+                       exit(-1);
+               }
+               /* Re-arm first, then drain the queue. */
+               err = ibv_req_notify_cq(cbp->cq, 0);
+               if (err) {
+                       fprintf(stderr, "Failed to set notify!\n");
+                       exit(err);
+               }
+               rping_cq_event_handler(cbp);
+               ibv_ack_cq_events(cbp->cq, 1);
+       }
+}
+
+static void do_rping(struct rping_cb *cbp)
+{
+       int ping = 0;
+       int start;
+       int cc;
+       unsigned char c;
+       int rc;
+
+       if (cbp->size == 0)
+               cbp->size = 64;
+
+       /* 
+        * Now doit!
+        */
+       if (cbp->server) {
+
+               /*
+                * Create listening endpoint.
+                */
+               rc = rdma_listen(cbp->cm_id, 3);
+               if (rc) {
+                       fprintf(stderr, "listen error %d\n", rc);
+                       goto out;
+               }
+               
+               /*
+                * Wait for a connection request.
+                */
+               rc = sem_wait(&cbp->sem);
+               if (rc || (cbp->state == ERROR)) {
+                       fprintf(stderr,
+                              "wait for CONNECT_REQUEST error %d state %d\n",
+                              rc, cbp->state);
+                       goto out;
+               }
+
+               cbp->pd = ibv_alloc_pd(cbp->child_cm_id->verbs);
+               if (!(cbp->pd)) {
+                       rc = errno;
+                       goto out;
+               }
+               DEBUG_LOG("created pd %p\n", cbp->pd);
+
+               cbp->channel = ibv_create_comp_channel(cbp->child_cm_id->verbs);
+               if (!cbp->channel) {
+                       rc = errno;
+                       goto out;
+               }
+               DEBUG_LOG("created channel %p\n", cbp->channel);
+
+               cbp->cq = ibv_create_cq(cbp->child_cm_id->verbs, 
+                                       RPING_SQ_DEPTH * 2, cbp, 
+                                       cbp->channel, 0);
+               if (!(cbp->cq)) {
+                       rc = errno;
+                       goto out;
+               }
+               DEBUG_LOG("created cq %p\n", cbp->cq);
+
+               rc = ibv_req_notify_cq(cbp->cq, 0);
+               if (rc) {
+                       fprintf(stderr, "Failed to set notify!\n");
+                       rc = errno;
+                       goto out;
+               }
+
+               pthread_create(&cbp->cqthread, NULL, cq_thread, cbp);
+
+               rc = rping_create_qp(cbp);
+               if (rc) {
+                       goto out;
+               }
+               DEBUG_LOG("created qp %p\n", cbp->qp);
+
+               /*
+                * Setup registered buffers.
+                */
+               rc = rping_setup_buffers(cbp);
+               if (rc) {
+                       goto out;
+               }
+
+               /*
+                * Accept the connection request.
+                */
+               rc = rping_accept_cr(cbp, "server", strlen("server") + 1);
+               if (rc) {
+                       fprintf(stderr, "accept error %d\n", rc);
+                       goto out;
+               }
+               rc = sem_wait(&cbp->sem);
+               if (rc || (cbp->state == ERROR)) {
+                       fprintf(stderr, "wait for CONNECTED "
+                              "state error %d state %d\n",
+                              rc, cbp->state);
+                       goto out;
+               }
+
+               /*
+                * Server side ping loop
+                */
+               while (1) {
+                       struct ibv_send_wr *bad_wr;
+
+                       /*
+                        * Wait for client's Start STAG/TO/Len
+                        */
+                       rc = sem_wait(&cbp->sem);
+                       if (rc || (cbp->state == ERROR)) {
+                               fprintf(stderr, "wait for RDMA_READ_ADV "
+                                      "state error %d state %d\n",
+                                      rc, cbp->state);
+                               goto out;
+                       }
+
+                       DEBUG_LOG("server received sink adv\n");
+
+                       /*
+                        * Issue RDMA Read.
+                        */
+                       cbp->rdma_sq_wr.opcode = IBV_WR_RDMA_READ;
+                       cbp->rdma_sq_wr.wr_id =
+                           (uint64_t) (unsigned long) &cbp->rdma_sq_wr;
+                       cbp->rdma_sq_wr.sg_list = &cbp->rdma_sgl;
+                       cbp->rdma_sq_wr.send_flags = IBV_SEND_SIGNALED;
+                       cbp->rdma_sq_wr.wr.rdma.rkey = cbp->remote_rkey;
+                       cbp->rdma_sq_wr.wr.rdma.remote_addr = cbp->remote_addr;
+                       cbp->rdma_sq_wr.sg_list->length = cbp->remote_len;
+
+                       rc = ibv_post_send(cbp->qp, &cbp->rdma_sq_wr, &bad_wr);
+                       if (rc) {
+                               fprintf(stderr, "post send error %d\n", rc);
+                               goto out;
+                       }
+                       DEBUG_LOG("server posted rdma read req \n");
+
+                       /*
+                        * Wait for read completion
+                        */
+                       rc = sem_wait(&cbp->sem);
+                       if (rc || (cbp->state == ERROR)) {
+                               fprintf(stderr, "wait for "
+                                      "RDMA_READ_COMPLETE state error %d "
+                                      "state %d\n",
+                                      rc, cbp->state);
+                               goto out;
+                       }
+                       DEBUG_LOG("server received read complete\n");
+
+                       /*
+                        * Display data in recv buf
+                        */
+                       if (cbp->verbose) {
+                               printf("server ping data: %s\n",
+                                      cbp->rdma_buf);
+                       }
+
+                       /*
+                        * Tell client to continue
+                        */
+                       cbp->send_sgl.length = 1;
+                       rc = ibv_post_send(cbp->qp, &cbp->sq_wr, &bad_wr);
+                       if (rc) {
+                               fprintf(stderr, "post send error %d\n", rc);
+                               goto out;
+                       }
+                       DEBUG_LOG("server posted go ahead\n");
+
+                       /*
+                        * Wait for client's RDMA STAG/TO/Len
+                        */
+                       rc = sem_wait(&cbp->sem);
+                       if (rc || (cbp->state == ERROR)) {
+                               fprintf(stderr, "wait for RDMA_WRITE_ADV "
+                                      "state error %d state %d\n",
+                                      rc, cbp->state);
+                               goto out;
+                       }
+                       DEBUG_LOG("server received sink adv\n");
+
+                       /*
+                        * RDMA Write echo data
+                        */
+                       cbp->rdma_sq_wr.opcode = IBV_WR_RDMA_WRITE;
+                       cbp->rdma_sq_wr.wr.rdma.rkey = cbp->remote_rkey;
+                       cbp->rdma_sq_wr.wr.rdma.remote_addr = cbp->remote_addr;
+                       cbp->rdma_sq_wr.send_flags = IBV_SEND_SIGNALED;
+                       cbp->rdma_sq_wr.sg_list->length =
+                           strlen(cbp->rdma_buf) + 1;
+                       DEBUG_LOG(
+                                 "rdma write from lkey %x laddr %llx len %d\n",
+                                 cbp->rdma_sq_wr.sg_list->lkey,
+                                 cbp->rdma_sq_wr.sg_list->addr,
+                                 cbp->rdma_sq_wr.sg_list->length);
+                       rc = ibv_post_send(cbp->qp, &cbp->rdma_sq_wr, &bad_wr);
+                       if (rc) {
+                               fprintf(stderr, "post send error %d\n", rc);
+                               goto out;
+                       }
+
+                       /*
+                        * Wait for completion
+                        */
+                       rc = sem_wait(&cbp->sem);
+                       if (rc || (cbp->state == ERROR)) {
+                               fprintf(stderr, "waiting for "
+                                      "RDMA_WRITE_COMPLETE state error %d\n",
+                                      rc);
+                               goto out;
+                       }
+                       DEBUG_LOG("server rdma write complete \n");
+
+                       /*
+                        * Tell client to begin again
+                        */
+                       cbp->send_sgl.length = 1;
+                       rc = ibv_post_send(cbp->qp, &cbp->sq_wr, &bad_wr);
+                       if (rc) {
+                               fprintf(stderr, "post send error %d\n", rc);
+                               goto out;
+                       }
+                       DEBUG_LOG("server posted go ahead\n");
+               }
+       } else {
+               cbp->pd = ibv_alloc_pd(cbp->cm_id->verbs);
+               if (!(cbp->pd)) {
+                       rc = errno;
+                       goto out;
+               }
+               DEBUG_LOG("created pd %p\n", cbp->pd);
+
+               cbp->channel = ibv_create_comp_channel(cbp->cm_id->verbs);
+               if (!cbp->channel) {
+                       rc = errno;
+                       goto out;
+               }
+               DEBUG_LOG("created channel %p\n", cbp->channel);
+
+               cbp->cq = ibv_create_cq(cbp->cm_id->verbs, 
+                                       RPING_SQ_DEPTH * 2, cbp, cbp->channel, 
+                                       0);
+               if (!(cbp->cq)) {
+                       rc = errno;
+                       goto out;
+               }
+               DEBUG_LOG("created cq %p\n", cbp->cq);
+
+               rc = ibv_req_notify_cq(cbp->cq, 0);
+               if (rc) {
+                       fprintf(stderr, "Failed to set notify!\n");
+                       rc = errno;
+                       goto out;
+               }
+
+               pthread_create(&cbp->cqthread, NULL, cq_thread, cbp);
+
+               rc = rping_create_qp(cbp);
+               if (rc) {
+                       goto out;
+               }
+               DEBUG_LOG("created qp %p\n", cbp->qp);
+
+               /*
+                * Setup registered buffers.
+                */
+               rc = rping_setup_buffers(cbp);
+               if (rc)
+                       goto out;
+
+               /*
+                * Connect to server.
+                */
+               rc = rping_connect(cbp, "client", strlen("client") + 1);
+               if (rc) {
+                       fprintf(stderr, "connect error %d\n", rc);
+                       goto out;
+               }
+ 
+               rc = sem_wait(&cbp->sem);
+               if (rc || (cbp->state == ERROR)) {
+                       fprintf(stderr,
+                              "wait for CONNECTED error %d state %d\n", rc,
+                              cbp->state);
+                       goto out;
+               }
+
+               /*      
+                * Client side ping loop.
+                */
+               start = 65;
+               while (1) {
+                       int i;
+                       struct ibv_send_wr *bad_wr;
+
+                       cbp->state = RDMA_READ_ADV;
+
+                       ++ping;
+                       if (cbp->count && (ping > cbp->count)) {
+                               goto out;
+                       }
+
+                       /*
+                        * Put some ascii text in the buffer.
+                        */
+                       cc = sprintf(cbp->start_buf, "rdma-ping-%d: ", ping);
+                       for (i = cc, c = start; i < cbp->size; i++) {
+                               cbp->start_buf[i] = c;
+                               c++;
+                               if (c > 122)
+                                       c = 65;
+                       }
+                       start++;
+                       if (start > 122)
+                               start = 65;
+                       cbp->start_buf[cbp->size] = 0;
+
+                       /*
+                        * Send our start buffer rkey/addr/len...
+                        * The server will use this to RDMA READ the ping.
+                        */
+                       DEBUG_LOG("Sending Start rkey %x "
+                                 "addr %llx len %d for RDMA READ Source\n",
+                                 cbp->start_mr->rkey,
+                                 (uint64_t) (unsigned long) cbp->start_buf,
+                                 cbp->size + 1);
+                       cbp->send_sgl.length = 16;
+                       *((uint32_t *) (cbp->send_buf)) = cbp->start_mr->rkey;
+                       *((uint64_t *) (cbp->send_buf + 4)) =
+                           (uint64_t) (unsigned long) cbp->start_buf;
+                       *((uint32_t *) (cbp->send_buf + 12)) = cbp->size + 1;
+                       rc = ibv_post_send(cbp->qp, &cbp->sq_wr, &bad_wr);
+                       if (rc) {
+                               fprintf(stderr, "post send error %d\n", rc);
+                               goto out;
+                       }
+
+                       /*
+                        * Wait for server to ACK
+                        */
+                       rc = sem_wait(&cbp->sem);
+                       if (rc || (cbp->state == ERROR)) {
+                               fprintf(stderr, "wait for RDMA_WRITE_ADV "
+                                      "state error %d state %d\n",
+                                      rc, cbp->state);
+                               goto out;
+                       }
+
+                       /*
+                        * Send our rdma buffer rkey/addr/len for receiving 
+                        * the ping echo from the server via RDMA_WRITE...
+                        */
+                       DEBUG_LOG("Sending rkey %x addr %llx "
+                                 "len %d for RDMA WRITE Sink\n",
+                                 cbp->rdma_mr->rkey,
+                                 (uint64_t) (unsigned long) cbp->rdma_buf,
+                                 cbp->size + 1);
+                       cbp->send_sgl.length = 16;
+                       *((uint32_t *) (cbp->send_buf)) = cbp->rdma_mr->rkey;
+                       *((uint64_t *) (cbp->send_buf + 4)) =
+                           (uint64_t) (unsigned long) cbp->rdma_buf;
+                       *((uint32_t *) (cbp->send_buf + 12)) = cbp->size + 1;
+                       rc = ibv_post_send(cbp->qp, &cbp->sq_wr, &bad_wr);
+                       if (rc) {
+                               fprintf(stderr, "post send error %d\n", rc);
+                               goto out;
+                       }
+
+                       /*
+                        * Wait for the server to say the RDMA Write is 
+                        * complete.
+                        */
+                       rc = sem_wait(&cbp->sem);
+                       if (rc || (cbp->state == ERROR)) {
+                               fprintf(stderr, "wait for "
+                                      "RDMA_WRITE_COMPLETE state error %d "
+                                      "state %d\n", rc, cbp->state);
+                               goto out;
+                       }
+
+                       if (cbp->validate) {
+
+                               /*
+                                * Validate data
+                                */
+                               if (memcmp (cbp->start_buf, cbp->rdma_buf,
+                                    cbp->size + 1)) {
+                                       fprintf(stderr, "data mismatch!\n");
+                                       goto out;
+                               }
+                       }
+
+                       /*
+                        * Display ping data.
+                        */
+                       if (cbp->verbose) {
+                               printf("ping data: %s\n", cbp->rdma_buf);
+                       } 
+               }
+       }
+out:
+       DEBUG_LOG("disconnecting\n");
+       rdma_disconnect(cbp->cm_id);
+
+       /* cleanup */
+       if (cbp->child_cm_id) {
+               DEBUG_LOG("destroying child cm_id %p\n", cbp->child_cm_id);
+               rdma_destroy_id(cbp->child_cm_id);
+       }
+       if (cbp->qp) {
+               DEBUG_LOG("destroying qp %p\n", cbp->qp);
+               ibv_destroy_qp(cbp->qp);
+       }
+       if (cbp->cq) {
+               DEBUG_LOG("destroying cq %p\n", cbp->cq);
+               ibv_destroy_cq(cbp->cq);
+       }
+       if (cbp->recv_buf) {
+               DEBUG_LOG("freeing bufs/mrs\n");
+               rping_free_buffers(cbp);
+       }
+       if (cbp->pd) {
+               DEBUG_LOG("dealloc pd %p\n", cbp->pd);
+               ibv_dealloc_pd(cbp->pd);
+       }
+       printf("close complete - returning from test \n");
+       return;
+}
+
+/*
+ * Print the command-line synopsis and option summary to stdout.
+ *
+ * name: program name; only the component returned by basename() is shown.
+ */
+static void usage(char *name)
+{
+       printf("%s -c|s [-vVd] [-S size] [-C count] -a addr -p port\n", 
+              basename(name));
+       printf("\t-c\t\tclient side\n");
+       printf("\t-s\t\tserver side\n");
+       printf("\t-v\t\tdisplay ping data to stdout\n");
+       /* -V sets cbp->validate in main(): the client memcmp()s the echo. */
+       printf("\t-V\t\tvalidate ping data\n");
+       printf("\t-d\t\tdebug printfs\n");
+       printf("\t-S size \tping data size\n");
+       printf("\t-C count\tping count times\n");
+       printf("\t-a addr\t\taddress\n");
+       printf("\t-p port\t\tport\n");
+}
+
+/*
+ * Entry point: parse the command line, create the RDMA CM id and start
+ * cm_thread, then bind the local address (server) or resolve the remote
+ * address (client) before running the test via do_rping().
+ *
+ * The calling thread may block in sem_wait() until cbp->sem is posted
+ * (presumably by cm_thread -- confirm).
+ *
+ * Returns 0 on success, or non-zero on failure (an errno value or the
+ * failing call's return code).  do_rping() returns void, so failures
+ * inside the ping loop are not reflected in the exit status.
+ */
+int main(int argc, char *argv[])
+{
+       struct rping_cb *cbp;
+       int op;
+       int port;
+       int size;
+       int rc = 0;
+       struct sockaddr_in sin;
+
+       /* calloc() zeroes every field, so unset pointers read as NULL. */
+       cbp = calloc(1, sizeof(*cbp));
+       if (cbp == NULL) {
+               return ENOMEM;
+       }
+       cbp->server = -1;
+       cbp->state = IDLE;
+       if (sem_init(&cbp->sem, 0, 0)) {
+               rc = errno;
+               fprintf(stderr, "sem_init error %d\n", rc);
+               free(cbp);
+               return rc;
+       }
+
+       opterr = 0;
+       while ((op=getopt(argc, argv, "a:p:C:S:scvVd")) != -1) {
+               switch (op) {
+               case 'a':
+                       cbp->addr_str = optarg;
+                       cbp->addr = inet_addr(optarg);
+                       /* inet_addr() reports parse failure as INADDR_NONE. */
+                       if (cbp->addr == INADDR_NONE) {
+                               fprintf(stderr, "Invalid address %s\n",
+                                      optarg);
+                               rc = EINVAL;
+                       } else {
+                               DEBUG_LOG("ipaddr (%s)\n", optarg);
+                       }
+                       break;
+               case 'p':
+                       port = atoi(optarg);
+                       /* Reject values htons() would silently truncate. */
+                       if ((port < 0) || (port > 65535)) {
+                               fprintf(stderr, "Invalid port %d\n", port);
+                               rc = EINVAL;
+                       } else {
+                               cbp->port = htons(port);
+                               DEBUG_LOG("port %d\n", port);
+                       }
+                       break;
+               case 's':
+                       cbp->server = 1;
+                       DEBUG_LOG("server\n");
+                       break;
+               case 'c':
+                       cbp->server = 0;
+                       DEBUG_LOG("client\n");
+                       break;
+               case 'S':
+                       /*
+                        * The requested size includes the trailing NUL, so at
+                        * least one data byte means a minimum of 2.  Validate
+                        * before subtracting to avoid signed overflow.
+                        */
+                       size = atoi(optarg);
+                       if ((size < 2) || (size > RPING_BUFSIZE)) {
+                               fprintf(stderr, "Invalid size %d "
+                                      "(valid range is 2 to %d)\n",
+                                      size, RPING_BUFSIZE);
+                               rc = EINVAL;
+                       } else {
+                               /* Stored size excludes the trailing NUL. */
+                               cbp->size = size - 1;
+                               DEBUG_LOG("size %d\n", size);
+                       }
+                       break;
+               case 'C':
+                       cbp->count = atoi(optarg);
+                       if (cbp->count < 0) {
+                               fprintf(stderr, "Invalid count %d\n",
+                                      cbp->count);
+                               rc = EINVAL;
+                       } else {
+                               DEBUG_LOG("count %d\n",
+                                         (int) cbp->count);
+                       }
+                       break;
+               case 'v':
+                       cbp->verbose++;
+                       DEBUG_LOG("verbose\n");
+                       break;
+               case 'V':
+                       cbp->validate++;
+                       DEBUG_LOG("validate data\n");
+                       break;
+               case 'd':
+                       debug++;
+                       break;
+               default:
+                       usage("rping");
+                       rc = EINVAL;
+                       break;
+               }
+       }
+       if (rc)
+               goto out;
+
+       if (cbp->server == -1) {
+               fprintf(stderr, "must be either client or server\n");
+               rc = EINVAL;
+               goto out;
+       }
+
+       rc = rdma_create_id(&cbp->cm_id, cbp);
+       if (rc) {
+               rc = errno;
+               cbp->cm_id = NULL;
+               fprintf(stderr, "rdma_create_id error %d\n", rc);
+               goto out;
+       }
+       DEBUG_LOG("created cm_id %p\n", cbp->cm_id);
+
+       /*
+        * pthread_create() returns an error number rather than setting errno.
+        * Without the thread, the sem_wait() calls below could block forever.
+        */
+       rc = pthread_create(&cbp->cmthread, NULL, cm_thread, cbp);
+       if (rc) {
+               fprintf(stderr, "pthread_create error %d\n", rc);
+               goto out;
+       }
+
+       /*
+        * Server binds to local addr/port to find the device.  Client resolves 
+        * the remote addr/port to find the device.
+        */
+       memset(&sin, 0, sizeof(sin));
+       sin.sin_family = AF_INET;
+       sin.sin_addr.s_addr = cbp->addr;
+       sin.sin_port = cbp->port;
+       if (cbp->server) {
+               rc = rdma_bind_addr(cbp->cm_id, (struct sockaddr *) &sin);
+               if (rc) {
+                       fprintf(stderr, "rdma_bind_addr error %d\n", rc);
+                       goto out;
+               }
+               DEBUG_LOG("rdma_bind_addr worked\n");
+       } else {
+               rc = rdma_resolve_addr(cbp->cm_id, NULL,
+                                      (struct sockaddr *) &sin, 2000);
+               if (rc) {
+                       fprintf(stderr, "rdma_resolve_addr error %d\n", rc);
+                       goto out;
+               }
+
+               rc = sem_wait(&cbp->sem);
+               if (rc || cbp->state == ERROR) {
+                       fprintf(stderr, "waiting for address resolution "
+                              "error %d state %d\n", rc, cbp->state);
+                       goto out;
+               }
+               DEBUG_LOG("rdma_resolve_addr worked\n");
+       }
+
+       do_rping(cbp);
+
+out:
+       /*
+        * NOTE(review): cm_thread (if started) is neither cancelled nor
+        * joined here, so it may still reference cbp after it is freed
+        * below -- confirm how that thread terminates.
+        */
+       if (cbp->cm_id) {
+               DEBUG_LOG("destroy cm_id %p\n", cbp->cm_id);
+               rdma_destroy_id(cbp->cm_id);
+       }
+       sem_destroy(&cbp->sem);
+       free(cbp);
+       return rc;
+}

_______________________________________________
openib-general mailing list
openib-general@openib.org
http://openib.org/mailman/listinfo/openib-general

To unsubscribe, please visit http://openib.org/mailman/listinfo/openib-general

Reply via email to