On 24/09/2019 11:36, Jens Axboe wrote:
> On 9/24/19 2:27 AM, Jens Axboe wrote:
>> On 9/24/19 2:02 AM, Jens Axboe wrote:
>>> On 9/24/19 1:06 AM, Pavel Begunkov wrote:
>>>> On 24/09/2019 02:00, Jens Axboe wrote:
>>>>>> I think we can do the same thing, just wrapping the waitqueue in a
>>>>>> structure with a count in it, on the stack. Got some flight time
>>>>>> coming up later today, let me try and cook up a patch.
>>>>>
>>>>> Totally untested, and sent out 5 min before departure... But something
>>>>> like this.
>>>> Hmm, reminds me of my first version. Basically that's the same thing but
>>>> with the macros inlined. I wanted to make it reusable and self-contained,
>>>> though.
>>>>
>>>> If you don't think it could be useful in other places, sure, we could do
>>>> something like that. Is that so?
>>>
>>> I totally agree it could be useful in other places. Maybe formalized and
>>> used with wake_up_nr() instead of adding a new primitive? Haven't looked
>>> into that, I may be talking nonsense.
>>>
>>> In any case, I did get a chance to test it and it works for me. Here's
>>> the "finished" version, slightly cleaned up and with a comment added
>>> for good measure.
>>
>> Notes:
>>
>> This version gets the ordering right: you need exclusive waits to get
>> FIFO ordering on the waitqueue.
>>
>> Both versions (yours and mine) suffer from the problem of potentially
>> waking too many. I don't think this is a real issue, as generally we
>> don't do threaded access to the io_urings. But if you had the following
>> tasks wait on the cqring:
>>
>> [min_events = 32], [min_events = 8], [min_events = 8]
>>
>> and we reach the io_cqring_events() == threshold, we'll wake all three.
>> I don't see a good solution to this, so I suspect we just live with it
>> until it's proven to be an issue. Both versions are much better than
>> what we have now.
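
To make the scenario in the notes above concrete, here is a small userspace
model of the wake predicate. It is only a sketch: `model_waiter`,
`model_should_wake()` and `model_count_eligible()` are hypothetical names that
mirror `io_should_wake()` from the patch, and the model deliberately ignores
waitqueue ordering and exclusive-wakeup behaviour. It only counts how many
waiters' conditions hold for a given event count.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical userspace stand-in for one waiter (not kernel code). */
struct model_waiter {
	unsigned to_wait;	/* min_events requested by this waiter */
	unsigned nr_timeouts;	/* timeout count sampled when it began waiting */
};

/* Same condition as io_should_wake() in the patch, on plain integers. */
static bool model_should_wake(const struct model_waiter *w,
			      unsigned events, unsigned timeouts)
{
	return events >= w->to_wait || timeouts != w->nr_timeouts;
}

/* Count how many of the n waiters would pass the predicate right now. */
static size_t model_count_eligible(const struct model_waiter *w, size_t n,
				   unsigned events, unsigned timeouts)
{
	size_t i, eligible = 0;

	for (i = 0; i < n; i++)
		if (model_should_wake(&w[i], events, timeouts))
			eligible++;
	return eligible;
}
```

With waiters asking for 32, 8 and 8 events, the predicate holds for all three
once 32 events are visible, and a single timeout makes all three eligible
regardless of the event count.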
> 
> Forgot an issue around signal handling, version below adds the
> right check for that too.

That seems like a good reason not to keep reimplementing
"prepare_to_wait*() + wait loop" every time, and to keep it in sched instead :)

> 
> Curious what your test case was for this?
You mean a performance test case? It's briefly described in a comment
for the second patch. It's just a rewritten io_uring-bench, with
1. one thread generating 1 request per call in a loop, and
2. a second thread waiting for ~128 events.
Both threads are pinned to the same core.
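
For readers without the benchmark at hand, the shape of that setup can be
approximated in plain userspace. This is only a rough analogue, not the actual
test: it uses a mutex and condition variable instead of io_uring, all names
(`fake_ring`, `run_model()`, etc.) are made up for illustration, and pinning to
CPU 0 is best effort. The producer posts one "event" per iteration and signals
the waiter only once the waiter's threshold is reached, which is the behaviour
the patch is after.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <pthread.h>
#include <sched.h>

/* Hypothetical stand-in for the completion ring: just a guarded counter. */
struct fake_ring {
	pthread_mutex_t lock;
	pthread_cond_t cond;
	unsigned events;	/* "completions" posted so far */
	unsigned to_wait;	/* waiter's threshold, e.g. 128 */
	unsigned total;		/* events the producer will post */
};

/* Best-effort pin of the calling thread to CPU 0; failure is ignored. */
static void pin_to_cpu0(void)
{
	cpu_set_t set;

	CPU_ZERO(&set);
	CPU_SET(0, &set);
	(void)pthread_setaffinity_np(pthread_self(), sizeof(set), &set);
}

/* Posts one event per iteration; signals only at or above the threshold. */
static void *producer(void *arg)
{
	struct fake_ring *r = arg;
	unsigned i;

	pin_to_cpu0();
	for (i = 0; i < r->total; i++) {
		pthread_mutex_lock(&r->lock);
		r->events++;
		if (r->events >= r->to_wait)
			pthread_cond_signal(&r->cond);
		pthread_mutex_unlock(&r->lock);
	}
	return NULL;
}

/* Sleeps until at least to_wait events have been posted. */
static void *consumer(void *arg)
{
	struct fake_ring *r = arg;

	pin_to_cpu0();
	pthread_mutex_lock(&r->lock);
	while (r->events < r->to_wait)
		pthread_cond_wait(&r->cond, &r->lock);
	pthread_mutex_unlock(&r->lock);
	return NULL;
}

/* Runs one producer and one consumer; returns the final event count. */
static unsigned run_model(unsigned to_wait, unsigned total)
{
	struct fake_ring r = { .events = 0, .to_wait = to_wait, .total = total };
	pthread_t prod, cons;

	assert(total >= to_wait);	/* otherwise the consumer never wakes */
	pthread_mutex_init(&r.lock, NULL);
	pthread_cond_init(&r.cond, NULL);
	pthread_create(&cons, NULL, consumer, &r);
	pthread_create(&prod, NULL, producer, &r);
	pthread_join(prod, NULL);
	pthread_join(cons, NULL);
	pthread_cond_destroy(&r.cond);
	pthread_mutex_destroy(&r.lock);
	return r.events;
}
```

Calling `run_model(128, 1000)` corresponds to a waiter with a threshold of 128
and a producer posting 1000 single events.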

> 
> 
> diff --git a/fs/io_uring.c b/fs/io_uring.c
> index ca7570aca430..3fbab5692f14 100644
> --- a/fs/io_uring.c
> +++ b/fs/io_uring.c
> @@ -2768,6 +2768,42 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
>       return submit;
>  }
>  
> +struct io_wait_queue {
> +     struct wait_queue_entry wq;
> +     struct io_ring_ctx *ctx;
> +     struct task_struct *task;
> +     unsigned to_wait;
> +     unsigned nr_timeouts;
> +};
> +
> +static inline bool io_should_wake(struct io_wait_queue *iowq)
> +{
> +     struct io_ring_ctx *ctx = iowq->ctx;
> +
> +     /*
> +      * Wake up if we have enough events, or if a timeout occurred since we
> +      * started waiting. For timeouts, we always want to return to userspace,
> +      * regardless of event count.
> +      */
> +     return io_cqring_events(ctx->rings) >= iowq->to_wait ||
> +                     atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
> +}
> +
> +static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
> +                         int wake_flags, void *key)
> +{
> +     struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
> +                                                     wq);
> +
> +     if (io_should_wake(iowq)) {
> +             list_del_init(&curr->entry);
> +             wake_up_process(iowq->task);
> +             return 1;
> +     }
> +
> +     return -1;
> +}
> +
>  /*
>   * Wait until events become available, if we don't already have some. The
>   * application must reap them itself, as they reside on the shared cq ring.
> @@ -2775,8 +2811,16 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
>  static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
>                         const sigset_t __user *sig, size_t sigsz)
>  {
> +     struct io_wait_queue iowq = {
> +             .wq = {
> +                     .func   = io_wake_function,
> +                     .entry  = LIST_HEAD_INIT(iowq.wq.entry),
> +             },
> +             .task           = current,
> +             .ctx            = ctx,
> +             .to_wait        = min_events,
> +     };
>       struct io_rings *rings = ctx->rings;
> -     unsigned nr_timeouts;
>       int ret;
>  
>       if (io_cqring_events(rings) >= min_events)
> @@ -2795,15 +2839,18 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
>                       return ret;
>       }
>  
> -     nr_timeouts = atomic_read(&ctx->cq_timeouts);
> -     /*
> -      * Return if we have enough events, or if a timeout occured since
> -      * we started waiting. For timeouts, we always want to return to
> -      * userspace.
> -      */
> -     ret = wait_event_interruptible(ctx->wait,
> -                             io_cqring_events(rings) >= min_events ||
> -                             atomic_read(&ctx->cq_timeouts) != nr_timeouts);
> +     iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
> +     prepare_to_wait_exclusive(&ctx->wait, &iowq.wq, TASK_INTERRUPTIBLE);
> +     do {
> +             if (io_should_wake(&iowq))
> +                     break;
> +             schedule();
> +             if (signal_pending(current))
> +                     break;
> +             set_current_state(TASK_INTERRUPTIBLE);
> +     } while (1);
> +     finish_wait(&ctx->wait, &iowq.wq);
> +
>       restore_saved_sigmask_unless(ret == -ERESTARTSYS);
>       if (ret == -ERESTARTSYS)
>               ret = -EINTR;
> 

-- 
Yours sincerely,
Pavel Begunkov
