Each submitted polled request has to be added to the kioctx poll_submitted
list, which means grabbing the poll_lock for every request. We already use
a block plug to batch submissions when we're doing a batch of IO; extend
that to batch the poll requests internally as well, so the poll_lock is
only taken once per batch flush instead of once per request.
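
To illustrate the idea outside the kernel, here is a minimal stand-alone
userspace sketch of the same pattern (it is not the patch code; names like
batch_state, flush_batch and BATCH are made up for illustration): entries are
queued on a private per-submitter list and only spliced onto the shared,
lock-protected list once per batch flush, so the shared lock is taken once
per flush rather than once per entry.

#include <pthread.h>
#include <stdio.h>

#define BATCH	8

struct node {
	struct node *next;
	int id;
};

struct shared_list {
	pthread_mutex_t lock;
	struct node *head;
};

struct batch_state {
	struct node *head, *tail;
	unsigned int count;
};

/* Splice the private batch onto the shared list; lock taken once per flush. */
static void flush_batch(struct shared_list *s, struct batch_state *b)
{
	if (!b->head)
		return;
	pthread_mutex_lock(&s->lock);
	b->tail->next = s->head;
	s->head = b->head;
	pthread_mutex_unlock(&s->lock);
	b->head = b->tail = NULL;
	b->count = 0;
}

/* Queue one entry privately; only every BATCH'th call touches the lock. */
static void submit_one(struct shared_list *s, struct batch_state *b,
		       struct node *n)
{
	n->next = NULL;
	if (b->tail)
		b->tail->next = n;
	else
		b->head = n;
	b->tail = n;
	if (++b->count >= BATCH)
		flush_batch(s, b);
}

int main(void)
{
	struct shared_list s = { PTHREAD_MUTEX_INITIALIZER, NULL };
	struct batch_state b = { NULL, NULL, 0 };
	struct node nodes[20];
	struct node *n;
	int i;

	for (i = 0; i < 20; i++) {
		nodes[i].id = i;
		submit_one(&s, &b, &nodes[i]);
	}
	flush_batch(&s, &b);	/* end of submission, flush the leftovers */

	for (n = s.head; n; n = n->next)
		printf("%d ", n->id);
	printf("\n");
	return 0;
}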

Signed-off-by: Jens Axboe <ax...@kernel.dk>
---
 fs/aio.c | 136 +++++++++++++++++++++++++++++++++++++++++++++----------
 1 file changed, 113 insertions(+), 23 deletions(-)

diff --git a/fs/aio.c b/fs/aio.c
index c4dbd5e1c350..2e8cde976cb4 100644
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -240,6 +240,21 @@ struct aio_kiocb {
        };
 };
 
+struct aio_submit_state {
+       struct kioctx *ctx;
+
+       struct blk_plug plug;
+#ifdef CONFIG_BLOCK
+       struct blk_plug_cb plug_cb;
+#endif
+
+       /*
+        * Polled iocbs that have been submitted, but not added to the ctx yet
+        */
+       struct list_head req_list;
+       unsigned int req_count;
+};
+
 /*------ sysctl variables----*/
 static DEFINE_SPINLOCK(aio_nr_lock);
 unsigned long aio_nr;          /* current system wide number of aio requests */
@@ -257,6 +272,15 @@ static const struct address_space_operations aio_ctx_aops;
 static const unsigned int iocb_page_shift =
                                ilog2(PAGE_SIZE / sizeof(struct iocb));
 
+/*
+ * We rely on block level unplugs to flush pending requests, if we schedule
+ */
+#ifdef CONFIG_BLOCK
+static const bool aio_use_state_req_list = true;
+#else
+static const bool aio_use_state_req_list = false;
+#endif
+
 static void aio_useriocb_unmap(struct kioctx *);
 static void aio_iopoll_reap_events(struct kioctx *);
 
@@ -1901,13 +1925,28 @@ static inline void aio_rw_done(struct kiocb *req, ssize_t ret)
        }
 }
 
+/*
+ * Called either at the end of IO submission, or through a plug callback
+ * because we're going to schedule. Moves our local batch of requests to
+ * the ctx poll list, so they can be found for polling + reaping.
+ */
+static void aio_flush_state_reqs(struct kioctx *ctx,
+                                struct aio_submit_state *state)
+{
+       spin_lock(&ctx->poll_lock);
+       list_splice_tail_init(&state->req_list, &ctx->poll_submitted);
+       spin_unlock(&ctx->poll_lock);
+       state->req_count = 0;
+}
+
 /*
  * After the iocb has been issued, it's safe to be found on the poll list.
  * Adding the kiocb to the list AFTER submission ensures that we don't
  * find it from an io_getevents() thread before the issuer is done accessing
  * the kiocb cookie.
  */
-static void aio_iopoll_iocb_issued(struct aio_kiocb *kiocb)
+static void aio_iopoll_iocb_issued(struct aio_submit_state *state,
+                                  struct aio_kiocb *kiocb)
 {
        /*
         * For fast devices, IO may have already completed. If it has, add
@@ -1917,12 +1956,21 @@ static void aio_iopoll_iocb_issued(struct aio_kiocb *kiocb)
        const int front = test_bit(KIOCB_F_POLL_COMPLETED, &kiocb->ki_flags);
        struct kioctx *ctx = kiocb->ki_ctx;
 
-       spin_lock(&ctx->poll_lock);
-       if (front)
-               list_add(&kiocb->ki_list, &ctx->poll_submitted);
-       else
-               list_add_tail(&kiocb->ki_list, &ctx->poll_submitted);
-       spin_unlock(&ctx->poll_lock);
+       if (!state || !aio_use_state_req_list) {
+               spin_lock(&ctx->poll_lock);
+               if (front)
+                       list_add(&kiocb->ki_list, &ctx->poll_submitted);
+               else
+                       list_add_tail(&kiocb->ki_list, &ctx->poll_submitted);
+               spin_unlock(&ctx->poll_lock);
+       } else {
+               if (front)
+                       list_add(&kiocb->ki_list, &state->req_list);
+               else
+                       list_add_tail(&kiocb->ki_list, &state->req_list);
+               if (++state->req_count >= AIO_IOPOLL_BATCH)
+                       aio_flush_state_reqs(ctx, state);
+       }
 }
 
 static ssize_t aio_read(struct aio_kiocb *kiocb, const struct iocb *iocb,
@@ -2218,7 +2266,8 @@ static ssize_t aio_poll(struct aio_kiocb *aiocb, const struct iocb *iocb)
 }
 
 static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
-                          struct iocb __user *user_iocb, bool compat)
+                          struct iocb __user *user_iocb,
+                          struct aio_submit_state *state, bool compat)
 {
        struct aio_kiocb *req;
        ssize_t ret;
@@ -2322,7 +2371,7 @@ static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
                        ret = -EAGAIN;
                        goto out_put_req;
                }
-               aio_iopoll_iocb_issued(req);
+               aio_iopoll_iocb_issued(state, req);
        }
        return 0;
 out_put_req:
@@ -2336,7 +2385,7 @@ static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
 }
 
 static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
-                        bool compat)
+                        struct aio_submit_state *state, bool compat)
 {
        struct iocb iocb, *iocbp;
 
@@ -2353,7 +2402,44 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
                iocbp = &iocb;
        }
 
-       return __io_submit_one(ctx, iocbp, user_iocb, compat);
+       return __io_submit_one(ctx, iocbp, user_iocb, state, compat);
+}
+
+#ifdef CONFIG_BLOCK
+static void aio_state_unplug(struct blk_plug_cb *cb, bool from_schedule)
+{
+       struct aio_submit_state *state;
+
+       state = container_of(cb, struct aio_submit_state, plug_cb);
+       if (!list_empty(&state->req_list))
+               aio_flush_state_reqs(state->ctx, state);
+}
+#endif
+
+/*
+ * Batched submission is done, ensure local IO is flushed out.
+ */
+static void aio_submit_state_end(struct aio_submit_state *state)
+{
+       blk_finish_plug(&state->plug);
+       if (!list_empty(&state->req_list))
+               aio_flush_state_reqs(state->ctx, state);
+}
+
+/*
+ * Start submission side cache.
+ */
+static void aio_submit_state_start(struct aio_submit_state *state,
+                                  struct kioctx *ctx)
+{
+       state->ctx = ctx;
+       INIT_LIST_HEAD(&state->req_list);
+       state->req_count = 0;
+#ifdef CONFIG_BLOCK
+       state->plug_cb.callback = aio_state_unplug;
+       blk_start_plug(&state->plug);
+       list_add(&state->plug_cb.list, &state->plug.cb_list);
+#endif
 }
 
 /* sys_io_submit:
@@ -2371,10 +2457,10 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
 SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
                struct iocb __user * __user *, iocbpp)
 {
+       struct aio_submit_state state, *statep = NULL;
        struct kioctx *ctx;
        long ret = 0;
        int i = 0;
-       struct blk_plug plug;
 
        if (unlikely(nr < 0))
                return -EINVAL;
@@ -2388,8 +2474,10 @@ SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
        if (nr > ctx->nr_events)
                nr = ctx->nr_events;
 
-       if (nr > AIO_PLUG_THRESHOLD)
-               blk_start_plug(&plug);
+       if (nr > AIO_PLUG_THRESHOLD) {
+               aio_submit_state_start(&state, ctx);
+               statep = &state;
+       }
        for (i = 0; i < nr; i++) {
                struct iocb __user *user_iocb;
 
@@ -2398,12 +2486,12 @@ SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
                        break;
                }
 
-               ret = io_submit_one(ctx, user_iocb, false);
+               ret = io_submit_one(ctx, user_iocb, statep, false);
                if (ret)
                        break;
        }
-       if (nr > AIO_PLUG_THRESHOLD)
-               blk_finish_plug(&plug);
+       if (statep)
+               aio_submit_state_end(statep);
 
        percpu_ref_put(&ctx->users);
        return i ? i : ret;
@@ -2413,10 +2501,10 @@ SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
 COMPAT_SYSCALL_DEFINE3(io_submit, compat_aio_context_t, ctx_id,
                       int, nr, compat_uptr_t __user *, iocbpp)
 {
+       struct aio_submit_state state, *statep = NULL;
        struct kioctx *ctx;
        long ret = 0;
        int i = 0;
-       struct blk_plug plug;
 
        if (unlikely(nr < 0))
                return -EINVAL;
@@ -2430,8 +2518,10 @@ COMPAT_SYSCALL_DEFINE3(io_submit, compat_aio_context_t, ctx_id,
        if (nr > ctx->nr_events)
                nr = ctx->nr_events;
 
-       if (nr > AIO_PLUG_THRESHOLD)
-               blk_start_plug(&plug);
+       if (nr > AIO_PLUG_THRESHOLD) {
+               aio_submit_state_start(&state, ctx);
+               statep = &state;
+       }
        for (i = 0; i < nr; i++) {
                compat_uptr_t user_iocb;
 
@@ -2440,12 +2530,12 @@ COMPAT_SYSCALL_DEFINE3(io_submit, compat_aio_context_t, ctx_id,
                        break;
                }
 
-               ret = io_submit_one(ctx, compat_ptr(user_iocb), true);
+               ret = io_submit_one(ctx, compat_ptr(user_iocb), statep, true);
                if (ret)
                        break;
        }
-       if (nr > AIO_PLUG_THRESHOLD)
-               blk_finish_plug(&plug);
+       if (statep)
+               aio_submit_state_end(statep);
 
        percpu_ref_put(&ctx->users);
        return i ? i : ret;
-- 
2.17.1
