Completions may be done in batches, where io_commit_cqring() is called
only once at the end. It means, that timeout sequence checks are done
only once and don't consider events in between, so potentially failing
to trigger some timeouts.

Do a separate CQ sequence accounting in u64. On timeout sequence
checking look up to UINT_MAX sequence behind, which it could have
missed. It's safe to do, because sqe->off is u32 and so can't wrap
around to used [seq - UINT_MAX, seq] window.

It's also necessary to decouple CQ timeout sequences from
ctx->cq_cached_tail for implementing "single CQE per link" feature and
others.

Signed-off-by: Pavel Begunkov <[email protected]>
---
 fs/io_uring.c | 33 ++++++++++++++++++++++++++-------
 1 file changed, 26 insertions(+), 7 deletions(-)

diff --git a/fs/io_uring.c b/fs/io_uring.c
index fb8ec4b00375..f09c1d3a7e63 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -298,6 +298,7 @@ struct io_ring_ctx {
                unsigned                cq_entries;
                unsigned                cq_mask;
                atomic_t                cq_timeouts;
+               u64                     cq_seq;
                unsigned long           cq_check_overflow;
                struct wait_queue_head  cq_wait;
                struct fasync_struct    *cq_fasync;
@@ -385,7 +386,7 @@ struct io_timeout {
        u64                             addr;
        int                             flags;
        u32                             off;
-       u32                             target_seq;
+       u64                             target_seq;
 };
 
 struct io_rw {
@@ -1081,6 +1082,7 @@ static void io_kill_timeout(struct io_kiocb *req)
        ret = hrtimer_try_to_cancel(&req->io->timeout.timer);
        if (ret != -1) {
                atomic_inc(&req->ctx->cq_timeouts);
+               req->ctx->cq_seq--;
                list_del_init(&req->list);
                req->flags |= REQ_F_COMP_LOCKED;
                io_cqring_fill_event(req, 0);
@@ -1098,16 +1100,31 @@ static void io_kill_timeouts(struct io_ring_ctx *ctx)
        spin_unlock_irq(&ctx->completion_lock);
 }
 
+static inline bool io_check_in_range(u64 pos, u64 start, u64 end)
+{
+       /* if @end < @start, check for [end, MAX_U64] + [MAX_U64, start] */
+       return (pos - start) <= (end - start);
+}
+
 static void __io_flush_timeouts(struct io_ring_ctx *ctx)
 {
+       u64 start_seq = ctx->cq_seq;
+
+
+       /*
+        * Batched CQ commit may have left some pending timeouts sequences
+        * behind @cq_sqe. Look back to find them. Note, that sqe->off is u32,
+        * and it uses u64 to not falsely trigger timeouts with large off.
+        */
+       start_seq -= UINT_MAX;
        do {
                struct io_kiocb *req = list_first_entry(&ctx->timeout_list,
                                                        struct io_kiocb, list);
 
                if (req->flags & REQ_F_TIMEOUT_NOSEQ)
                        break;
-               if (req->timeout.target_seq != ctx->cached_cq_tail
-                                       - atomic_read(&ctx->cq_timeouts))
+               if (!io_check_in_range(req->timeout.target_seq, start_seq,
+                                       ctx->cq_seq))
                        break;
 
                list_del_init(&req->list);
@@ -1143,6 +1160,7 @@ static struct io_uring_cqe *io_get_cqring(struct 
io_ring_ctx *ctx)
                return NULL;
 
        ctx->cached_cq_tail++;
+       ctx->cq_seq++;
        return &rings->cqes[tail & ctx->cq_mask];
 }
 
@@ -4537,6 +4555,8 @@ static enum hrtimer_restart io_timeout_fn(struct hrtimer 
*timer)
        atomic_inc(&ctx->cq_timeouts);
 
        spin_lock_irqsave(&ctx->completion_lock, flags);
+       ctx->cq_seq--;
+
        /*
         * We could be racing with timeout deletion. If the list is empty,
         * then timeout lookup already found it and will be handling it.
@@ -4660,7 +4680,7 @@ static int io_timeout(struct io_kiocb *req)
        struct io_ring_ctx *ctx = req->ctx;
        struct io_timeout_data *data = &req->io->timeout;
        struct list_head *entry;
-       u32 tail, off = req->timeout.off;
+       u32 off = req->timeout.off;
 
        spin_lock_irq(&ctx->completion_lock);
 
@@ -4675,8 +4695,7 @@ static int io_timeout(struct io_kiocb *req)
                goto add;
        }
 
-       tail = ctx->cached_cq_tail - atomic_read(&ctx->cq_timeouts);
-       req->timeout.target_seq = tail + off;
+       req->timeout.target_seq = ctx->cq_seq + off;
 
        /*
         * Insertion sort, ensuring the first entry in the list is always
@@ -4684,7 +4703,7 @@ static int io_timeout(struct io_kiocb *req)
         */
        list_for_each_prev(entry, &ctx->timeout_list) {
                struct io_kiocb *nxt = list_entry(entry, struct io_kiocb, list);
-               u32 nxt_off = nxt->timeout.target_seq - tail;
+               u32 nxt_off = (u32)(nxt->timeout.target_seq - ctx->cq_seq);
 
                if (!(nxt->flags & REQ_F_TIMEOUT_NOSEQ) && (off >= nxt_off))
                        break;
-- 
2.24.0

Reply via email to