On Wed, 2018-11-21 at 06:26 -0700, Jens Axboe wrote:
> On 11/21/18 4:12 AM, Benny Halevy wrote:
> > > +#define AIO_POLL_STACK   8
> > > +
> > > +/*
> > > + * Process completed iocb iopoll entries, copying the result to userspace.
> > > + */
> > > +static long aio_iopoll_reap(struct kioctx *ctx, struct io_event __user *evs,
> > > +                     unsigned int *nr_events, long max)
> > > +{
> > > + void *iocbs[AIO_POLL_STACK];
> > > + struct aio_kiocb *iocb, *n;
> > > + int to_free = 0, ret = 0;
> > 
> > To be on the safe side, how about checking that, when evs is non-NULL,
> > *nr_events < max, and returning -EINVAL otherwise?
> 
> Good point, I think we should re-arrange the loop a bit to move the
> check up at the top to guard for entries == max at entry. I've done
> that.
> 
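A userspace sketch of the rearranged loop may help (illustrative only: `reap_sketch`, the `completed` array, and the int event type are stand-ins, not the kernel code). Checking the limit at the top of the loop makes the function safe even when entered with `*nr_events == max` already:

```c
/*
 * Userspace sketch, not the kernel code: reap completed entries into
 * 'evs', guarding *nr_events >= max at the top of the loop so nothing
 * is copied once the caller's limit is reached -- even if we enter
 * with *nr_events == max already.
 */
static long reap_sketch(const int *completed, int ncompleted,
                        int *evs, unsigned int *nr_events, long max)
{
	long ret = 0;
	int i;

	for (i = 0; i < ncompleted; i++) {
		if (*nr_events >= max)	/* guard moved to the top, as discussed */
			break;
		if (evs)
			evs[(*nr_events)++] = completed[i];
		else
			(*nr_events)++;
		ret++;
	}
	return ret;
}
```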
> > > + /*
> > > +  * Take in a new working set from the submitted list if possible.
> > > +  */
> > > + if (!list_empty_careful(&ctx->poll_submitted)) {
> > > +         spin_lock(&ctx->poll_lock);
> > > +         list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
> > > +         spin_unlock(&ctx->poll_lock);
> > > + }
> > > +
> > > + if (list_empty(&ctx->poll_completing))
> > > +         return 0;
> > 
> > Could be somewhat optimized like this:
> > 
> >     if (list_empty_careful(&ctx->poll_submitted))
> >             return 0;
> > 
> >     spin_lock(&ctx->poll_lock);
> >     list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
> >     spin_unlock(&ctx->poll_lock);
> >     if (list_empty(&ctx->poll_completing))
> >             return 0;
> > 
> > Or, possibly...
> >     if (list_empty_careful(&ctx->poll_submitted) ||
> >         ({
> >             spin_lock(&ctx->poll_lock);
> >             list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
> >             spin_unlock(&ctx->poll_lock);
> >             list_empty(&ctx->poll_completing);
> >         }))
> >             return 0;
> 
> I think the readability of the existing version is better.
> 
> > > + /*
> > > +  * Check again now that we have a new batch.
> > > +  */
> > > + ret = aio_iopoll_reap(ctx, event, nr_events, max);
> > > + if (ret < 0)
> > > +         return ret;
> > > + if (*nr_events >= min)
> > > +         return 0;
> > > +
> > > + /*
> > > +  * Find up to 'max_nr' worth of events to poll for, including the
> > 
> > What's max_nr? You mean 'max'?
> 
> It should, corrected.
> 
> > > +  * events we already successfully polled
> > > +  */
> > > + polled = to_poll = 0;
> > > + poll_completed = atomic_read(&ctx->poll_completed);
> > > + list_for_each_entry(iocb, &ctx->poll_completing, ki_list) {
> > > +         /*
> > > +          * Poll for needed events with wait == true, anything after
> > > +          * that we just check if we have more, up to max.
> > > +          */
> > > +         bool wait = polled + *nr_events >= min;
> > > +         struct kiocb *kiocb = &iocb->rw;
> > > +
> > > +         if (test_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags))
> > > +                 break;
> > > +         if (++to_poll + *nr_events >= max)
> > > +                 break;
> > > +
> > > +         polled += kiocb->ki_filp->f_op->iopoll(kiocb, wait);
> > 
> > Could iopoll return a negative value? (Currently not in this patchset,
> > but would it be possible in the future?)
> 
> That's a good point, I've added a separate check for this. Given that
> it's a regular fops handler, it should be perfectly valid to return
> -ERROR.
> 
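A minimal userspace analogue of the added error check (the names `iopoll_fn`, `poll_batch`, and `demo_iopoll` are invented for illustration, not the patch's code): once the callback may return a negative errno, the caller must propagate it rather than fold it into the completion count:

```c
#include <errno.h>

/*
 * Userspace analogue, not the kernel code: the poll callback returns
 * the number of completions found, or a negative errno. The caller
 * propagates errors instead of adding them to 'polled'.
 */
typedef int (*iopoll_fn)(int id, int wait);

static long poll_batch(iopoll_fn iopoll, const int *ids, int nids,
		       long *polled)
{
	int i;

	for (i = 0; i < nids; i++) {
		int ret = iopoll(ids[i], 0);

		if (ret < 0)	/* separate error check, per the review */
			return ret;
		*polled += ret;
	}
	return 0;
}

/* Example callback: pretends id 2 hit an I/O error. */
static int demo_iopoll(int id, int wait)
{
	(void)wait;
	return id == 2 ? -EIO : 1;
}
```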
> > > +         if (polled + *nr_events >= max)
> > > +                 break;
> > > +         if (poll_completed != atomic_read(&ctx->poll_completed))
> > > +                 break;
> > > + }
> > > +
> > > + ret = aio_iopoll_reap(ctx, event, nr_events, max);
> > > + if (ret < 0)
> > > +         return ret;
> > > + if (*nr_events >= min)
> > > +         return 0;
> > > + return to_poll;
> > 
> > What does the returned value mean?
> > If the intention is only to return a value greater than zero,
> > how about just returning to_poll > 0?
> 
> It just means that a return > 0 means you could call us again, while
> < 0 specifically indicates an error.
> 
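That contract can be sketched in userspace (the counter-based `poll_step` and `poll_until_done` below are invented stand-ins, not the kernel code): < 0 is an error, 0 means nothing left, > 0 means it is worth calling again:

```c
/*
 * Userspace sketch of the return contract described above: a step
 * function returns < 0 on error, 0 when there is nothing left to
 * poll, and > 0 when the caller may usefully call it again.
 */
static int pending = 3;		/* stand-in for in-flight iocbs */

static long poll_step(void)
{
	if (pending == 0)
		return 0;	/* done */
	return pending--;	/* > 0: caller may call again */
}

static long poll_until_done(void)
{
	for (;;) {
		long ret = poll_step();

		if (ret <= 0)
			return ret;	/* 0 == done, < 0 == error */
	}
}
```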
> > > +/*
> > > + * We can't just wait for polled events to come to us, we have to actively
> > > + * find and complete them.
> > > + */
> > > +static void aio_iopoll_reap_events(struct kioctx *ctx)
> > > +{
> > > + if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
> > > +         return;
> > > +
> > > + while (!list_empty_careful(&ctx->poll_submitted) ||
> > > +        !list_empty(&ctx->poll_completing)) {
> > > +         unsigned int nr_events = 0;
> > > +
> > > +         __aio_iopoll_check(ctx, NULL, &nr_events, 1, UINT_MAX);
> > 
> > BUG_ON(__aio_iopoll_check() < 0) ?
> 
> Ho hum...
> 
> > > + }
> > > +}
> > > +
> > > +static int aio_iopoll_check(struct kioctx *ctx, long min_nr, long nr,
> > > +                     struct io_event __user *event)
> > > +{
> > > + unsigned int nr_events = 0;
> > > + int ret = 0;
> > > +
> > > + /* * Only allow one thread polling at a time */
> > 
> > nit: extra '* '
> 
> Removed.
> 
> Thanks for your review!

You're welcome. Always happy to help!
Thanks for introducing this interface!
We intend to make use of it in Scylla, via Seastar
(see http://seastar.io/ and https://github.com/scylladb/seastar).

Benny

