On Wed, 2018-11-21 at 06:26 -0700, Jens Axboe wrote:
> On 11/21/18 4:12 AM, Benny Halevy wrote:
> > > +#define AIO_POLL_STACK 8
> > > +
> > > +/*
> > > + * Process completed iocb iopoll entries, copying the result to userspace.
> > > + */
> > > +static long aio_iopoll_reap(struct kioctx *ctx, struct io_event __user *evs,
> > > + unsigned int *nr_events, long max)
> > > +{
> > > + void *iocbs[AIO_POLL_STACK];
> > > + struct aio_kiocb *iocb, *n;
> > > + int to_free = 0, ret = 0;
> >
> > To be on the safe side, how about checking that *nr_events < max
> > when evs is non-NULL, and returning -EINVAL otherwise?
>
> Good point, I think we should re-arrange the loop a bit to move the
> check up at the top to guard for entries == max at entry. I've done
> that.
>
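For readers following along, the rearranged loop might look roughly like
the userspace sketch below. It is only an illustration of "check the bound
at the top", not the revised patch: struct fake_event and reap_sketch() are
hypothetical names, and a plain array stands in for the kernel list.

```c
#include <stddef.h>

/* Hypothetical stand-in for a completed entry; not the kernel's io_event. */
struct fake_event {
    long res;
};

/*
 * Copy completed results from 'done' into 'out' (if non-NULL), counting
 * them in *nr_events. The bound is tested before each entry is consumed,
 * so a call that starts with *nr_events == max consumes nothing.
 * Returns the number of entries consumed by this call.
 */
static long reap_sketch(const struct fake_event *done, size_t ndone,
                        struct fake_event *out, unsigned int *nr_events,
                        long max)
{
    size_t i;

    for (i = 0; i < ndone; i++) {
        /* Guard first: covers *nr_events == max on entry. */
        if ((long)*nr_events >= max)
            break;
        if (out)
            out[*nr_events] = done[i];
        (*nr_events)++;
    }
    return (long)i;
}
```

With the guard placed first, a caller that arrives with *nr_events already
equal to max gets 0 back and nothing is written past the end of 'out'.
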
> > > + /*
> > > + * Take in a new working set from the submitted list if possible.
> > > + */
> > > + if (!list_empty_careful(&ctx->poll_submitted)) {
> > > + spin_lock(&ctx->poll_lock);
> > > + list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
> > > + spin_unlock(&ctx->poll_lock);
> > > + }
> > > +
> > > + if (list_empty(&ctx->poll_completing))
> > > + return 0;
> >
> > Could be somewhat optimized like this:
> >
> > if (list_empty_careful(&ctx->poll_submitted))
> > return 0;
> >
> > spin_lock(&ctx->poll_lock);
> > list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
> > spin_unlock(&ctx->poll_lock);
> > if (list_empty(&ctx->poll_completing))
> > return 0;
> >
> > Or, possibly...
> > if (list_empty_careful(&ctx->poll_submitted) ||
> > ({
> > spin_lock(&ctx->poll_lock);
> > list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
> > spin_unlock(&ctx->poll_lock);
> > list_empty(&ctx->poll_completing);
> > }))
> > return 0;
>
> I think the readability of the existing version is better.
>
> > > + /*
> > > + * Check again now that we have a new batch.
> > > + */
> > > + ret = aio_iopoll_reap(ctx, event, nr_events, max);
> > > + if (ret < 0)
> > > + return ret;
> > > + if (*nr_events >= min)
> > > + return 0;
> > > +
> > > + /*
> > > + * Find up to 'max_nr' worth of events to poll for, including the
> >
> > What's max_nr? You mean 'max'?
>
> Yes, it should be 'max'. Corrected.
>
> > > + * events we already successfully polled
> > > + */
> > > + polled = to_poll = 0;
> > > + poll_completed = atomic_read(&ctx->poll_completed);
> > > + list_for_each_entry(iocb, &ctx->poll_completing, ki_list) {
> > > + /*
> > > + * Poll for needed events with wait == true, anything after
> > > + * that we just check if we have more, up to max.
> > > + */
> > > + bool wait = polled + *nr_events >= min;
> > > + struct kiocb *kiocb = &iocb->rw;
> > > +
> > > + if (test_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags))
> > > + break;
> > > + if (++to_poll + *nr_events >= max)
> > > + break;
> > > +
> > > + polled += kiocb->ki_filp->f_op->iopoll(kiocb, wait);
> >
> > Could iopoll return a negative value? (Currently not in this patchset,
> > but would it be possible in the future?)
>
> That's a good point, I've added a separate check for this. Given that
> it's a regular fops handler, it should be perfectly valid to return
> -ERROR.
>
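A separate check of that kind could be sketched as below. This is a
userspace illustration under assumptions, not the code that was added to
the patch: poll_fn_sketch, the two stub callbacks and poll_all_sketch() are
hypothetical, and the only point is that a negative return is propagated
instead of being added to the running count.

```c
#include <errno.h>

/* Hypothetical poll callback: returns events found (>= 0) or -errno. */
typedef int (*poll_fn_sketch)(int wait);

/* Hypothetical stubs used only to exercise the sketch. */
static int poll_finds_two(int wait) { (void)wait; return 2; }
static int poll_fails(int wait) { (void)wait; return -EIO; }

/*
 * Call each callback in turn, adding non-negative results to *polled.
 * A negative result is returned immediately and is never added to the
 * running count.
 */
static int poll_all_sketch(const poll_fn_sketch *fns, int nfns, int *polled)
{
    int i;

    *polled = 0;
    for (i = 0; i < nfns; i++) {
        int found = fns[i](0);

        if (found < 0)
            return found;
        *polled += found;
    }
    return 0;
}
```

Without the check, an error such as -EIO would silently reduce 'polled'
and could distort the later comparison against max.
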
> > > + if (polled + *nr_events >= max)
> > > + break;
> > > + if (poll_completed != atomic_read(&ctx->poll_completed))
> > > + break;
> > > + }
> > > +
> > > + ret = aio_iopoll_reap(ctx, event, nr_events, max);
> > > + if (ret < 0)
> > > + return ret;
> > > + if (*nr_events >= min)
> > > + return 0;
> > > + return to_poll;
> >
> > What does the returned value mean?
> > If the intention is only to return a value greater than zero,
> > how about just returning to_poll > 0?
>
> A return value > 0 just means that you could call us again; a value
> < 0 is specifically an error.
>
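One way a caller might consume that convention is sketched below. It is a
userspace illustration only: struct check_state, check_once_sketch() and
check_until_done_sketch() are hypothetical, and a real caller would
presumably also bound the loop rather than spin until the result is <= 0.

```c
/* Hypothetical state for a stub check function. */
struct check_state {
    int remaining;   /* passes left that report "call again" */
    int final_ret;   /* value returned once 'remaining' reaches zero */
    int calls;       /* number of passes made so far */
};

/* Stub for one polling pass: > 0 call again, 0 done, < 0 error. */
static int check_once_sketch(struct check_state *st)
{
    st->calls++;
    if (st->remaining > 0)
        return st->remaining--;
    return st->final_ret;
}

/* Repeat the pass while it asks to be called again; return 0 or -errno. */
static int check_until_done_sketch(struct check_state *st)
{
    int ret;

    do {
        ret = check_once_sketch(st);
    } while (ret > 0);
    return ret;
}
```

Only the sign of a positive return matters to this loop, which is why
returning to_poll and returning to_poll > 0 would behave the same here.
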
> > > +/*
> > > + * We can't just wait for polled events to come to us, we have to actively
> > > + * find and complete them.
> > > + */
> > > +static void aio_iopoll_reap_events(struct kioctx *ctx)
> > > +{
> > > + if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
> > > + return;
> > > +
> > > + while (!list_empty_careful(&ctx->poll_submitted) ||
> > > + !list_empty(&ctx->poll_completing)) {
> > > + unsigned int nr_events = 0;
> > > +
> > > + __aio_iopoll_check(ctx, NULL, &nr_events, 1, UINT_MAX);
> >
> > BUG_ON(__aio_iopoll_check() < 0) ?
>
> Ho hum...
>
> > > + }
> > > +}
> > > +
> > > +static int aio_iopoll_check(struct kioctx *ctx, long min_nr, long nr,
> > > + struct io_event __user *event)
> > > +{
> > > + unsigned int nr_events = 0;
> > > + int ret = 0;
> > > +
> > > + /* * Only allow one thread polling at a time */
> >
> > nit: extra '* '
>
> Removed.
>
> Thanks for your review!
You're welcome. Always happy to help!
Thanks for introducing this interface!
We intend to make use of it in Scylla, via Seastar
(see http://seastar.io/ and https://github.com/scylladb/seastar).
Benny