> On Sep 20, 2015, at 3:52 AM, Sagi Grimberg <[email protected]> wrote:
> 
> On 9/17/2015 11:44 PM, Chuck Lever wrote:
>> The rb_send_bufs and rb_recv_bufs arrays are used to implement a
>> pair of stacks for keeping track of free rpcrdma_req and rpcrdma_rep
>> structs. Replace those arrays with free lists.
>> 
>> To allow more than 512 RPCs in-flight at once, each of these arrays
>> would be larger than a page (assuming 8-byte addresses and 4KB
>> pages). Allowing up to 64K in-flight RPCs (as TCP now does), each
>> buffer array would have to be 128 pages. That's an order-7
>> allocation. (Not that we're going there.)
>> 
>> A list is easier to expand dynamically. Instead of allocating a
>> larger array of pointers and copying the existing pointers to the
>> new array, simply append more buffers to each list.
>> 
>> This also makes it simpler to manage receive buffers that might
>> catch backwards-direction calls, or to post receive buffers in
>> bulk to amortize the overhead of ib_post_recv.
>> 
>> Signed-off-by: Chuck Lever <[email protected]>
> 
> Hi Chuck,
> 
> I get the idea of this patch, but it is a bit confusing (to a
> non-educated reader).

OK, let’s see if there’s room for additional improvement.
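
For what it's worth, the "easier to expand dynamically" point can be
made concrete: once the free lists are in place, growing the pool is
just creating more reqs and splicing them onto the list, with no
reallocation or copying. A rough sketch (rpcrdma_grow_send_bufs is
hypothetical, not part of this series):

/* Hypothetical: grow the send buffer pool at runtime. New reqs are
 * simply added to the free list; existing entries never move.
 */
static int
rpcrdma_grow_send_bufs(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        unsigned long flags;
        unsigned int i;

        for (i = 0; i < count; i++) {
                struct rpcrdma_req *req;

                req = rpcrdma_create_req(r_xprt);
                if (IS_ERR(req))
                        return PTR_ERR(req);
                spin_lock_irqsave(&buf->rb_lock, flags);
                list_add(&req->rl_free, &buf->rb_send_bufs);
                spin_unlock_irqrestore(&buf->rb_lock, flags);
        }
        return 0;
}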


> Can you explain why sometimes you call put/get_locked routines
> and sometimes you open-code them?

Are you talking about the later patch that adds support for
receiving backwards calls? That probably should use the
existing helpers, shouldn’t it?
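
For instance, the backchannel patch could post its extra receive
buffers through the existing helper instead of open-coding the
list_add under rb_lock. Something like this sketch (the function
below is illustrative only, not the actual backchannel code):

/* Hypothetical sketch: create "count" extra reps for backchannel
 * calls and return each one to the common recv free list via the
 * existing helper, which takes rb_lock itself.
 */
static int
rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt, unsigned int count)
{
        unsigned int i;

        for (i = 0; i < count; i++) {
                struct rpcrdma_rep *rep;

                rep = rpcrdma_create_rep(r_xprt);
                if (IS_ERR(rep))
                        return PTR_ERR(rep);
                rpcrdma_recv_buffer_put(rep);
        }
        return 0;
}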


> And is it mandatory to have
> the callers lock before calling get/put? Perhaps the code would
> be simpler if the get/put routines would take care of locking
> since rb_lock looks dedicated to them.

I’m not sure I understand this comment; I thought the helpers were
already doing the locking.
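
If I’m reading the suggestion right: in the patch as posted,
rpcrdma_buffer_get() and rpcrdma_buffer_put() take rb_lock, and the
*_locked helpers assume the caller already holds it. Helpers that take
rb_lock themselves, so callers never lock, would look roughly like
this (the helper name is for illustration only):

/* Sketch of the suggested alternative: the helper does its own
 * locking and returns NULL when no rep is available, so callers
 * need neither rb_lock nor a list_empty() check.
 */
static struct rpcrdma_rep *
rpcrdma_buffer_get_rep(struct rpcrdma_buffer *buf)
{
        struct rpcrdma_rep *rep = NULL;
        unsigned long flags;

        spin_lock_irqsave(&buf->rb_lock, flags);
        if (!list_empty(&buf->rb_recv_bufs)) {
                rep = list_first_entry(&buf->rb_recv_bufs,
                                       struct rpcrdma_rep, rr_list);
                list_del(&rep->rr_list);
        }
        spin_unlock_irqrestore(&buf->rb_lock, flags);

        return rep;
}

The catch is that rpcrdma_buffer_get() pops a req and a rep in one
critical section, so with this shape it would take rb_lock twice per
call.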


> 
>> ---
>>  net/sunrpc/xprtrdma/verbs.c     |  141 +++++++++++++++++----------------------
>>  net/sunrpc/xprtrdma/xprt_rdma.h |    9 +-
>>  2 files changed, 66 insertions(+), 84 deletions(-)
>> 
>> diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
>> index ac1345b..8d99214 100644
>> --- a/net/sunrpc/xprtrdma/verbs.c
>> +++ b/net/sunrpc/xprtrdma/verbs.c
>> @@ -962,44 +962,18 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
>>  {
>>      struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
>>      struct rpcrdma_ia *ia = &r_xprt->rx_ia;
>> -    struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
>> -    char *p;
>> -    size_t len;
>>      int i, rc;
>> 
>> -    buf->rb_max_requests = cdata->max_requests;
>> +    buf->rb_max_requests = r_xprt->rx_data.max_requests;
>>      spin_lock_init(&buf->rb_lock);
>> 
>> -    /* Need to allocate:
>> -     *   1.  arrays for send and recv pointers
>> -     *   2.  arrays of struct rpcrdma_req to fill in pointers
>> -     *   3.  array of struct rpcrdma_rep for replies
>> -     * Send/recv buffers in req/rep need to be registered
>> -     */
>> -    len = buf->rb_max_requests *
>> -            (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
>> -
>> -    p = kzalloc(len, GFP_KERNEL);
>> -    if (p == NULL) {
>> -            dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
>> -                    __func__, len);
>> -            rc = -ENOMEM;
>> -            goto out;
>> -    }
>> -    buf->rb_pool = p;       /* for freeing it later */
>> -
>> -    buf->rb_send_bufs = (struct rpcrdma_req **) p;
>> -    p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
>> -    buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
>> -    p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
>> -
>>      rc = ia->ri_ops->ro_init(r_xprt);
>>      if (rc)
>>              goto out;
>> 
>> +    INIT_LIST_HEAD(&buf->rb_send_bufs);
>>      for (i = 0; i < buf->rb_max_requests; i++) {
>>              struct rpcrdma_req *req;
>> -            struct rpcrdma_rep *rep;
>> 
>>              req = rpcrdma_create_req(r_xprt);
>>              if (IS_ERR(req)) {
>> @@ -1008,7 +982,12 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
>>                      rc = PTR_ERR(req);
>>                      goto out;
>>              }
>> -            buf->rb_send_bufs[i] = req;
>> +            list_add(&req->rl_free, &buf->rb_send_bufs);
>> +    }
>> +
>> +    INIT_LIST_HEAD(&buf->rb_recv_bufs);
>> +    for (i = 0; i < buf->rb_max_requests + 2; i++) {
>> +            struct rpcrdma_rep *rep;
>> 
>>              rep = rpcrdma_create_rep(r_xprt);
>>              if (IS_ERR(rep)) {
>> @@ -1017,7 +996,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
>>                      rc = PTR_ERR(rep);
>>                      goto out;
>>              }
>> -            buf->rb_recv_bufs[i] = rep;
>> +            list_add(&rep->rr_list, &buf->rb_recv_bufs);
>>      }
>> 
>>      return 0;
>> @@ -1051,25 +1030,26 @@ void
>>  rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
>>  {
>>      struct rpcrdma_ia *ia = rdmab_to_ia(buf);
>> -    int i;
>> 
>> -    /* clean up in reverse order from create
>> -     *   1.  recv mr memory (mr free, then kfree)
>> -     *   2.  send mr memory (mr free, then kfree)
>> -     *   3.  MWs
>> -     */
>> -    dprintk("RPC:       %s: entering\n", __func__);
>> +    while (!list_empty(&buf->rb_recv_bufs)) {
>> +            struct rpcrdma_rep *rep = list_entry(buf->rb_recv_bufs.next,
>> +                                                 struct rpcrdma_rep,
>> +                                                 rr_list);
>> 
>> -    for (i = 0; i < buf->rb_max_requests; i++) {
>> -            if (buf->rb_recv_bufs)
>> -                    rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
>> -            if (buf->rb_send_bufs)
>> -                    rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
>> +            list_del(&rep->rr_list);
>> +            rpcrdma_destroy_rep(ia, rep);
>>      }
>> 
>> -    ia->ri_ops->ro_destroy(buf);
>> +    while (!list_empty(&buf->rb_send_bufs)) {
>> +            struct rpcrdma_req *req = list_entry(buf->rb_send_bufs.next,
>> +                                                 struct rpcrdma_req,
>> +                                                 rl_free);
>> 
>> -    kfree(buf->rb_pool);
>> +            list_del(&req->rl_free);
>> +            rpcrdma_destroy_req(ia, req);
>> +    }
>> +
>> +    ia->ri_ops->ro_destroy(buf);
>>  }
>> 
>>  struct rpcrdma_mw *
>> @@ -1102,24 +1082,27 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
>>  }
>> 
>>  static void
>> -rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
>> +rpcrdma_buffer_put_locked(struct rpcrdma_rep *rep, struct rpcrdma_buffer *buf)
>>  {
>> -    buf->rb_send_bufs[--buf->rb_send_index] = req;
>> -    req->rl_niovs = 0;
>> -    if (req->rl_reply) {
>> -            buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
>> -            req->rl_reply = NULL;
>> -    }
>> +    list_add_tail(&rep->rr_list, &buf->rb_recv_bufs);
>> +}
>> +
>> +static struct rpcrdma_rep *
>> +rpcrdma_buffer_get_locked(struct rpcrdma_buffer *buf)
>> +{
>> +    struct rpcrdma_rep *rep;
>> +
>> +    rep = list_first_entry(&buf->rb_recv_bufs,
>> +                           struct rpcrdma_rep, rr_list);
>> +    list_del(&rep->rr_list);
>> +
>> +    return rep;
>>  }
> 
> There seems to be a distinction between send/recv buffers. Would it
> make sense to have a symmetric handling for both send/recv buffers?

Or maybe the same helpers could handle both. I’ll have a look
when I get back from SNIA SDC.
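
One way to get there might be helpers that operate on bare
list_heads, so the send and recv sides share the same get/put code.
A sketch (names are illustrative only; callers hold rb_lock, as with
the *_locked helpers in this patch):

/* Hypothetical shared helpers: rb_send_bufs and rb_recv_bufs are both
 * plain list_heads, so one pair of locked pop/push routines can serve
 * either side. Callers hold rb_lock and use list_entry() on the result.
 */
static struct list_head *
rpcrdma_buffer_pop_locked(struct list_head *list)
{
        struct list_head *entry = NULL;

        if (!list_empty(list)) {
                entry = list->next;
                list_del(entry);
        }
        return entry;
}

static void
rpcrdma_buffer_push_locked(struct list_head *entry, struct list_head *list)
{
        list_add_tail(entry, list);
}

rpcrdma_buffer_get() would then do, for example:

        item = rpcrdma_buffer_pop_locked(&buffers->rb_recv_bufs);
        if (item)
                req->rl_reply = list_entry(item, struct rpcrdma_rep, rr_list);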


>>  /*
>>   * Get a set of request/reply buffers.
>>   *
>> - * Reply buffer (if needed) is attached to send buffer upon return.
>> - * Rule:
>> - *    rb_send_index and rb_recv_index MUST always be pointing to the
>> - *    *next* available buffer (non-NULL). They are incremented after
>> - *    removing buffers, and decremented *before* returning them.
>> + * Reply buffer (if available) is attached to send buffer upon return.
>>   */
>>  struct rpcrdma_req *
>>  rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
>> @@ -1129,25 +1112,22 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
>> 
>>      spin_lock_irqsave(&buffers->rb_lock, flags);
>> 
>> -    if (buffers->rb_send_index == buffers->rb_max_requests) {
>> +    if (list_empty(&buffers->rb_send_bufs)) {
>>              spin_unlock_irqrestore(&buffers->rb_lock, flags);
>> -            dprintk("RPC:       %s: out of request buffers\n", __func__);
>> -            return ((struct rpcrdma_req *)NULL);
>> -    }
>> -
>> -    req = buffers->rb_send_bufs[buffers->rb_send_index];
>> -    if (buffers->rb_send_index < buffers->rb_recv_index) {
>> -            dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
>> -                    __func__,
>> -                    buffers->rb_recv_index - buffers->rb_send_index);
>> -            req->rl_reply = NULL;
>> -    } else {
>> -            req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
>> -            buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
>> +            pr_warn("RPC:       %s: out of request buffers\n", __func__);
>> +            return NULL;
>>      }
>> -    buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
>> +    req = list_first_entry(&buffers->rb_send_bufs,
>> +                           struct rpcrdma_req, rl_free);
>> +    list_del(&req->rl_free);
>> 
>> +    req->rl_reply = NULL;
>> +    if (!list_empty(&buffers->rb_recv_bufs))
>> +            req->rl_reply = rpcrdma_buffer_get_locked(buffers);
> 
> Would it make sense to check !list_empty() inside _get_locked and handle
> a possible NULL return?
> 
>>      spin_unlock_irqrestore(&buffers->rb_lock, flags);
>> +
>> +    if (!req->rl_reply)
>> +            pr_warn("RPC:       %s: out of reply buffers\n", __func__);
>>      return req;
>>  }
>> 
>> @@ -1159,17 +1139,22 @@ void
>>  rpcrdma_buffer_put(struct rpcrdma_req *req)
>>  {
>>      struct rpcrdma_buffer *buffers = req->rl_buffer;
>> +    struct rpcrdma_rep *rep = req->rl_reply;
>>      unsigned long flags;
>> 
>> +    req->rl_niovs = 0;
>> +    req->rl_reply = NULL;
>> +
>>      spin_lock_irqsave(&buffers->rb_lock, flags);
>> -    rpcrdma_buffer_put_sendbuf(req, buffers);
>> +    list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
>> +    if (rep)
>> +            rpcrdma_buffer_put_locked(rep, buffers);
>>      spin_unlock_irqrestore(&buffers->rb_lock, flags);
>>  }
>> 
>>  /*
>>   * Recover reply buffers from pool.
>> - * This happens when recovering from error conditions.
>> - * Post-increment counter/array index.
>> + * This happens when recovering from disconnect.
>>   */
>>  void
>>  rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
>> @@ -1178,10 +1163,8 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
>>      unsigned long flags;
>> 
>>      spin_lock_irqsave(&buffers->rb_lock, flags);
>> -    if (buffers->rb_recv_index < buffers->rb_max_requests) {
>> -            req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
>> -            buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
>> -    }
>> +    if (!list_empty(&buffers->rb_recv_bufs))
>> +            req->rl_reply = rpcrdma_buffer_get_locked(buffers);
>>      spin_unlock_irqrestore(&buffers->rb_lock, flags);
>>  }
>> 
>> @@ -1196,7 +1179,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
>>      unsigned long flags;
>> 
>>      spin_lock_irqsave(&buffers->rb_lock, flags);
>> -    buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
>> +    rpcrdma_buffer_put_locked(rep, buffers);
>>      spin_unlock_irqrestore(&buffers->rb_lock, flags);
>>  }
>> 
>> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
>> index a13508b..e6a358f 100644
>> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
>> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
>> @@ -252,6 +252,7 @@ struct rpcrdma_mr_seg {          /* chunk descriptors */
>>  #define RPCRDMA_MAX_IOVS    (2)
>> 
>>  struct rpcrdma_req {
>> +    struct list_head        rl_free;
>>      unsigned int            rl_niovs;
>>      unsigned int            rl_nchunks;
>>      unsigned int            rl_connect_cookie;
>> @@ -285,12 +286,10 @@ struct rpcrdma_buffer {
>>      struct list_head        rb_all;
>>      char                    *rb_pool;
>> 
>> -    spinlock_t              rb_lock;        /* protect buf arrays */
>> +    spinlock_t              rb_lock;        /* protect buf lists */
>> +    struct list_head        rb_send_bufs;
>> +    struct list_head        rb_recv_bufs;
>>      u32                     rb_max_requests;
>> -    int                     rb_send_index;
>> -    int                     rb_recv_index;
>> -    struct rpcrdma_req      **rb_send_bufs;
>> -    struct rpcrdma_rep      **rb_recv_bufs;
>>  };
>>  #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
>> 
>> 
> 

--
Chuck Lever


