Re: [PATCH V3 4/6] block/rbd: migrate from aio to coroutines

2021-06-18 Thread Ilya Dryomov
On Fri, Jun 18, 2021 at 11:07 AM Peter Lieven  wrote:
>
> Am 17.06.21 um 16:43 schrieb Ilya Dryomov:
> > On Wed, May 19, 2021 at 4:27 PM Peter Lieven  wrote:
> >> Signed-off-by: Peter Lieven 
> >> ---
> >>  block/rbd.c | 255 ++--
> >>  1 file changed, 87 insertions(+), 168 deletions(-)
> >>
> >> diff --git a/block/rbd.c b/block/rbd.c
> >> index 97a2ae4c84..0d8612a988 100644
> >> --- a/block/rbd.c
> >> +++ b/block/rbd.c
> >> @@ -66,22 +66,6 @@ typedef enum {
> >>  RBD_AIO_FLUSH
> >>  } RBDAIOCmd;
> >>
> >> -typedef struct RBDAIOCB {
> >> -BlockAIOCB common;
> >> -int64_t ret;
> >> -QEMUIOVector *qiov;
> >> -RBDAIOCmd cmd;
> >> -int error;
> >> -struct BDRVRBDState *s;
> >> -} RBDAIOCB;
> >> -
> >> -typedef struct RADOSCB {
> >> -RBDAIOCB *acb;
> >> -struct BDRVRBDState *s;
> >> -int64_t size;
> >> -int64_t ret;
> >> -} RADOSCB;
> >> -
> >>  typedef struct BDRVRBDState {
> >>  rados_t cluster;
> >>  rados_ioctx_t io_ctx;
> >> @@ -93,6 +77,13 @@ typedef struct BDRVRBDState {
> >>  uint64_t object_size;
> >>  } BDRVRBDState;
> >>
> >> +typedef struct RBDTask {
> >> +BlockDriverState *bs;
> >> +Coroutine *co;
> >> +bool complete;
> >> +int64_t ret;
> >> +} RBDTask;
> >> +
> >>  static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
> >>  BlockdevOptionsRbd *opts, bool cache,
> >>  const char *keypairs, const char *secretid,
> >> @@ -325,13 +316,6 @@ static int qemu_rbd_set_keypairs(rados_t cluster, 
> >> const char *keypairs_json,
> >>  return ret;
> >>  }
> >>
> >> -static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
> >> -{
> >> -RBDAIOCB *acb = rcb->acb;
> >> -iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
> >> -   acb->qiov->size - offs);
> >> -}
> >> -
> >>  /* FIXME Deprecate and remove keypairs or make it available in QMP. */
> >>  static int qemu_rbd_do_create(BlockdevCreateOptions *options,
> >>const char *keypairs, const char 
> >> *password_secret,
> >> @@ -450,46 +434,6 @@ exit:
> >>  return ret;
> >>  }
> >>
> >> -/*
> >> - * This aio completion is being called from rbd_finish_bh() and runs in 
> >> qemu
> >> - * BH context.
> >> - */
> >> -static void qemu_rbd_complete_aio(RADOSCB *rcb)
> >> -{
> >> -RBDAIOCB *acb = rcb->acb;
> >> -int64_t r;
> >> -
> >> -r = rcb->ret;
> >> -
> >> -if (acb->cmd != RBD_AIO_READ) {
> >> -if (r < 0) {
> >> -acb->ret = r;
> >> -acb->error = 1;
> >> -} else if (!acb->error) {
> >> -acb->ret = rcb->size;
> >> -}
> >> -} else {
> >> -if (r < 0) {
> >> -qemu_rbd_memset(rcb, 0);
> >> -acb->ret = r;
> >> -acb->error = 1;
> >> -} else if (r < rcb->size) {
> >> -qemu_rbd_memset(rcb, r);
> >> -if (!acb->error) {
> >> -acb->ret = rcb->size;
> >> -}
> >> -} else if (!acb->error) {
> >> -acb->ret = r;
> >> -}
> >> -}
> >> -
> >> -g_free(rcb);
> >> -
> >> -acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
> >> -
> >> -qemu_aio_unref(acb);
> >> -}
> >> -
> >>  static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
> >>  {
> >>  const char **vals;
> >> @@ -826,89 +770,50 @@ static int qemu_rbd_resize(BlockDriverState *bs, 
> >> uint64_t size)
> >>  return 0;
> >>  }
> >>
> >> -static const AIOCBInfo rbd_aiocb_info = {
> >> -.aiocb_size = sizeof(RBDAIOCB),
> >> -};
> >> -
> >> -static void rbd_finish_bh(void *opaque)
> >> +static void qemu_rbd_finish_bh(void *opaque)
> >>  {
> >> -RADOSCB *rcb = opaque;
> >> -qemu_rbd_complete_aio(rcb);
> >> +RBDTask *task = opaque;
> >> +task->complete = 1;
> >> +aio_co_wake(task->co);
> >>  }
> >>
> >> -/*
> >> - * This is the callback function for rbd_aio_read and _write
> >> - *
> >> - * Note: this function is being called from a non qemu thread so
> >> - * we need to be careful about what we do here. Generally we only
> >> - * schedule a BH, and do the rest of the io completion handling
> >> - * from rbd_finish_bh() which runs in a qemu context.
> >> - */
> > I would adapt this comment instead of removing it.  I mean, it is
> > still true and the reason for going through aio_bh_schedule_oneshot()
> > instead of calling aio_co_wake() directly, right?
>
>
> Sure, it's still right, but I think rbd is the only driver with this comment.
>
> I can leave it in, no problem.

Yeah, just massage it a bit to reflect the reality (at least the
function name).

>
>
> >
> >> -static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
> >> +static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
> >>  {
> >> -RBDAIOCB *acb = rcb->acb;
> >> -
> >> -rcb->ret = rbd_aio_get_return_value(c);
> >> + 

Re: [PATCH V3 4/6] block/rbd: migrate from aio to coroutines

2021-06-18 Thread Peter Lieven
Am 17.06.21 um 16:43 schrieb Ilya Dryomov:
> On Wed, May 19, 2021 at 4:27 PM Peter Lieven  wrote:
>> Signed-off-by: Peter Lieven 
>> ---
>>  block/rbd.c | 255 ++--
>>  1 file changed, 87 insertions(+), 168 deletions(-)
>>
>> diff --git a/block/rbd.c b/block/rbd.c
>> index 97a2ae4c84..0d8612a988 100644
>> --- a/block/rbd.c
>> +++ b/block/rbd.c
>> @@ -66,22 +66,6 @@ typedef enum {
>>  RBD_AIO_FLUSH
>>  } RBDAIOCmd;
>>
>> -typedef struct RBDAIOCB {
>> -BlockAIOCB common;
>> -int64_t ret;
>> -QEMUIOVector *qiov;
>> -RBDAIOCmd cmd;
>> -int error;
>> -struct BDRVRBDState *s;
>> -} RBDAIOCB;
>> -
>> -typedef struct RADOSCB {
>> -RBDAIOCB *acb;
>> -struct BDRVRBDState *s;
>> -int64_t size;
>> -int64_t ret;
>> -} RADOSCB;
>> -
>>  typedef struct BDRVRBDState {
>>  rados_t cluster;
>>  rados_ioctx_t io_ctx;
>> @@ -93,6 +77,13 @@ typedef struct BDRVRBDState {
>>  uint64_t object_size;
>>  } BDRVRBDState;
>>
>> +typedef struct RBDTask {
>> +BlockDriverState *bs;
>> +Coroutine *co;
>> +bool complete;
>> +int64_t ret;
>> +} RBDTask;
>> +
>>  static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
>>  BlockdevOptionsRbd *opts, bool cache,
>>  const char *keypairs, const char *secretid,
>> @@ -325,13 +316,6 @@ static int qemu_rbd_set_keypairs(rados_t cluster, const 
>> char *keypairs_json,
>>  return ret;
>>  }
>>
>> -static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
>> -{
>> -RBDAIOCB *acb = rcb->acb;
>> -iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
>> -   acb->qiov->size - offs);
>> -}
>> -
>>  /* FIXME Deprecate and remove keypairs or make it available in QMP. */
>>  static int qemu_rbd_do_create(BlockdevCreateOptions *options,
>>const char *keypairs, const char 
>> *password_secret,
>> @@ -450,46 +434,6 @@ exit:
>>  return ret;
>>  }
>>
>> -/*
>> - * This aio completion is being called from rbd_finish_bh() and runs in qemu
>> - * BH context.
>> - */
>> -static void qemu_rbd_complete_aio(RADOSCB *rcb)
>> -{
>> -RBDAIOCB *acb = rcb->acb;
>> -int64_t r;
>> -
>> -r = rcb->ret;
>> -
>> -if (acb->cmd != RBD_AIO_READ) {
>> -if (r < 0) {
>> -acb->ret = r;
>> -acb->error = 1;
>> -} else if (!acb->error) {
>> -acb->ret = rcb->size;
>> -}
>> -} else {
>> -if (r < 0) {
>> -qemu_rbd_memset(rcb, 0);
>> -acb->ret = r;
>> -acb->error = 1;
>> -} else if (r < rcb->size) {
>> -qemu_rbd_memset(rcb, r);
>> -if (!acb->error) {
>> -acb->ret = rcb->size;
>> -}
>> -} else if (!acb->error) {
>> -acb->ret = r;
>> -}
>> -}
>> -
>> -g_free(rcb);
>> -
>> -acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
>> -
>> -qemu_aio_unref(acb);
>> -}
>> -
>>  static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
>>  {
>>  const char **vals;
>> @@ -826,89 +770,50 @@ static int qemu_rbd_resize(BlockDriverState *bs, 
>> uint64_t size)
>>  return 0;
>>  }
>>
>> -static const AIOCBInfo rbd_aiocb_info = {
>> -.aiocb_size = sizeof(RBDAIOCB),
>> -};
>> -
>> -static void rbd_finish_bh(void *opaque)
>> +static void qemu_rbd_finish_bh(void *opaque)
>>  {
>> -RADOSCB *rcb = opaque;
>> -qemu_rbd_complete_aio(rcb);
>> +RBDTask *task = opaque;
>> +task->complete = 1;
>> +aio_co_wake(task->co);
>>  }
>>
>> -/*
>> - * This is the callback function for rbd_aio_read and _write
>> - *
>> - * Note: this function is being called from a non qemu thread so
>> - * we need to be careful about what we do here. Generally we only
>> - * schedule a BH, and do the rest of the io completion handling
>> - * from rbd_finish_bh() which runs in a qemu context.
>> - */
> I would adapt this comment instead of removing it.  I mean, it is
> still true and the reason for going through aio_bh_schedule_oneshot()
> instead of calling aio_co_wake() directly, right?


Sure, it's still right, but I think rbd is the only driver with this comment.

I can leave it in, no problem.


>
>> -static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
>> +static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
>>  {
>> -RBDAIOCB *acb = rcb->acb;
>> -
>> -rcb->ret = rbd_aio_get_return_value(c);
>> +task->ret = rbd_aio_get_return_value(c);
>>  rbd_aio_release(c);
>> -
>> -replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
>> - rbd_finish_bh, rcb);
>> +aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
>> +qemu_rbd_finish_bh, task);
>>  }
>>
>> -static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
>> - 

Re: [PATCH V3 4/6] block/rbd: migrate from aio to coroutines

2021-06-17 Thread Ilya Dryomov
On Wed, May 19, 2021 at 4:27 PM Peter Lieven  wrote:
>
> Signed-off-by: Peter Lieven 
> ---
>  block/rbd.c | 255 ++--
>  1 file changed, 87 insertions(+), 168 deletions(-)
>
> diff --git a/block/rbd.c b/block/rbd.c
> index 97a2ae4c84..0d8612a988 100644
> --- a/block/rbd.c
> +++ b/block/rbd.c
> @@ -66,22 +66,6 @@ typedef enum {
>  RBD_AIO_FLUSH
>  } RBDAIOCmd;
>
> -typedef struct RBDAIOCB {
> -BlockAIOCB common;
> -int64_t ret;
> -QEMUIOVector *qiov;
> -RBDAIOCmd cmd;
> -int error;
> -struct BDRVRBDState *s;
> -} RBDAIOCB;
> -
> -typedef struct RADOSCB {
> -RBDAIOCB *acb;
> -struct BDRVRBDState *s;
> -int64_t size;
> -int64_t ret;
> -} RADOSCB;
> -
>  typedef struct BDRVRBDState {
>  rados_t cluster;
>  rados_ioctx_t io_ctx;
> @@ -93,6 +77,13 @@ typedef struct BDRVRBDState {
>  uint64_t object_size;
>  } BDRVRBDState;
>
> +typedef struct RBDTask {
> +BlockDriverState *bs;
> +Coroutine *co;
> +bool complete;
> +int64_t ret;
> +} RBDTask;
> +
>  static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
>  BlockdevOptionsRbd *opts, bool cache,
>  const char *keypairs, const char *secretid,
> @@ -325,13 +316,6 @@ static int qemu_rbd_set_keypairs(rados_t cluster, const 
> char *keypairs_json,
>  return ret;
>  }
>
> -static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
> -{
> -RBDAIOCB *acb = rcb->acb;
> -iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
> -   acb->qiov->size - offs);
> -}
> -
>  /* FIXME Deprecate and remove keypairs or make it available in QMP. */
>  static int qemu_rbd_do_create(BlockdevCreateOptions *options,
>const char *keypairs, const char 
> *password_secret,
> @@ -450,46 +434,6 @@ exit:
>  return ret;
>  }
>
> -/*
> - * This aio completion is being called from rbd_finish_bh() and runs in qemu
> - * BH context.
> - */
> -static void qemu_rbd_complete_aio(RADOSCB *rcb)
> -{
> -RBDAIOCB *acb = rcb->acb;
> -int64_t r;
> -
> -r = rcb->ret;
> -
> -if (acb->cmd != RBD_AIO_READ) {
> -if (r < 0) {
> -acb->ret = r;
> -acb->error = 1;
> -} else if (!acb->error) {
> -acb->ret = rcb->size;
> -}
> -} else {
> -if (r < 0) {
> -qemu_rbd_memset(rcb, 0);
> -acb->ret = r;
> -acb->error = 1;
> -} else if (r < rcb->size) {
> -qemu_rbd_memset(rcb, r);
> -if (!acb->error) {
> -acb->ret = rcb->size;
> -}
> -} else if (!acb->error) {
> -acb->ret = r;
> -}
> -}
> -
> -g_free(rcb);
> -
> -acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
> -
> -qemu_aio_unref(acb);
> -}
> -
>  static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
>  {
>  const char **vals;
> @@ -826,89 +770,50 @@ static int qemu_rbd_resize(BlockDriverState *bs, 
> uint64_t size)
>  return 0;
>  }
>
> -static const AIOCBInfo rbd_aiocb_info = {
> -.aiocb_size = sizeof(RBDAIOCB),
> -};
> -
> -static void rbd_finish_bh(void *opaque)
> +static void qemu_rbd_finish_bh(void *opaque)
>  {
> -RADOSCB *rcb = opaque;
> -qemu_rbd_complete_aio(rcb);
> +RBDTask *task = opaque;
> +task->complete = 1;
> +aio_co_wake(task->co);
>  }
>
> -/*
> - * This is the callback function for rbd_aio_read and _write
> - *
> - * Note: this function is being called from a non qemu thread so
> - * we need to be careful about what we do here. Generally we only
> - * schedule a BH, and do the rest of the io completion handling
> - * from rbd_finish_bh() which runs in a qemu context.
> - */

I would adapt this comment instead of removing it.  I mean, it is
still true and the reason for going through aio_bh_schedule_oneshot()
instead of calling aio_co_wake() directly, right?

> -static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
> +static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
>  {
> -RBDAIOCB *acb = rcb->acb;
> -
> -rcb->ret = rbd_aio_get_return_value(c);
> +task->ret = rbd_aio_get_return_value(c);
>  rbd_aio_release(c);
> -
> -replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
> - rbd_finish_bh, rcb);
> +aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
> +qemu_rbd_finish_bh, task);
>  }
>
> -static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
> - int64_t off,
> - QEMUIOVector *qiov,
> - int64_t size,
> - BlockCompletionFunc *cb,
> - void *opaque,
> - RBDAIOCmd cmd)
> +static int coroutine_fn 

[PATCH V3 4/6] block/rbd: migrate from aio to coroutines

2021-05-19 Thread Peter Lieven
Signed-off-by: Peter Lieven 
---
 block/rbd.c | 255 ++--
 1 file changed, 87 insertions(+), 168 deletions(-)

diff --git a/block/rbd.c b/block/rbd.c
index 97a2ae4c84..0d8612a988 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -66,22 +66,6 @@ typedef enum {
 RBD_AIO_FLUSH
 } RBDAIOCmd;
 
-typedef struct RBDAIOCB {
-BlockAIOCB common;
-int64_t ret;
-QEMUIOVector *qiov;
-RBDAIOCmd cmd;
-int error;
-struct BDRVRBDState *s;
-} RBDAIOCB;
-
-typedef struct RADOSCB {
-RBDAIOCB *acb;
-struct BDRVRBDState *s;
-int64_t size;
-int64_t ret;
-} RADOSCB;
-
 typedef struct BDRVRBDState {
 rados_t cluster;
 rados_ioctx_t io_ctx;
@@ -93,6 +77,13 @@ typedef struct BDRVRBDState {
 uint64_t object_size;
 } BDRVRBDState;
 
+typedef struct RBDTask {
+BlockDriverState *bs;
+Coroutine *co;
+bool complete;
+int64_t ret;
+} RBDTask;
+
 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
 BlockdevOptionsRbd *opts, bool cache,
 const char *keypairs, const char *secretid,
@@ -325,13 +316,6 @@ static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
 return ret;
 }
 
-static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
-{
-RBDAIOCB *acb = rcb->acb;
-iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
-   acb->qiov->size - offs);
-}
-
 /* FIXME Deprecate and remove keypairs or make it available in QMP. */
 static int qemu_rbd_do_create(BlockdevCreateOptions *options,
    const char *keypairs, const char *password_secret,
@@ -450,46 +434,6 @@ exit:
 return ret;
 }
 
-/*
- * This aio completion is being called from rbd_finish_bh() and runs in qemu
- * BH context.
- */
-static void qemu_rbd_complete_aio(RADOSCB *rcb)
-{
-RBDAIOCB *acb = rcb->acb;
-int64_t r;
-
-r = rcb->ret;
-
-if (acb->cmd != RBD_AIO_READ) {
-if (r < 0) {
-acb->ret = r;
-acb->error = 1;
-} else if (!acb->error) {
-acb->ret = rcb->size;
-}
-} else {
-if (r < 0) {
-qemu_rbd_memset(rcb, 0);
-acb->ret = r;
-acb->error = 1;
-} else if (r < rcb->size) {
-qemu_rbd_memset(rcb, r);
-if (!acb->error) {
-acb->ret = rcb->size;
-}
-} else if (!acb->error) {
-acb->ret = r;
-}
-}
-
-g_free(rcb);
-
-acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
-
-qemu_aio_unref(acb);
-}
-
 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
 {
 const char **vals;
@@ -826,89 +770,50 @@ static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
 return 0;
 }
 
-static const AIOCBInfo rbd_aiocb_info = {
-.aiocb_size = sizeof(RBDAIOCB),
-};
-
-static void rbd_finish_bh(void *opaque)
+static void qemu_rbd_finish_bh(void *opaque)
 {
-RADOSCB *rcb = opaque;
-qemu_rbd_complete_aio(rcb);
+RBDTask *task = opaque;
+task->complete = 1;
+aio_co_wake(task->co);
 }
 
-/*
- * This is the callback function for rbd_aio_read and _write
- *
- * Note: this function is being called from a non qemu thread so
- * we need to be careful about what we do here. Generally we only
- * schedule a BH, and do the rest of the io completion handling
- * from rbd_finish_bh() which runs in a qemu context.
- */
-static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
+static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
 {
-RBDAIOCB *acb = rcb->acb;
-
-rcb->ret = rbd_aio_get_return_value(c);
+task->ret = rbd_aio_get_return_value(c);
 rbd_aio_release(c);
-
-replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
- rbd_finish_bh, rcb);
+aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
+qemu_rbd_finish_bh, task);
 }
 
-static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
- int64_t off,
- QEMUIOVector *qiov,
- int64_t size,
- BlockCompletionFunc *cb,
- void *opaque,
- RBDAIOCmd cmd)
+static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
+  uint64_t offset,
+  uint64_t bytes,
+  QEMUIOVector *qiov,
+  int flags,
+  RBDAIOCmd cmd)
 {
-RBDAIOCB *acb;
-RADOSCB *rcb = NULL;
+BDRVRBDState *s = bs->opaque;
+RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
 rbd_completion_t c;
 int r;
 
-BDRVRBDState *s = bs->opaque;
+