Re: [PATCH 8/8] aio: support for IO polling

2018-11-22 Thread Jens Axboe
On 11/22/18 4:13 AM, Jan Kara wrote:
> 
> On Tue 20-11-18 10:19:53, Jens Axboe wrote:
>> +/*
>> + * We can't just wait for polled events to come to us, we have to actively
>> + * find and complete them.
>> + */
>> +static void aio_iopoll_reap_events(struct kioctx *ctx)
>> +{
>> +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
>> +		return;
>> +
>> +	while (!list_empty_careful(&ctx->poll_submitted) ||
>> +	       !list_empty(&ctx->poll_completing)) {
>> +		unsigned int nr_events = 0;
>> +
>> +		__aio_iopoll_check(ctx, NULL, &nr_events, 1, UINT_MAX);
>> +	}
>> +}
>> +
>> +static int aio_iopoll_check(struct kioctx *ctx, long min_nr, long nr,
>> +			    struct io_event __user *event)
>> +{
>> +	unsigned int nr_events = 0;
>> +	int ret = 0;
>> +
>> +	/* * Only allow one thread polling at a time */
>> +	if (test_and_set_bit(0, &ctx->getevents_busy))
>> +		return -EBUSY;
>> +
>> +	while (!nr_events || !need_resched()) {
>> +		int tmin = 0;
>> +
>> +		if (nr_events < min_nr)
>> +			tmin = min_nr - nr_events;
>> +
>> +		ret = __aio_iopoll_check(ctx, event, &nr_events, tmin, nr);
>> +		if (ret <= 0)
>> +			break;
>> +		ret = 0;
>> +	}
>> +
>> +	clear_bit(0, &ctx->getevents_busy);
>> +	return nr_events ? nr_events : ret;
>> +}
> 
> Hum, what if userspace calls io_destroy() while another process is polling
> for events on the same kioctx? It seems we'd be reaping events from two
> processes in parallel in that case which will result in various
> "interesting" effects like ctx->poll_completing list corruption...

I've replaced the ->getevents_busy with a mutex, and we also protect
the ->dead check inside that mutex. That ensures that destroy can't
proceed while a caller is inside getevents(), and that getevents()
sees if the ctx is being destroyed.
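
A minimal sketch of that scheme (illustrative only; the mutex name
getevents_lock and the -EINVAL return are assumptions, not the actual
follow-up patch):

static int aio_iopoll_check(struct kioctx *ctx, long min_nr, long nr,
			    struct io_event __user *event)
{
	unsigned int nr_events = 0;
	int ret = 0;

	/* Serialize pollers and io_destroy() on the same kioctx */
	mutex_lock(&ctx->getevents_lock);
	if (atomic_read(&ctx->dead)) {
		/* destroy already ran (or is running) the reap */
		mutex_unlock(&ctx->getevents_lock);
		return -EINVAL;
	}

	while (!nr_events || !need_resched()) {
		int tmin = 0;

		if (nr_events < min_nr)
			tmin = min_nr - nr_events;

		ret = __aio_iopoll_check(ctx, event, &nr_events, tmin, nr);
		if (ret <= 0)
			break;
		ret = 0;
	}
	mutex_unlock(&ctx->getevents_lock);

	return nr_events ? nr_events : ret;
}

The destroy side would then take the same mutex and mark the ctx dead
before calling aio_iopoll_reap_events(), so two tasks can no longer reap
ctx->poll_completing in parallel.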

-- 
Jens Axboe



Re: [PATCH 8/8] aio: support for IO polling

2018-11-22 Thread Jan Kara


On Tue 20-11-18 10:19:53, Jens Axboe wrote:
> +/*
> + * We can't just wait for polled events to come to us, we have to actively
> + * find and complete them.
> + */
> +static void aio_iopoll_reap_events(struct kioctx *ctx)
> +{
> +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
> +		return;
> +
> +	while (!list_empty_careful(&ctx->poll_submitted) ||
> +	       !list_empty(&ctx->poll_completing)) {
> +		unsigned int nr_events = 0;
> +
> +		__aio_iopoll_check(ctx, NULL, &nr_events, 1, UINT_MAX);
> +	}
> +}
> +
> +static int aio_iopoll_check(struct kioctx *ctx, long min_nr, long nr,
> +			    struct io_event __user *event)
> +{
> +	unsigned int nr_events = 0;
> +	int ret = 0;
> +
> +	/* * Only allow one thread polling at a time */
> +	if (test_and_set_bit(0, &ctx->getevents_busy))
> +		return -EBUSY;
> +
> +	while (!nr_events || !need_resched()) {
> +		int tmin = 0;
> +
> +		if (nr_events < min_nr)
> +			tmin = min_nr - nr_events;
> +
> +		ret = __aio_iopoll_check(ctx, event, &nr_events, tmin, nr);
> +		if (ret <= 0)
> +			break;
> +		ret = 0;
> +	}
> +
> +	clear_bit(0, &ctx->getevents_busy);
> +	return nr_events ? nr_events : ret;
> +}

Hum, what if userspace calls io_destroy() while another process is polling
for events on the same kioctx? It seems we'd be reaping events from two
processes in parallel in that case which will result in various
"interesting" effects like ctx->poll_completing list corruption...

Honza
-- 
Jan Kara 
SUSE Labs, CR


Re: [PATCH 8/8] aio: support for IO polling

2018-11-21 Thread Benny Halevy
On Wed, 2018-11-21 at 06:26 -0700, Jens Axboe wrote:
> On 11/21/18 4:12 AM, Benny Halevy wrote:
> > > +#define AIO_POLL_STACK   8
> > > +
> > > +/*
> > > + * Process completed iocb iopoll entries, copying the result to userspace.
> > > + */
> > > +static long aio_iopoll_reap(struct kioctx *ctx, struct io_event __user *evs,
> > > +			    unsigned int *nr_events, long max)
> > > +{
> > > +	void *iocbs[AIO_POLL_STACK];
> > > +	struct aio_kiocb *iocb, *n;
> > > +	int to_free = 0, ret = 0;
> > 
> > To be on the safe side, how about checking that if (evs)
> > *nr_events < max, otherwise, return -EINVAL?
> 
> Good point, I think we should re-arrange the loop a bit to move the
> check up at the top to guard for entries == max at entry. I've done
> that.
> 
> > > +	/*
> > > +	 * Take in a new working set from the submitted list if possible.
> > > +	 */
> > > +	if (!list_empty_careful(&ctx->poll_submitted)) {
> > > +		spin_lock(&ctx->poll_lock);
> > > +		list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
> > > +		spin_unlock(&ctx->poll_lock);
> > > +	}
> > > +
> > > +	if (list_empty(&ctx->poll_completing))
> > > +		return 0;
> > 
> > Could be somewhat optimized like this:
> > 
> > 	if (list_empty_careful(&ctx->poll_submitted))
> > 		return 0;
> > 
> > 	spin_lock(&ctx->poll_lock);
> > 	list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
> > 	spin_unlock(&ctx->poll_lock);
> > 	if (list_empty(&ctx->poll_completing))
> > 		return 0;
> > 
> > Or, possibly...
> > 	if (list_empty_careful(&ctx->poll_submitted) ||
> > 	    ({
> > 		spin_lock(&ctx->poll_lock);
> > 		list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
> > 		spin_unlock(&ctx->poll_lock);
> > 		list_empty(&ctx->poll_completing);
> > 	    }))
> > 		return 0;
> 
> I think the readability of the existing version is better.
> 
> > > +	/*
> > > +	 * Check again now that we have a new batch.
> > > +	 */
> > > +	ret = aio_iopoll_reap(ctx, event, nr_events, max);
> > > +	if (ret < 0)
> > > +		return ret;
> > > +	if (*nr_events >= min)
> > > +		return 0;
> > > +
> > > + /*
> > > +  * Find up to 'max_nr' worth of events to poll for, including the
> > 
> > What's max_nr? You mean 'max'?
> 
> It should, corrected.
> 
> > > +  * events we already successfully polled
> > > +  */
> > > +	polled = to_poll = 0;
> > > +	poll_completed = atomic_read(&ctx->poll_completed);
> > > +	list_for_each_entry(iocb, &ctx->poll_completing, ki_list) {
> > > +		/*
> > > +		 * Poll for needed events with wait == true, anything after
> > > +		 * that we just check if we have more, up to max.
> > > +		 */
> > > +		bool wait = polled + *nr_events >= min;
> > > +		struct kiocb *kiocb = &iocb->rw;
> > > +
> > > +		if (test_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags))
> > > +			break;
> > > +		if (++to_poll + *nr_events >= max)
> > > +			break;
> > > +
> > > +		polled += kiocb->ki_filp->f_op->iopoll(kiocb, wait);
> > 
> > Could iopoll return a negative value? (Currently not in this patchset,
> > but would it be possible in the future?)
> 
> That's a good point, I've added a separate check for this. Given that
> it's a regular fops handler, it should be perfectly valid to return
> -ERROR.
> 
> > > +		if (polled + *nr_events >= max)
> > > +			break;
> > > +		if (poll_completed != atomic_read(&ctx->poll_completed))
> > > +			break;
> > > +	}
> > > +
> > > +	ret = aio_iopoll_reap(ctx, event, nr_events, max);
> > > +	if (ret < 0)
> > > +		return ret;
> > > +	if (*nr_events >= min)
> > > +		return 0;
> > > +	return to_poll;
> > 
> > What does the returned value mean?
> > If the intention is only to return a value greater than zero,
> > how about just returning to_poll > 0?
> 
> It just means that you could call us again, if > 0, and < 0 is an error
> specifically.
> 
> > > +/*
> > > + * We can't just wait for polled events to come to us, we have to actively
> > > + * find and complete them.
> > > + */
> > > +static void aio_iopoll_reap_events(struct kioctx *ctx)
> > > +{
> > > +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
> > > +		return;
> > > +
> > > +	while (!list_empty_careful(&ctx->poll_submitted) ||
> > > +	       !list_empty(&ctx->poll_completing)) {
> > > +		unsigned int nr_events = 0;
> > > +
> > > +		__aio_iopoll_check(ctx, NULL, &nr_events, 1, UINT_MAX);
> > 
> > BUG_ON(__aio_iopoll_check() < 0) ?
> 
> Ho hum...
> 
> > > +	}
> > > +}
> > > +
> > > +static int aio_iopoll_check(struct kioctx *ctx, long min_nr, long nr,
> > > +			    struct io_event __user *event)
> > > +{
> > > +	unsigned int nr_events = 0;
> > > +	int ret = 0;
> > > +
> > > +	/* * Only allow one thread polling at a time */
> > 
> > nit: extra '* '
> 
> Removed.
> 
> Thanks for your review!

You're welcome. Always happy to help!
Thanks for introducing this interface!
We intend to make 

Re: [PATCH 8/8] aio: support for IO polling

2018-11-21 Thread Jens Axboe
On 11/21/18 4:12 AM, Benny Halevy wrote:
>> +#define AIO_POLL_STACK  8
>> +
>> +/*
>> + * Process completed iocb iopoll entries, copying the result to userspace.
>> + */
>> +static long aio_iopoll_reap(struct kioctx *ctx, struct io_event __user *evs,
>> +			    unsigned int *nr_events, long max)
>> +{
>> +	void *iocbs[AIO_POLL_STACK];
>> +	struct aio_kiocb *iocb, *n;
>> +	int to_free = 0, ret = 0;
> 
> To be on the safe side, how about checking that if (evs)
> *nr_events < max, otherwise, return -EINVAL?

Good point, I think we should re-arrange the loop a bit to move the
check up at the top to guard for entries == max at entry. I've done
that.
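
Something along these lines at the top of aio_iopoll_reap(), as a sketch
of that rework (not the literal v2 hunk):

	/* Don't reap into a caller array that is already full */
	if (evs && *nr_events >= max)
		return 0;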

>> +	/*
>> +	 * Take in a new working set from the submitted list if possible.
>> +	 */
>> +	if (!list_empty_careful(&ctx->poll_submitted)) {
>> +		spin_lock(&ctx->poll_lock);
>> +		list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
>> +		spin_unlock(&ctx->poll_lock);
>> +	}
>> +
>> +	if (list_empty(&ctx->poll_completing))
>> +		return 0;
> 
> Could be somewhat optimized like this:
> 
> 	if (list_empty_careful(&ctx->poll_submitted))
> 		return 0;
> 
> 	spin_lock(&ctx->poll_lock);
> 	list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
> 	spin_unlock(&ctx->poll_lock);
> 	if (list_empty(&ctx->poll_completing))
> 		return 0;
> 
> Or, possibly...
> 	if (list_empty_careful(&ctx->poll_submitted) ||
> 	    ({
> 		spin_lock(&ctx->poll_lock);
> 		list_splice_init(&ctx->poll_submitted, &ctx->poll_completing);
> 		spin_unlock(&ctx->poll_lock);
> 		list_empty(&ctx->poll_completing);
> 	    }))
> 		return 0;

I think the readability of the existing version is better.

>> +	/*
>> +	 * Check again now that we have a new batch.
>> +	 */
>> +	ret = aio_iopoll_reap(ctx, event, nr_events, max);
>> +	if (ret < 0)
>> +		return ret;
>> +	if (*nr_events >= min)
>> +		return 0;
>> +
>> +/*
>> + * Find up to 'max_nr' worth of events to poll for, including the
> 
> What's max_nr? You mean 'max'?

It should, corrected.

>> + * events we already successfully polled
>> + */
>> +	polled = to_poll = 0;
>> +	poll_completed = atomic_read(&ctx->poll_completed);
>> +	list_for_each_entry(iocb, &ctx->poll_completing, ki_list) {
>> +		/*
>> +		 * Poll for needed events with wait == true, anything after
>> +		 * that we just check if we have more, up to max.
>> +		 */
>> +		bool wait = polled + *nr_events >= min;
>> +		struct kiocb *kiocb = &iocb->rw;
>> +
>> +		if (test_bit(IOCB_POLL_COMPLETED, &iocb->ki_flags))
>> +			break;
>> +		if (++to_poll + *nr_events >= max)
>> +			break;
>> +
>> +		polled += kiocb->ki_filp->f_op->iopoll(kiocb, wait);
> 
> Could iopoll return a negative value? (Currently not in this patchset,
> but would it be possible in the future?)

That's a good point, I've added a separate check for this. Given that
it's a regular fops handler, it should be perfectly valid to return
-ERROR.
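
As a sketch, the loop body could then handle an error from ->iopoll()
like this (illustrative, not the exact follow-up change):

		ret = kiocb->ki_filp->f_op->iopoll(kiocb, wait);
		if (ret < 0)
			return ret;
		polled += ret;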

>> +		if (polled + *nr_events >= max)
>> +			break;
>> +		if (poll_completed != atomic_read(&ctx->poll_completed))
>> +			break;
>> +	}
>> +
>> +	ret = aio_iopoll_reap(ctx, event, nr_events, max);
>> +	if (ret < 0)
>> +		return ret;
>> +	if (*nr_events >= min)
>> +		return 0;
>> +	return to_poll;
> 
> What does the returned value mean?
> If the intention is only to return a value greater than zero,
> how about just returning to_poll > 0?

It just means that you could call us again, if > 0, and < 0 is an error
specifically.

>> +/*
>> + * We can't just wait for polled events to come to us, we have to actively
>> + * find and complete them.
>> + */
>> +static void aio_iopoll_reap_events(struct kioctx *ctx)
>> +{
>> +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
>> +		return;
>> +
>> +	while (!list_empty_careful(&ctx->poll_submitted) ||
>> +	       !list_empty(&ctx->poll_completing)) {
>> +		unsigned int nr_events = 0;
>> +
>> +		__aio_iopoll_check(ctx, NULL, &nr_events, 1, UINT_MAX);
> 
> BUG_ON(__aio_iopoll_check() < 0) ?

Ho hum...

>> +	}
>> +}
>> +
>> +static int aio_iopoll_check(struct kioctx *ctx, long min_nr, long nr,
>> +			    struct io_event __user *event)
>> +{
>> +	unsigned int nr_events = 0;
>> +	int ret = 0;
>> +
>> +	/* * Only allow one thread polling at a time */
> 
> nit: extra '* '

Removed.

Thanks for your review!

-- 
Jens Axboe



Re: [PATCH 8/8] aio: support for IO polling

2018-11-21 Thread Benny Halevy
On Tue, 2018-11-20 at 10:19 -0700, Jens Axboe wrote:
> Add polled variants of PREAD/PREADV and PWRITE/PWRITEV. These act
> like their non-polled counterparts, except we expect to poll for
> completion of them. The polling happens at io_getevents() time, and
> works just like non-polled IO.
> 
> To setup an io_context for polled IO, the application must call
> io_setup2() with IOCTX_FLAG_IOPOLL as one of the flags. It is illegal
> to mix and match polled and non-polled IO on an io_context.
> 
> Polled IO doesn't support the user mapped completion ring. Events
> must be reaped through the io_getevents() system call. For non-irq
> driven poll devices, there's no way to support completion reaping
> from userspace by just looking at the ring. The application itself
> is the one that pulls completion entries.
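
For context, a rough userspace sketch of the flow described above (not
part of the patch). The io_setup2() calling convention and the reuse of
the regular IOCB_CMD_PREAD opcode on a polled context are assumptions
here; __NR_io_setup2 and IOCTX_FLAG_IOPOLL would come from the patched
headers:

#define _GNU_SOURCE
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/aio_abi.h>

static int polled_read(const char *path)
{
	aio_context_t ctx = 0;
	struct io_event ev;
	struct iocb cb, *cbp = &cb;
	static char buf[4096] __attribute__((aligned(4096)));
	int fd, ret;

	fd = open(path, O_RDONLY | O_DIRECT);
	if (fd < 0)
		return -1;

	/* Assumed calling convention: io_setup2(nr_events, flags, &ctx) */
	ret = syscall(__NR_io_setup2, 8, IOCTX_FLAG_IOPOLL, &ctx);
	if (ret < 0)
		return -1;

	memset(&cb, 0, sizeof(cb));
	cb.aio_fildes = fd;
	cb.aio_lio_opcode = IOCB_CMD_PREAD;	/* assumed: same opcode as non-polled */
	cb.aio_buf = (unsigned long) buf;
	cb.aio_nbytes = sizeof(buf);
	cb.aio_offset = 0;

	if (syscall(__NR_io_submit, ctx, 1, &cbp) != 1)
		return -1;

	/* No mapped ring: the completion is found by polling inside getevents */
	ret = syscall(__NR_io_getevents, ctx, 1, 1, &ev, NULL);
	return ret == 1 ? (int) ev.res : -1;
}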
> 
> Signed-off-by: Jens Axboe 
> ---
>  fs/aio.c | 377 ++-
>  include/uapi/linux/aio_abi.h |   4 +
>  2 files changed, 336 insertions(+), 45 deletions(-)
> 
> diff --git a/fs/aio.c b/fs/aio.c
> index 8bbb0b77d9c4..ea93847d25d1 100644
> --- a/fs/aio.c
> +++ b/fs/aio.c
> @@ -94,6 +94,8 @@ struct kioctx {
>  
>   unsigned long   user_id;
>  
> +	unsigned int		flags;
> +
>   struct __percpu kioctx_cpu *cpu;
>  
>   /*
> @@ -138,6 +140,19 @@ struct kioctx {
> 		atomic_t	reqs_available;
>   } cacheline_aligned_in_smp;
>  
> + /* iopoll submission state */
> + struct {
> + spinlock_t poll_lock;
> + struct list_head poll_submitted;
> + } cacheline_aligned_in_smp;
> +
> + /* iopoll completion state */
> + struct {
> + struct list_head poll_completing;
> + unsigned long getevents_busy;
> + atomic_t poll_completed;
> + } cacheline_aligned_in_smp;
> +
>   struct {
>   spinlock_t  ctx_lock;
>   struct list_head active_reqs;   /* used for cancellation */
> @@ -191,13 +206,24 @@ struct aio_kiocb {
>  
> 	struct list_head	ki_list;	/* the aio core uses this
>* for cancellation */
> +
> + unsigned long   ki_flags;
> +#define IOCB_POLL_COMPLETED  0
> +
>   refcount_t  ki_refcnt;
>  
> - /*
> -  * If the aio_resfd field of the userspace iocb is not zero,
> -  * this is the underlying eventfd context to deliver events to.
> -  */
> - struct eventfd_ctx  *ki_eventfd;
> + union {
> + /*
> +  * If the aio_resfd field of the userspace iocb is not zero,
> +  * this is the underlying eventfd context to deliver events to.
> +  */
> + struct eventfd_ctx  *ki_eventfd;
> +
> + /*
> +  * For polled IO, stash completion info here
> +  */
> + struct io_event ki_ev;
> + };
>  };
>  
>  /*-- sysctl variables*/
> @@ -214,6 +240,8 @@ static struct vfsmount *aio_mnt;
>  static const struct file_operations aio_ring_fops;
>  static const struct address_space_operations aio_ctx_aops;
>  
> +static void aio_iopoll_reap_events(struct kioctx *);
> +
>  static struct file *aio_private_file(struct kioctx *ctx, loff_t nr_pages)
>  {
>   struct file *file;
> @@ -451,11 +479,15 @@ static int aio_setup_ring(struct kioctx *ctx, unsigned int nr_events)
>   int i;
>   struct file *file;
>  
> - /* Compensate for the ring buffer's head/tail overlap entry */
> - nr_events += 2; /* 1 is required, 2 for good luck */
> -
> + /*
> +  * Compensate for the ring buffer's head/tail overlap entry.
> +  * IO polling doesn't require any io event entries
> +  */
>   size = sizeof(struct aio_ring);
> - size += sizeof(struct io_event) * nr_events;
> +	if (!(ctx->flags & IOCTX_FLAG_IOPOLL)) {
> +		nr_events += 2;	/* 1 is required, 2 for good luck */
> +		size += sizeof(struct io_event) * nr_events;
> +	}
>  
>   nr_pages = PFN_UP(size);
>   if (nr_pages < 0)
> @@ -720,6 +752,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events, unsigned int flags)
>   if (!ctx)
>   return ERR_PTR(-ENOMEM);
>  
> +	ctx->flags = flags;
> 	ctx->max_reqs = max_reqs;
>  
> 	spin_lock_init(&ctx->ctx_lock);
> @@ -732,6 +765,11 @@ static struct kioctx *ioctx_alloc(unsigned nr_events, unsigned int flags)
>  
>   INIT_LIST_HEAD(>active_reqs);
>  
> +	spin_lock_init(&ctx->poll_lock);
> +	INIT_LIST_HEAD(&ctx->poll_submitted);
> +	INIT_LIST_HEAD(&ctx->poll_completing);
> +	atomic_set(&ctx->poll_completed, 0);
>  
> 	if (percpu_ref_init(&ctx->users, free_ioctx_users, 0, GFP_KERNEL))
>   goto err;
>  
> @@ -814,6 +852,8 @@ static int kill_ioctx(struct mm_struct *mm, struct kioctx *ctx,
> 	RCU_INIT_POINTER(table->table[ctx->id], NULL);
> 	spin_unlock(&mm->ioctx_lock);
>  
> +