On Tue, Apr 13, 2010 at 08:48:42AM -0700, Sean Hefty wrote:
> >(2)  It takes a long time to finish rdma_disconnect work (about 10
> >seconds), so is it a reasonable time?
> 
> This is long.  How is your disconnect code structured?
>
Thanks for your reply. The project mirrors fs metadata to another node in an
HA system, and IB was selected for its low latency. Since I'm a complete
newbie in IB development, I suspect the problem is caused by some small
detail, just like problem (1), so I've pasted all the connection-related
functions of my IB-transfer sample code (kernel space) below.
Please have a look if you have time.

BTW: are there any docs about ib-core in kernel space? It's rather hard for
me to understand the code.

====
#include <linux/delay.h>
#include <linux/wait.h>
#include <linux/bio.h>
#include <linux/slab.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/completion.h>
#include <linux/in.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>

#define TEST_PAGES 16
#define RECV_PAGES 4096

#define JM_MAGIC  0xa0b0c0d0
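
/*
 * Exchanged as CM private_data in both directions: the connecting side
 * advertises mirror_size (in 4k pages); the accepting side fills in the
 * addr/rkey of its registered buffer for the sender's RDMA WRITEs.
 */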

struct connect_header {
        uint32_t        connect_magic;
        uint32_t        mirror_size; /* in 4k */
        uint64_t        addr;
        uint32_t        rkey;
};
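
/*
 * Per-HCA state; pd and mr are set up once per device (presumably in
 * jm_ib_add_one, which is not part of this excerpt) and shared by all
 * connections on that device.
 */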

struct jm_ib_device {
        struct list_head        conn_list;
        struct ib_device        *dev;
        struct ib_pd            *pd;
        struct ib_mr            *mr;
        int                     max_sge;
        unsigned int            max_wrs;
        spinlock_t              spinlock; /* protects all above */
};
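
/*
 * Tracks one in-flight RDMA WRITE; the completion handler recovers it
 * from wc.wr_id and wakes s_wait when the write completes.
 */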

struct jm_send_ctx {
        struct ib_sge           s_sge[64];
        u64                     s_offset;
        int                     s_size;
        int                     s_done;
        wait_queue_head_t       s_wait;
};

struct jm_rdma_conn {
        int                     jc_incoming;

        struct rdma_cm_id       *jc_id;

        struct ib_pd            *jc_pd;
        struct ib_mr            *jc_mr;

        struct ib_mr            *jc_map_mr;

        struct ib_cq            *jc_cq;
        struct ib_qp            *jc_qp;

        int                     jc_async_rc;
        struct completion       jc_done;

        wait_queue_head_t       jc_connect_wait;
        int                     jc_connstate;

        struct sockaddr_in      jc_remoteaddr;

        int                     jc_pb_nsegs;
        int                     jc_pb_segsize;
        u64                     jc_pb_addrs[64];
        u32                     jc_pb_rkey;

        struct page             *jc_pages[RECV_PAGES];
        unsigned long           jc_mappings[RECV_PAGES];
        int                     jc_page_count;

        struct list_head        list;
};

#define RDMA_RESOLVE_TIMEOUT    (5000)
#define RDMA_CONNECT_RETRY_MAX  (2)

#define JM_RDMA_MAX_DATA_SEGS   (16)

#define JM_RDMA_PORT            (18111) /* randomly chosen */

static int jm_disconnect(struct jm_rdma_conn *conn);
static void jm_conn_close(struct jm_rdma_conn *conn);
static int jm_handle_connect_req(struct rdma_cm_id *id,
                                 struct jm_rdma_conn **rconn,
                                 uint32_t mirror_size);
/* defined elsewhere in the module; declared so this excerpt compiles */
static void jm_add_conn_to_list(struct jm_rdma_conn *conn);
static void jm_ib_add_one(struct ib_device *device);
static void jm_ib_remove_one(struct ib_device *device);
static struct rdma_cm_id *jm_listen_id = NULL;
static struct jm_ib_device *jm_ibdev = NULL;
static struct ib_client jm_ib_client = {
        .name   = "jm_ib",
        .add    = jm_ib_add_one,
        .remove = jm_ib_remove_one,
};

static void jm_cq_comp_handler(struct ib_cq *cq, void *context) {
        struct jm_rdma_conn *conn = context;
        struct ib_wc wc;
        struct jm_send_ctx *send;

        /*
         * Re-arm before and after draining the CQ so a completion that
         * arrives between the final poll and the re-arm is not missed.
         */
        printk("cq comp for id %p\n", conn->jc_id);
        ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
        while (ib_poll_cq(cq, 1, &wc) == 1) {
                if (wc.opcode != IB_WC_RDMA_WRITE) {
                        printk("completed unknown opcode %d\n", wc.opcode);
                        /* continue; */
                }
                send = (struct jm_send_ctx *)(unsigned long)wc.wr_id;
                printk("got send=%p\n", send);
                printk("completed RDMA_WRITE of IO(%llu, %d)\n",
                       (unsigned long long)send->s_offset, send->s_size);
                send->s_done = (wc.status == IB_WC_SUCCESS) ? 1 : -EIO;
                wake_up_all(&send->s_wait);
        }
        ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
}
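
/*
 * Not part of the original post: a minimal sketch of how the sender
 * might post the signaled RDMA WRITE that jm_cq_comp_handler above
 * waits on. jm_post_rdma_write is a hypothetical helper; it assumes
 * send->s_sge[] is already filled in and that the receiver's
 * connect_header reply was saved in jc_pb_addrs[0]/jc_pb_rkey.
 */
static int jm_post_rdma_write(struct jm_rdma_conn *conn,
                              struct jm_send_ctx *send, int num_sge) {
        struct ib_send_wr wr, *bad_wr;

        memset(&wr, 0, sizeof(wr));
        /* wr_id carries the context pointer recovered in the CQ handler */
        wr.wr_id = (u64)(unsigned long)send;
        wr.opcode = IB_WR_RDMA_WRITE;
        wr.send_flags = IB_SEND_SIGNALED;       /* generate a completion */
        wr.sg_list = send->s_sge;
        wr.num_sge = num_sge;
        wr.wr.rdma.remote_addr = conn->jc_pb_addrs[0] + send->s_offset;
        wr.wr.rdma.rkey = conn->jc_pb_rkey;

        send->s_done = 0;
        init_waitqueue_head(&send->s_wait);
        return ib_post_send(conn->jc_qp, &wr, &bad_wr);
}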

static void jm_cq_event_handler(struct ib_event *cause, void *context) {
        printk(KERN_ERR "got cq event %d\n", cause->event);
}

static void jm_qp_event_handler(struct ib_event *event, void *data) {
        switch (event->event) {
        case IB_EVENT_PATH_MIG:
        case IB_EVENT_COMM_EST:
        case IB_EVENT_SQ_DRAINED:
        case IB_EVENT_QP_LAST_WQE_REACHED:
                printk("got QP event %d received for QP=%p\n",
                       event->event, event->element.qp);
                break;
                /* These are considered fatal events */
        case IB_EVENT_PATH_MIG_ERR:
        case IB_EVENT_QP_FATAL:
        case IB_EVENT_QP_REQ_ERR:
        case IB_EVENT_QP_ACCESS_ERR:
        case IB_EVENT_DEVICE_FATAL:
        default:
                printk("got QP ERROR event %d for QP=%p\n",
                       event->event, event->element.qp);
                break;
        }
}

static int jm_setup_qp(struct jm_rdma_conn *conn, int outgoing) {
        struct ib_qp_init_attr iattr;
        int ret = 0;

        /* protection domain and memory region */
        conn->jc_mr = jm_ibdev->mr;
        conn->jc_pd = jm_ibdev->pd;

        /* create completion queue */
        conn->jc_cq = ib_create_cq(conn->jc_id->device,
                                   jm_cq_comp_handler,
                                   jm_cq_event_handler,
                                   conn, 16, 0);
        if (IS_ERR(conn->jc_cq)) {
                ret = PTR_ERR(conn->jc_cq);
                conn->jc_cq = NULL;
                printk("create cq failed: %d\n", ret);
                return ret;
        }
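        /*
         * The outgoing side arms for every completion (it sleeps on its
         * signaled RDMA WRITEs); the incoming side arms for solicited
         * events only.
         */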
        if (outgoing)
                ret = ib_req_notify_cq(conn->jc_cq, IB_CQ_NEXT_COMP);
        else
                ret = ib_req_notify_cq(conn->jc_cq, IB_CQ_SOLICITED);
        if (ret) {
                printk("notify cq failed: %d\n", ret);
                goto out_destroy_cq;
        }

        /* create queue pair */
        memset(&iattr, 0, sizeof(iattr));
        if (outgoing) {
                iattr.cap.max_send_wr   = 16;
                iattr.cap.max_send_sge  = 16;
                iattr.cap.max_recv_wr   = 0;
                iattr.cap.max_recv_sge  = 0;
        } else {
                iattr.cap.max_send_wr   = 0;
                iattr.cap.max_send_sge  = 0;
                iattr.cap.max_recv_wr   = 16;
                iattr.cap.max_recv_sge  = 16;
        }
        iattr.send_cq           = conn->jc_cq;
        iattr.recv_cq           = conn->jc_cq;
        iattr.sq_sig_type       = IB_SIGNAL_REQ_WR;
        iattr.qp_type           = IB_QPT_RC;
        iattr.event_handler     = jm_qp_event_handler;
        iattr.qp_context        = conn;
        ret = rdma_create_qp(conn->jc_id, conn->jc_pd, &iattr);
        if (ret) {
                printk("create qp failed: %d\n", ret);
                goto out_destroy_cq;
        }
        conn->jc_qp = conn->jc_id->qp;
        printk("setup qp done\n");

        return 0;

out_destroy_cq:
        ib_destroy_cq(conn->jc_cq);
        conn->jc_cq = NULL;
        return ret;
}

static int jm_allocate_and_map_mr(struct jm_rdma_conn *conn,
                                  uint32_t mirror_size,
                                  struct connect_header *ch) {
        struct ib_phys_buf *ibp = NULL;
        struct page **buf_pages = NULL;
        u64 local_addr, addr;
        int i = 0, ret = -ENOMEM;

        buf_pages = kcalloc(mirror_size, sizeof(struct page *), GFP_KERNEL);
        if (!buf_pages)
                goto out_free;
        ibp = kcalloc(mirror_size, sizeof(struct ib_phys_buf), GFP_KERNEL);
        if (!ibp)
                goto out_free;
        for (i = 0; i < mirror_size; i++) {
                buf_pages[i] = alloc_page(GFP_KERNEL);
                if (!buf_pages[i])
                        goto out_free;
                addr = ib_dma_map_page(conn->jc_id->device, buf_pages[i],
                                       0, PAGE_SIZE, DMA_FROM_DEVICE);
                if (ib_dma_mapping_error(conn->jc_id->device, addr)) {
                        __free_page(buf_pages[i]);
                        goto out_free;
                }
                ibp[i].addr = addr;
                ibp[i].size = PAGE_SIZE;
        }
        local_addr = ibp[0].addr;
        conn->jc_map_mr = ib_reg_phys_mr(conn->jc_pd, &ibp[0], mirror_size,
                                         IB_ACCESS_REMOTE_WRITE |
                                         IB_ACCESS_LOCAL_WRITE,
                                         &local_addr);
        if (IS_ERR(conn->jc_map_mr)) {
                ret = PTR_ERR(conn->jc_map_mr);
                conn->jc_map_mr = NULL;
                printk("get DMA mr failed: %d\n", ret);
                goto out_free;
        }
        conn->jc_pb_nsegs = 1;
        conn->jc_pb_segsize = PAGE_SIZE * mirror_size;
        conn->jc_pb_addrs[0] = local_addr;
        conn->jc_pb_rkey = conn->jc_map_mr->rkey;
        ch->addr = local_addr;
        ch->rkey = conn->jc_map_mr->rkey;
        for (i = 0; i < mirror_size; i++) {
                conn->jc_pages[i] = buf_pages[i];
                conn->jc_mappings[i] = ibp[i].addr;
                conn->jc_page_count++;
        }

        return 0;

out_free:
        printk("map mr failed at %d\n", i);
        for (i--; i >= 0; i--) {
                ib_dma_unmap_page(conn->jc_id->device, ibp[i].addr,
                                  PAGE_SIZE, DMA_FROM_DEVICE);
                __free_page(buf_pages[i]);
        }
        kfree(buf_pages);
        kfree(ibp);
        return ret;
}

static int jm_rdma_cm_event_handler(struct rdma_cm_id *id,
                                    struct rdma_cm_event *event) {
        struct jm_rdma_conn *conn = id->context;
        struct ib_qp_init_attr iattr;
        struct ib_qp_attr attr;
        int connstate = 0, ret = 0;
        struct connect_header *ch = NULL;

        printk("event %d comes in for id %p\n", event->event, id);
        switch (event->event) {
        case RDMA_CM_EVENT_CONNECT_REQUEST:
                ch = (struct connect_header*)event->param.conn.private_data;
                if (!ch || (ch->connect_magic != JM_MAGIC)) {
                        printk("Connect request error.\n");
                        ret = -EINVAL;
                        break;
                }
                /* in this case, it is a newly allocated cm_id */
                printk("Connect request, event=%d, mirror_size=%d\n", 
event->event,
                        ch->mirror_size);
                ret = jm_handle_connect_req(id, &conn, ch->mirror_size);
                break;

        case RDMA_CM_EVENT_ADDR_RESOLVED:
        case RDMA_CM_EVENT_ROUTE_RESOLVED:
                conn->jc_async_rc = 0;
                complete(&conn->jc_done);
                break;

        case RDMA_CM_EVENT_ADDR_ERROR:
                conn->jc_async_rc = -EHOSTUNREACH;
                printk("CM address resolution error\n");
                complete(&conn->jc_done);
                break;
        case RDMA_CM_EVENT_ROUTE_ERROR:
                conn->jc_async_rc = -ENETUNREACH;
                printk("CM route resolution error\n");
                complete(&conn->jc_done);
                break;

        case RDMA_CM_EVENT_ESTABLISHED:
                connstate = 1;
                ib_query_qp(id->qp, &attr,
                            IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
                            &iattr);
                printk("%d responder resources (%d initiator)\n",
                       attr.max_dest_rd_atomic, attr.max_rd_atomic);
                jm_add_conn_to_list(conn);
                goto connected;

        case RDMA_CM_EVENT_CONNECT_ERROR:
                connstate = -ENOTCONN;
                goto connected;
        case RDMA_CM_EVENT_UNREACHABLE:
                connstate = -ENETDOWN;
                goto connected;
        case RDMA_CM_EVENT_REJECTED:
                connstate = -ECONNREFUSED;
                goto connected;
        case RDMA_CM_EVENT_DISCONNECTED:
                connstate = -ECONNABORTED;
                goto connected;

        case RDMA_CM_EVENT_DEVICE_REMOVAL:
                printk("Device removal id=%p\n", id);
                connstate = -ENODEV;
connected:
                printk("%pI4:%u (event 0x%x)\n",
                       &conn->jc_remoteaddr.sin_addr.s_addr,
                       ntohs(conn->jc_remoteaddr.sin_port),
                       event->event << 11);
                conn->jc_connstate = connstate;
                wake_up_all(&conn->jc_connect_wait);
                break;

        default:
                printk("unexpected CM event %d on id %p\n", event->event, id);
                break;
        }

        return ret;
}

static int jm_conn_open(struct jm_rdma_conn *conn, struct sockaddr *addr) {
        struct rdma_cm_id *id;
        int ret = 0;

        init_completion(&conn->jc_done);

        /* create interface device */
        id = rdma_create_id(jm_rdma_cm_event_handler, conn, RDMA_PS_TCP);
        if (IS_ERR(id)) {
                ret = PTR_ERR(id);
                printk("create RDMA id failed: %d\n", ret);
                goto out;
        }
        conn->jc_id = id;

        conn->jc_async_rc = -ETIMEDOUT;
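        /*
         * -ETIMEDOUT is the default outcome; the CM callback overwrites
         * jc_async_rc when ADDR_RESOLVED/ADDR_ERROR arrives before the
         * wait below times out.
         */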
        ret = rdma_resolve_addr(id, NULL, addr, RDMA_RESOLVE_TIMEOUT);
        if (ret) {
                printk("RDMA resolve addr failed: %d\n", ret);
                goto out_destroy_id;
        }
        wait_for_completion_interruptible_timeout(&conn->jc_done,
                                msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
        ret = conn->jc_async_rc;
        if (ret)
                goto out_destroy_id;
        conn->jc_remoteaddr = *(struct sockaddr_in *)addr;

        conn->jc_async_rc = -ETIMEDOUT;
        ret = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
        if (ret) {
                printk("RDMA resolve route failed: %d\n", ret);
                goto out_destroy_id;
        }
        wait_for_completion_interruptible_timeout(&conn->jc_done,
                                  msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1);
        ret = conn->jc_async_rc;
        if (ret)
                goto out_destroy_id;
        printk("open conn ok\n");
        return 0;

out_destroy_id:
        rdma_destroy_id(conn->jc_id);
        conn->jc_id = NULL;
out:
        return ret;
}

static void jm_conn_close(struct jm_rdma_conn *conn) {
        if (conn->jc_qp) {
                rdma_destroy_qp(conn->jc_id);
                conn->jc_qp = NULL;
        }
        if (conn->jc_map_mr) {
                int i = conn->jc_page_count - 1;

                for (; i >= 0; i--) {
                        ib_dma_unmap_page(conn->jc_id->device,
                                          conn->jc_mappings[i],
                                          PAGE_SIZE, DMA_FROM_DEVICE);
                        conn->jc_mappings[i] = 0;
                        __free_page(conn->jc_pages[i]);
                        conn->jc_pages[i] = NULL;
                        conn->jc_page_count--;
                }
                ib_dereg_mr(conn->jc_map_mr);
                conn->jc_map_mr = NULL;
        }
        if (conn->jc_cq) {
                ib_destroy_cq(conn->jc_cq);
                conn->jc_cq = NULL;
        }
        if (conn->jc_id) {
                rdma_destroy_id(conn->jc_id);
                conn->jc_id = NULL;
        }
}

static int jm_connect(struct jm_rdma_conn *conn) {
        struct rdma_conn_param conn_param;
        struct connect_header header;
        int ret;

        ret = jm_setup_qp(conn, 1);
        if (ret)
                goto out;

        /* connect server */
        init_waitqueue_head(&conn->jc_connect_wait);
        conn->jc_connstate = 0;

        memset(&conn_param, 0, sizeof(conn_param));
        memset(&header, 0, sizeof(header));
        header.connect_magic = JM_MAGIC;
        header.mirror_size = RECV_PAGES;
        conn_param.private_data = &header;
        conn_param.private_data_len = sizeof(header);
        conn_param.initiator_depth = 1;
        conn_param.retry_count = 7;
        ret = rdma_connect(conn->jc_id, &conn_param);
        if (ret) {
                printk("RDMA connect failed: %d\n", ret);
                goto out;
        }
        wait_event_interruptible(conn->jc_connect_wait,
                                 conn->jc_connstate != 0);
        if (conn->jc_connstate < 0)
                ret = conn->jc_connstate;
out:
        return ret;
}

static int jm_disconnect(struct jm_rdma_conn *conn) {
        struct ib_wc wc;
        int flush_count = 0;
        int ret;

        if (conn->jc_connstate <= 0)
                return 0;

        while (ib_poll_cq(conn->jc_cq, 1, &wc) == 1)
                ++flush_count;
        printk("id(%p) is to disconnect, %d events flushed\n",
               conn->jc_id, flush_count);
        ret = rdma_disconnect(conn->jc_id);
        if (ret) {
                printk("unable to perform disconnect: %d\n", ret);
                conn->jc_connstate = ret;
        } else {
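                /*
                 * Block until the CM event handler sees DISCONNECTED (or
                 * an error event) and moves jc_connstate away from 1.
                 */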
                wait_event_interruptible(conn->jc_connect_wait,
                                         conn->jc_connstate != 1);
                printk("id(%p) after disconnect, connstate is %d\n",
                       conn->jc_id, conn->jc_connstate);
        }

        return ret;
}

/* receiver side */
static int jm_handle_connect_req(struct rdma_cm_id *id,
                                 struct jm_rdma_conn **rconn,
                                 uint32_t mirror_size) {
        struct rdma_conn_param conn_param;
        struct jm_rdma_conn *conn;
        int ret = 0, destroy = 1;
        struct connect_header ch;

        memset(&ch, 0, sizeof(ch));
        if (mirror_size > RECV_PAGES) {
                ret = -EINVAL;
                goto out_reject;
        }
        conn = kzalloc(sizeof(*conn), GFP_KERNEL);
        if (!conn) {
                ret = -ENOMEM;
                goto out_reject;
        }

        init_waitqueue_head(&conn->jc_connect_wait);
        conn->jc_connstate = 0;

        conn->jc_id = id;
        id->context = conn;
        ret = jm_setup_qp(conn, 0);
        if (ret) {
                kfree(conn);
                conn = NULL;
                goto out_reject; 
        }

        memset(&conn_param, 0, sizeof(conn_param));
        /* XXX tune these? */
        conn_param.responder_resources = 1;
        conn_param.initiator_depth = 1;
        conn_param.retry_count = 7;
        conn_param.rnr_retry_count = 7;

        ch.connect_magic = JM_MAGIC;
        ch.mirror_size = mirror_size;
        conn_param.private_data = &ch;
        conn_param.private_data_len = sizeof(ch);
        printk("allocating memory and map it for receiver\n");
        if ((ret = jm_allocate_and_map_mr(conn, mirror_size, &ch))) {
                printk("failed to allocate and map mr: %d\n", ret);
                rdma_destroy_qp(conn->jc_id);
                ib_destroy_cq(conn->jc_cq);
                kfree(conn);
                goto out_reject;
        }
        /* rdma_accept() calls rdma_reject() internally if it fails */
        ret = rdma_accept(id, &conn_param);
        if (!ret)
                conn->jc_connstate = 1;

        conn->jc_incoming = 1;
        *rconn = conn;
        return 0;

out_reject:
        rdma_reject(id, NULL, 0);
        /* a non-zero return tells the RDMA CM core to destroy this id */
        return destroy;
}
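
/*
 * Not part of the original post: a sketch of how jm_listen_id might be
 * set up on the receiver, assuming the usual bind/listen sequence on
 * JM_RDMA_PORT. jm_start_listen is a hypothetical helper.
 */
static int jm_start_listen(void) {
        struct sockaddr_in sin;
        int ret;

        jm_listen_id = rdma_create_id(jm_rdma_cm_event_handler, NULL,
                                      RDMA_PS_TCP);
        if (IS_ERR(jm_listen_id)) {
                ret = PTR_ERR(jm_listen_id);
                jm_listen_id = NULL;
                return ret;
        }

        memset(&sin, 0, sizeof(sin));
        sin.sin_family = AF_INET;
        sin.sin_addr.s_addr = htonl(INADDR_ANY);
        sin.sin_port = htons(JM_RDMA_PORT);

        ret = rdma_bind_addr(jm_listen_id, (struct sockaddr *)&sin);
        if (!ret)
                ret = rdma_listen(jm_listen_id, 4);
        if (ret) {
                rdma_destroy_id(jm_listen_id);
                jm_listen_id = NULL;
        }
        return ret;
}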

-- 
Ding Dinghua