On 3/13/2015 11:28 PM, Chuck Lever wrote:
Acquiring 64 FMRs in rpcrdma_buffer_get() while holding the buffer
pool lock is expensive, and unnecessary because FMR mode can
transfer up to a 1MB payload using just a single ib_fmr.

Instead, acquire ib_fmrs one-at-a-time as chunks are registered, and
return them to rb_mws immediately during deregistration.

Signed-off-by: Chuck Lever <[email protected]>
---
  net/sunrpc/xprtrdma/fmr_ops.c   |   41 ++++++++++++++++++++++++++++++++++++---
  net/sunrpc/xprtrdma/verbs.c     |   41 ++++++++++++++-------------------------
  net/sunrpc/xprtrdma/xprt_rdma.h |    1 +
  3 files changed, 54 insertions(+), 29 deletions(-)

diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index 96e6cd3..9c6c2e8 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -28,10 +28,11 @@ __fmr_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
  {
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_mr_seg *seg1 = seg;
+       struct rpcrdma_mw *mw = seg1->rl_mw;
        int rc, nsegs = seg->mr_nsegs;
        LIST_HEAD(l);

-       list_add(&seg1->rl_mw->r.fmr->list, &l);
+       list_add(&mw->r.fmr->list, &l);
        rc = ib_unmap_fmr(&l);
        read_lock(&ia->ri_qplock);
        while (seg1->mr_nsegs--)
@@ -39,11 +40,14 @@ __fmr_unmap(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg)
        read_unlock(&ia->ri_qplock);
        if (rc)
                goto out_err;
+out:
+       seg1->rl_mw = NULL;
+       rpcrdma_put_mw(r_xprt, mw);
        return nsegs;

  out_err:
        dprintk("RPC:       %s: ib_unmap_fmr status %i\n", __func__, rc);
-       return nsegs;
+       goto out;
  }

  static int
@@ -117,6 +121,27 @@ out_fmr_err:
        return rc;
  }

+static struct rpcrdma_mw *
+__fmr_get_mw(struct rpcrdma_xprt *r_xprt)

This introduces an asymmetric approach: there are separate fmr/frwr
get_mw routines, but only a single rpcrdma_put_mw. I noticed that
frwr_get_mw (next patch) is almost completely different, but I wonder
whether the two really need to differ that much.

Just raising the question.
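
For instance (an untested sketch; the name rpcrdma_get_mw is mine, not
from this series), a single pool-based get helper in verbs.c, mirroring
rpcrdma_put_mw(), could serve both modes, leaving any FRWR-specific
stale-MW recovery in frwr_ops.c:

	struct rpcrdma_mw *
	rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
	{
		struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
		struct rpcrdma_mw *mw = NULL;
		unsigned long flags;

		spin_lock_irqsave(&buf->rb_lock, flags);
		if (!list_empty(&buf->rb_mws)) {
			/* Take the first free MW off the pool list */
			mw = list_first_entry(&buf->rb_mws,
					      struct rpcrdma_mw, mw_list);
			list_del_init(&mw->mw_list);
		}
		spin_unlock_irqrestore(&buf->rb_lock, flags);
		return mw;	/* NULL if the pool is empty */
	}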

+{
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+       struct rpcrdma_mw *mw = NULL;
+       unsigned long flags;
+
+       spin_lock_irqsave(&buf->rb_lock, flags);
+
+       if (!list_empty(&buf->rb_mws)) {
+               mw = list_entry(buf->rb_mws.next,
+                               struct rpcrdma_mw, mw_list);
+               list_del_init(&mw->mw_list);
+       } else {
+               pr_err("RPC:       %s: no MWs available\n", __func__);
+       }
+
+       spin_unlock_irqrestore(&buf->rb_lock, flags);
+       return mw;
+}
+
  /* Use the ib_map_phys_fmr() verb to register a memory region
   * for remote access via RDMA READ or RDMA WRITE.
   */
@@ -126,10 +151,18 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
  {
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_mr_seg *seg1 = seg;
-       struct rpcrdma_mw *mw = seg1->rl_mw;
+       struct rpcrdma_mw *mw;
        u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
        int len, pageoff, i, rc;

+       mw = __fmr_get_mw(r_xprt);
+       if (!mw)
+               return -ENOMEM;
+       if (seg1->rl_mw) {
+               rpcrdma_put_mw(r_xprt, seg1->rl_mw);
+               seg1->rl_mw = NULL;
+       }
+

How can this happen? How would we get to ->op_map() with an existing
rl_mw? And wouldn't it be better to reuse that rl_mw, instead of
getting a new mw and then putting seg1->rl_mw back?
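
If a leftover rl_mw really can show up here, something like the
following (untested sketch, assuming a leftover MW is still usable for
a fresh ib_map_phys_fmr) seems simpler:

	/* Reuse an MW already attached to the segment, if any;
	 * otherwise pull a fresh one from the pool.
	 */
	mw = seg1->rl_mw;
	seg1->rl_mw = NULL;
	if (!mw) {
		mw = __fmr_get_mw(r_xprt);
		if (!mw)
			return -ENOMEM;
	}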

        pageoff = offset_in_page(seg1->mr_offset);
        seg1->mr_offset -= pageoff;  /* start of page */
        seg1->mr_len += pageoff;
@@ -152,6 +185,7 @@ fmr_op_map(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mr_seg *seg,
        if (rc)
                goto out_maperr;

+       seg1->rl_mw = mw;
        seg1->mr_rkey = mw->r.fmr->rkey;
        seg1->mr_base = seg1->mr_dma + pageoff;
        seg1->mr_nsegs = i;
@@ -164,6 +198,7 @@ out_maperr:
                pageoff, i, rc);
        while (i--)
                rpcrdma_unmap_one(ia, --seg);
+       rpcrdma_put_mw(r_xprt, mw);
        return rc;
  }

diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index f108a57..f2316d8 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1171,6 +1171,21 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
        kfree(buf->rb_pool);
  }

+void
+rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
+{
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+       unsigned long flags;
+
+       if (!list_empty(&mw->mw_list))
+               pr_warn("RPC:       %s: mw %p still on a list!\n",
+                       __func__, mw);
+
+       spin_lock_irqsave(&buf->rb_lock, flags);
+       list_add_tail(&mw->mw_list, &buf->rb_mws);
+       spin_unlock_irqrestore(&buf->rb_lock, flags);
+}
+
  /* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
   * some req segments uninitialized.
   */
@@ -1292,28 +1307,6 @@ rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
        return NULL;
  }

-static struct rpcrdma_req *
-rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
-{
-       struct rpcrdma_mw *r;
-       int i;
-
-       i = RPCRDMA_MAX_SEGS - 1;
-       while (!list_empty(&buf->rb_mws)) {
-               r = list_entry(buf->rb_mws.next,
-                              struct rpcrdma_mw, mw_list);
-               list_del(&r->mw_list);
-               req->rl_segments[i].rl_mw = r;
-               if (unlikely(i-- == 0))
-                       return req;     /* Success */
-       }
-
-       /* Not enough entries on rb_mws for this req */
-       rpcrdma_buffer_put_sendbuf(req, buf);
-       rpcrdma_buffer_put_mrs(req, buf);
-       return NULL;
-}
-
  /*
   * Get a set of request/reply buffers.
   *
@@ -1355,9 +1348,6 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
        case RPCRDMA_FRMR:
                req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
                break;
-       case RPCRDMA_MTHCAFMR:
-               req = rpcrdma_buffer_get_fmrs(req, buffers);
-               break;
        default:
                break;
        }
@@ -1382,7 +1372,6 @@ rpcrdma_buffer_put(struct rpcrdma_req *req)
        rpcrdma_buffer_put_sendbuf(req, buffers);
        switch (ia->ri_memreg_strategy) {
        case RPCRDMA_FRMR:
-       case RPCRDMA_MTHCAFMR:
                rpcrdma_buffer_put_mrs(req, buffers);
                break;
        default:
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 40a0ee8..d5aa5b4 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -426,6 +426,7 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
  unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
  void rpcrdma_map_one(struct rpcrdma_ia *, struct rpcrdma_mr_seg *, bool);
  void rpcrdma_unmap_one(struct rpcrdma_ia *, struct rpcrdma_mr_seg *);
+void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);

  /*
   * RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c

--
To unsubscribe from this list: send the line "unsubscribe linux-rdma" in
the body of a message to [email protected]
More majordomo info at  http://vger.kernel.org/majordomo-info.html

