[Qemu-devel] [RFC v6] RBD: Add support readv,writev for rbd

2017-02-20 Thread jazeltq
From: tianqing 

Rbd can do readv and writev directly, so we do not need to transform
iov to buf or vice versa any more.

Signed-off-by: tianqing 
---
 block/rbd.c | 80 ++---
 1 file changed, 56 insertions(+), 24 deletions(-)

diff --git a/block/rbd.c b/block/rbd.c
index a57b3e3..22e8e69 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -62,6 +62,13 @@
 #define RBD_MAX_SNAP_NAME_SIZE 128
 #define RBD_MAX_SNAPS 100
 
+/* The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h */
+#ifdef LIBRBD_SUPPORTS_IOVEC
+#define LIBRBD_USE_IOVEC 1
+#else
+#define LIBRBD_USE_IOVEC 0
+#endif
+
 typedef enum {
 RBD_AIO_READ,
 RBD_AIO_WRITE,
@@ -310,6 +317,17 @@ static int qemu_rbd_set_conf(rados_t cluster, const char *conf,
 return ret;
 }
 
+static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
+{
+if (LIBRBD_USE_IOVEC) {
+RBDAIOCB *acb = rcb->acb;
+iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
+   acb->qiov->size - offs);
+} else {
+memset(rcb->buf + offs, 0, rcb->size - offs);
+}
+}
+
 static int qemu_rbd_create(const char *filename, QemuOpts *opts, Error **errp)
 {
 Error *local_err = NULL;
@@ -426,11 +444,11 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
 }
 } else {
 if (r < 0) {
-memset(rcb->buf, 0, rcb->size);
+qemu_rbd_memset(rcb, 0);
 acb->ret = r;
 acb->error = 1;
 } else if (r < rcb->size) {
-memset(rcb->buf + r, 0, rcb->size - r);
+qemu_rbd_memset(rcb, r);
 if (!acb->error) {
 acb->ret = rcb->size;
 }
@@ -441,10 +459,13 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
 
 g_free(rcb);
 
-if (acb->cmd == RBD_AIO_READ) {
-qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
+if (!LIBRBD_USE_IOVEC) {
+if (acb->cmd == RBD_AIO_READ) {
+qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
+}
+qemu_vfree(acb->bounce);
 }
-qemu_vfree(acb->bounce);
+
 acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
 
 qemu_aio_unref(acb);
@@ -655,7 +676,6 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 RBDAIOCB *acb;
 RADOSCB *rcb = NULL;
 rbd_completion_t c;
-char *buf;
 int r;
 
 BDRVRBDState *s = bs->opaque;
@@ -664,27 +684,29 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 acb->cmd = cmd;
 acb->qiov = qiov;
 assert(!qiov || qiov->size == size);
-if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
-acb->bounce = NULL;
-} else {
-acb->bounce = qemu_try_blockalign(bs, qiov->size);
-if (acb->bounce == NULL) {
-goto failed;
+
+rcb = g_new(RADOSCB, 1);
+
+if (!LIBRBD_USE_IOVEC) {
+if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
+acb->bounce = NULL;
+} else {
+acb->bounce = qemu_try_blockalign(bs, qiov->size);
+if (acb->bounce == NULL) {
+goto failed;
+}
 }
+if (cmd == RBD_AIO_WRITE) {
+qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
+}
+rcb->buf = acb->bounce;
 }
+
 acb->ret = 0;
 acb->error = 0;
 acb->s = s;
 
-if (cmd == RBD_AIO_WRITE) {
-qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
-}
-
-buf = acb->bounce;
-
-rcb = g_new(RADOSCB, 1);
 rcb->acb = acb;
-rcb->buf = buf;
 rcb->s = acb->s;
 rcb->size = size;
 r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
@@ -694,10 +716,18 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 
 switch (cmd) {
 case RBD_AIO_WRITE:
-r = rbd_aio_write(s->image, off, size, buf, c);
+#ifdef LIBRBD_SUPPORTS_IOVEC
+r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
+#else
+r = rbd_aio_write(s->image, off, size, rcb->buf, c);
+#endif
 break;
 case RBD_AIO_READ:
-r = rbd_aio_read(s->image, off, size, buf, c);
+#ifdef LIBRBD_SUPPORTS_IOVEC
+r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
+#else
+r = rbd_aio_read(s->image, off, size, rcb->buf, c);
+#endif
 break;
 case RBD_AIO_DISCARD:
 r = rbd_aio_discard_wrapper(s->image, off, size, c);
@@ -712,14 +742,16 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 if (r < 0) {
 goto failed_completion;
 }
-
 return &acb->common;
 
 failed_completion:
 rbd_aio_release(c);
 failed:
 g_free(rcb);
-qemu_vfree(acb->bounce);
+if (!LIBRBD_USE_IOVEC) {
+qemu_vfree(acb->bounce);
+}
+
 qemu_aio_unref(acb);
 return NULL;
 }
-- 
2.10.2




[Qemu-devel] [RFC v6] RBD: Add support readv,writev for rbd

2017-02-20 Thread jazeltq
From: tianqing 

Rbd can do readv and writev directly, so we do not need to transform
iov to buf or vice versa any more.

Signed-off-by: tianqing 
---
 block/rbd.c | 79 ++---
 1 file changed, 54 insertions(+), 25 deletions(-)

diff --git a/block/rbd.c b/block/rbd.c
index a57b3e3..5373680 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -62,6 +62,13 @@
 #define RBD_MAX_SNAP_NAME_SIZE 128
 #define RBD_MAX_SNAPS 100
 
+/* The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h */
+#ifdef LIBRBD_SUPPORTS_IOVEC
+#define LIBRBD_USE_IOVEC 1
+#else
+#define LIBRBD_USE_IOVEC 0
+#endif
+
 typedef enum {
 RBD_AIO_READ,
 RBD_AIO_WRITE,
@@ -310,6 +317,17 @@ static int qemu_rbd_set_conf(rados_t cluster, const char *conf,
 return ret;
 }
 
+static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
+{
+if(LIBRBD_USE_IOVEC){
+RBDAIOCB *acb = rcb->acb;
+iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
+   acb->qiov->size - offs);
+} else {
+memset(rcb->buf + offs, 0, rcb->size - offs);
+}
+}
+
 static int qemu_rbd_create(const char *filename, QemuOpts *opts, Error **errp)
 {
 Error *local_err = NULL;
@@ -426,11 +444,11 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
 }
 } else {
 if (r < 0) {
-memset(rcb->buf, 0, rcb->size);
+qemu_rbd_memset(rcb, 0);
 acb->ret = r;
 acb->error = 1;
 } else if (r < rcb->size) {
-memset(rcb->buf + r, 0, rcb->size - r);
+qemu_rbd_memset(rcb, r);
 if (!acb->error) {
 acb->ret = rcb->size;
 }
@@ -440,11 +458,14 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
 }
 
 g_free(rcb);
-
-if (acb->cmd == RBD_AIO_READ) {
-qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
+ 
+if(!LIBRBD_USE_IOVEC){
+ if (acb->cmd == RBD_AIO_READ) {
+ qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
+ }
+ qemu_vfree(acb->bounce);
 }
-qemu_vfree(acb->bounce);
+
 acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
 
 qemu_aio_unref(acb);
@@ -655,7 +676,6 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 RBDAIOCB *acb;
 RADOSCB *rcb = NULL;
 rbd_completion_t c;
-char *buf;
 int r;
 
 BDRVRBDState *s = bs->opaque;
@@ -664,27 +684,29 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 acb->cmd = cmd;
 acb->qiov = qiov;
 assert(!qiov || qiov->size == size);
-if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
-acb->bounce = NULL;
-} else {
-acb->bounce = qemu_try_blockalign(bs, qiov->size);
-if (acb->bounce == NULL) {
-goto failed;
+
+rcb = g_new(RADOSCB, 1);
+
+if(!LIBRBD_USE_IOVEC){
+if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
+acb->bounce = NULL;
+} else {
+acb->bounce = qemu_try_blockalign(bs, qiov->size);
+if (acb->bounce == NULL) {
+goto failed;
+}
 }
+if (cmd == RBD_AIO_WRITE) {
+qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
+}
+rcb->buf = acb->bounce;
 }
+
 acb->ret = 0;
 acb->error = 0;
 acb->s = s;
 
-if (cmd == RBD_AIO_WRITE) {
-qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
-}
-
-buf = acb->bounce;
-
-rcb = g_new(RADOSCB, 1);
 rcb->acb = acb;
-rcb->buf = buf;
 rcb->s = acb->s;
 rcb->size = size;
 r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
@@ -694,10 +716,16 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 
 switch (cmd) {
 case RBD_AIO_WRITE:
-r = rbd_aio_write(s->image, off, size, buf, c);
+if(!LIBRBD_USE_IOVEC)
+r = rbd_aio_write(s->image, off, size, rcb->buf, c);
+else
+r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
 break;
 case RBD_AIO_READ:
-r = rbd_aio_read(s->image, off, size, buf, c);
+if(!LIBRBD_USE_IOVEC)
+r = rbd_aio_read(s->image, off, size, rcb->buf, c);
+else
+r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
 break;
 case RBD_AIO_DISCARD:
 r = rbd_aio_discard_wrapper(s->image, off, size, c);
@@ -712,14 +740,15 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 if (r < 0) {
 goto failed_completion;
 }
-
 return &acb->common;
 
 failed_completion:
 rbd_aio_release(c);
 failed:
 g_free(rcb);
-qemu_vfree(acb->bounce);
+if(!LIBRBD_USE_IOVEC)
+qemu_vfree(acb->bounce);
+
 qemu_aio_unref(acb);
 return NULL;
 }
-- 
2.10.2




[Qemu-devel] [RFC v4] RBD: Add support readv,writev for rbd

2017-02-16 Thread jazeltq
From: tianqing 

Rbd can do readv and writev directly, so we do not need to transform
iov to buf or vice versa any more.

Signed-off-by: tianqing 
---
 block/rbd.c | 47 +--
 1 file changed, 41 insertions(+), 6 deletions(-)

diff --git a/block/rbd.c b/block/rbd.c
index a57b3e3..0cbd9e3 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -73,7 +73,12 @@ typedef struct RBDAIOCB {
 BlockAIOCB common;
 int64_t ret;
 QEMUIOVector *qiov;
+/* Note:
+ * The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h.
+ */
+#ifndef LIBRBD_SUPPORTS_IOVEC
 char *bounce;
+#endif
 RBDAIOCmd cmd;
 int error;
 struct BDRVRBDState *s;
@@ -83,7 +88,9 @@ typedef struct RADOSCB {
 RBDAIOCB *acb;
 struct BDRVRBDState *s;
 int64_t size;
+#ifndef LIBRBD_SUPPORTS_IOVEC
 char *buf;
+#endif
 int64_t ret;
 } RADOSCB;
 
@@ -426,11 +433,21 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
 }
 } else {
 if (r < 0) {
+#ifndef LIBRBD_SUPPORTS_IOVEC
 memset(rcb->buf, 0, rcb->size);
+#else
+iov_memset(acb->qiov->iov, acb->qiov->niov, 0, 0, acb->qiov->size);
+#endif
 acb->ret = r;
 acb->error = 1;
 } else if (r < rcb->size) {
+#ifndef LIBRBD_SUPPORTS_IOVEC
 memset(rcb->buf + r, 0, rcb->size - r);
+#else
+iov_memset(acb->qiov->iov, acb->qiov->niov,
+   r, 0, acb->qiov->size - r);
+#endif
+
 if (!acb->error) {
 acb->ret = rcb->size;
 }
@@ -441,10 +458,12 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
 
 g_free(rcb);
 
+#ifndef LIBRBD_SUPPORTS_IOVEC
 if (acb->cmd == RBD_AIO_READ) {
 qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
 }
 qemu_vfree(acb->bounce);
+#endif
 acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
 
 qemu_aio_unref(acb);
@@ -655,8 +674,10 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 RBDAIOCB *acb;
 RADOSCB *rcb = NULL;
 rbd_completion_t c;
-char *buf;
 int r;
+#ifndef LIBRBD_SUPPORTS_IOVEC
+char *buf = NULL;
+#endif
 
 BDRVRBDState *s = bs->opaque;
 
@@ -664,6 +685,8 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 acb->cmd = cmd;
 acb->qiov = qiov;
 assert(!qiov || qiov->size == size);
+#ifndef LIBRBD_SUPPORTS_IOVEC
+
 if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
 acb->bounce = NULL;
 } else {
@@ -672,19 +695,21 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 goto failed;
 }
 }
-acb->ret = 0;
-acb->error = 0;
-acb->s = s;
-
 if (cmd == RBD_AIO_WRITE) {
 qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
 }
-
 buf = acb->bounce;
+#endif
+acb->ret = 0;
+acb->error = 0;
+acb->s = s;
 
 rcb = g_new(RADOSCB, 1);
+
 rcb->acb = acb;
+#ifndef LIBRBD_SUPPORTS_IOVEC
 rcb->buf = buf;
+#endif
 rcb->s = acb->s;
 rcb->size = size;
 r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
@@ -694,10 +719,18 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 
 switch (cmd) {
 case RBD_AIO_WRITE:
+#ifndef LIBRBD_SUPPORTS_IOVEC
 r = rbd_aio_write(s->image, off, size, buf, c);
+#else
+r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
+#endif
 break;
 case RBD_AIO_READ:
+#ifndef LIBRBD_SUPPORTS_IOVEC
 r = rbd_aio_read(s->image, off, size, buf, c);
+#else
+r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
+#endif
 break;
 case RBD_AIO_DISCARD:
 r = rbd_aio_discard_wrapper(s->image, off, size, c);
@@ -719,7 +752,9 @@ failed_completion:
 rbd_aio_release(c);
 failed:
 g_free(rcb);
+#ifndef LIBRBD_SUPPORTS_IOVEC
 qemu_vfree(acb->bounce);
+#endif
 qemu_aio_unref(acb);
 return NULL;
 }
-- 
2.10.2




[Qemu-devel] [RFC v4] RBD: Add support readv,writev for rbd

2017-01-21 Thread jazeltq
From: tianqing 

Rbd can do readv and writev directly, so we do not need to transform
iov to buf or vice versa any more.

Signed-off-by: tianqing 
---
 block/rbd.c | 47 +--
 1 file changed, 41 insertions(+), 6 deletions(-)

diff --git a/block/rbd.c b/block/rbd.c
index a57b3e3..0cbd9e3 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -73,7 +73,12 @@ typedef struct RBDAIOCB {
 BlockAIOCB common;
 int64_t ret;
 QEMUIOVector *qiov;
+/* Note:
+ * The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h.
+ */
+#ifndef LIBRBD_SUPPORTS_IOVEC
 char *bounce;
+#endif
 RBDAIOCmd cmd;
 int error;
 struct BDRVRBDState *s;
@@ -83,7 +88,9 @@ typedef struct RADOSCB {
 RBDAIOCB *acb;
 struct BDRVRBDState *s;
 int64_t size;
+#ifndef LIBRBD_SUPPORTS_IOVEC
 char *buf;
+#endif
 int64_t ret;
 } RADOSCB;
 
@@ -426,11 +433,21 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
 }
 } else {
 if (r < 0) {
+#ifndef LIBRBD_SUPPORTS_IOVEC
 memset(rcb->buf, 0, rcb->size);
+#else
+iov_memset(acb->qiov->iov, acb->qiov->niov, 0, 0, acb->qiov->size);
+#endif
 acb->ret = r;
 acb->error = 1;
 } else if (r < rcb->size) {
+#ifndef LIBRBD_SUPPORTS_IOVEC
 memset(rcb->buf + r, 0, rcb->size - r);
+#else
+iov_memset(acb->qiov->iov, acb->qiov->niov,
+   r, 0, acb->qiov->size - r);
+#endif
+
 if (!acb->error) {
 acb->ret = rcb->size;
 }
@@ -441,10 +458,12 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
 
 g_free(rcb);
 
+#ifndef LIBRBD_SUPPORTS_IOVEC
 if (acb->cmd == RBD_AIO_READ) {
 qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
 }
 qemu_vfree(acb->bounce);
+#endif
 acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
 
 qemu_aio_unref(acb);
@@ -655,8 +674,10 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 RBDAIOCB *acb;
 RADOSCB *rcb = NULL;
 rbd_completion_t c;
-char *buf;
 int r;
+#ifndef LIBRBD_SUPPORTS_IOVEC
+char *buf = NULL;
+#endif
 
 BDRVRBDState *s = bs->opaque;
 
@@ -664,6 +685,8 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 acb->cmd = cmd;
 acb->qiov = qiov;
 assert(!qiov || qiov->size == size);
+#ifndef LIBRBD_SUPPORTS_IOVEC
+
 if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
 acb->bounce = NULL;
 } else {
@@ -672,19 +695,21 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 goto failed;
 }
 }
-acb->ret = 0;
-acb->error = 0;
-acb->s = s;
-
 if (cmd == RBD_AIO_WRITE) {
 qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
 }
-
 buf = acb->bounce;
+#endif
+acb->ret = 0;
+acb->error = 0;
+acb->s = s;
 
 rcb = g_new(RADOSCB, 1);
+
 rcb->acb = acb;
+#ifndef LIBRBD_SUPPORTS_IOVEC
 rcb->buf = buf;
+#endif
 rcb->s = acb->s;
 rcb->size = size;
 r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
@@ -694,10 +719,18 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 
 switch (cmd) {
 case RBD_AIO_WRITE:
+#ifndef LIBRBD_SUPPORTS_IOVEC
 r = rbd_aio_write(s->image, off, size, buf, c);
+#else
+r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
+#endif
 break;
 case RBD_AIO_READ:
+#ifndef LIBRBD_SUPPORTS_IOVEC
 r = rbd_aio_read(s->image, off, size, buf, c);
+#else
+r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
+#endif
 break;
 case RBD_AIO_DISCARD:
 r = rbd_aio_discard_wrapper(s->image, off, size, c);
@@ -719,7 +752,9 @@ failed_completion:
 rbd_aio_release(c);
 failed:
 g_free(rcb);
+#ifndef LIBRBD_SUPPORTS_IOVEC
 qemu_vfree(acb->bounce);
+#endif
 qemu_aio_unref(acb);
 return NULL;
 }
-- 
2.10.2




[Qemu-devel] [RFC v3] RBD: Add support readv,writev for rbd

2017-01-21 Thread jazeltq
From: tianqing 

Rbd can do readv and writev directly, so we do not need to transform
iov to buf or vice versa any more.

Signed-off-by: tianqing 
---
 block/rbd.c | 47 +--
 1 file changed, 41 insertions(+), 6 deletions(-)

diff --git a/block/rbd.c b/block/rbd.c
index a57b3e3..4add4dd 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -73,7 +73,12 @@ typedef struct RBDAIOCB {
 BlockAIOCB common;
 int64_t ret;
 QEMUIOVector *qiov;
+/* Note:
+ * The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h.
+ */
+#ifndef LIBRBD_SUPPORTS_IOVEC
 char *bounce;
+#endif
 RBDAIOCmd cmd;
 int error;
 struct BDRVRBDState *s;
@@ -83,7 +88,9 @@ typedef struct RADOSCB {
 RBDAIOCB *acb;
 struct BDRVRBDState *s;
 int64_t size;
+#ifndef LIBRBD_SUPPORTS_IOVEC
 char *buf;
+#endif
 int64_t ret;
 } RADOSCB;
 
@@ -426,11 +433,21 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
 }
 } else {
 if (r < 0) {
+#ifndef LIBRBD_SUPPORTS_IOVEC
 memset(rcb->buf, 0, rcb->size);
+#else
+iov_memset(acb->qiov->iov, acb->qiov->niov, 0, 0, acb->qiov->size);
+#endif
 acb->ret = r;
 acb->error = 1;
 } else if (r < rcb->size) {
+#ifndef LIBRBD_SUPPORTS_IOVEC
 memset(rcb->buf + r, 0, rcb->size - r);
+#else
+iov_memset(acb->qiov->iov, acb->qiov->niov,
+   r, 0, acb->qiov->size - r);
+#endif
+
 if (!acb->error) {
 acb->ret = rcb->size;
 }
@@ -441,10 +458,12 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
 
 g_free(rcb);
 
+#ifndef LIBRBD_SUPPORTS_IOVEC
 if (acb->cmd == RBD_AIO_READ) {
 qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
 }
 qemu_vfree(acb->bounce);
+#endif
 acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
 
 qemu_aio_unref(acb);
@@ -655,8 +674,10 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 RBDAIOCB *acb;
 RADOSCB *rcb = NULL;
 rbd_completion_t c;
-char *buf;
 int r;
+#ifndef LIBRBD_SUPPORTS_IOVEC
+char * buf = NULL; 
+#endif
 
 BDRVRBDState *s = bs->opaque;
 
@@ -664,6 +685,8 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 acb->cmd = cmd;
 acb->qiov = qiov;
 assert(!qiov || qiov->size == size);
+#ifndef LIBRBD_SUPPORTS_IOVEC
+
 if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
 acb->bounce = NULL;
 } else {
@@ -672,19 +695,21 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 goto failed;
 }
 }
-acb->ret = 0;
-acb->error = 0;
-acb->s = s;
-
 if (cmd == RBD_AIO_WRITE) {
 qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
 }
-
 buf = acb->bounce;
+#endif
+acb->ret = 0;
+acb->error = 0;
+acb->s = s;
 
 rcb = g_new(RADOSCB, 1);
+
 rcb->acb = acb;
+#ifndef LIBRBD_SUPPORTS_IOVEC
 rcb->buf = buf;
+#endif
 rcb->s = acb->s;
 rcb->size = size;
 r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
@@ -694,10 +719,18 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 
 switch (cmd) {
 case RBD_AIO_WRITE:
+#ifndef LIBRBD_SUPPORTS_IOVEC
 r = rbd_aio_write(s->image, off, size, buf, c);
+#else
+r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
+#endif
 break;
 case RBD_AIO_READ:
+#ifndef LIBRBD_SUPPORTS_IOVEC
 r = rbd_aio_read(s->image, off, size, buf, c);
+#else
+r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
+#endif
 break;
 case RBD_AIO_DISCARD:
 r = rbd_aio_discard_wrapper(s->image, off, size, c);
@@ -719,7 +752,9 @@ failed_completion:
 rbd_aio_release(c);
 failed:
 g_free(rcb);
+#ifndef LIBRBD_SUPPORTS_IOVEC
 qemu_vfree(acb->bounce);
+#endif
 qemu_aio_unref(acb);
 return NULL;
 }
-- 
1.8.3.1




[Qemu-devel] [RFC v2] RBD: Add support readv,writev for rbd

2016-11-07 Thread jazeltq
From: tianqing 

Rbd can do readv and writev directly, so we do not need to transform
iov to buf or vice versa any more.

Signed-off-by: tianqing 
---
 block/rbd.c | 41 -
 1 file changed, 36 insertions(+), 5 deletions(-)

diff --git a/block/rbd.c b/block/rbd.c
index a57b3e3..93fe299 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -73,7 +73,12 @@ typedef struct RBDAIOCB {
 BlockAIOCB common;
 int64_t ret;
 QEMUIOVector *qiov;
+/* Note:
+ * The LIBRBD_SUPPORTS_IOVEC is defined in librbd.h.
+ */
+#ifndef LIBRBD_SUPPORTS_IOVEC
 char *bounce;
+#endif
 RBDAIOCmd cmd;
 int error;
 struct BDRVRBDState *s;
@@ -83,7 +88,9 @@ typedef struct RADOSCB {
 RBDAIOCB *acb;
 struct BDRVRBDState *s;
 int64_t size;
+#ifndef LIBRBD_SUPPORTS_IOVEC
 char *buf;
+#endif
 int64_t ret;
 } RADOSCB;
 
@@ -426,11 +433,21 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
 }
 } else {
 if (r < 0) {
+#ifndef LIBRBD_SUPPORTS_IOVEC
 memset(rcb->buf, 0, rcb->size);
+#else
+iov_memset(acb->qiov->iov, acb->qiov->niov, 0, 0, acb->qiov->size);
+#endif
 acb->ret = r;
 acb->error = 1;
 } else if (r < rcb->size) {
+#ifndef LIBRBD_SUPPORTS_IOVEC
 memset(rcb->buf + r, 0, rcb->size - r);
+#else
+iov_memset(acb->qiov->iov, acb->qiov->niov,
+   r, 0, acb->qiov->size - r);
+#endif
+
 if (!acb->error) {
 acb->ret = rcb->size;
 }
@@ -441,10 +458,12 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
 
 g_free(rcb);
 
+#ifndef LIBRBD_SUPPORTS_IOVEC
 if (acb->cmd == RBD_AIO_READ) {
 qemu_iovec_from_buf(acb->qiov, 0, acb->bounce, acb->qiov->size);
 }
 qemu_vfree(acb->bounce);
+#endif
 acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
 
 qemu_aio_unref(acb);
@@ -664,6 +683,7 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 acb->cmd = cmd;
 acb->qiov = qiov;
 assert(!qiov || qiov->size == size);
+#ifndef LIBRBD_SUPPORTS_IOVEC
 if (cmd == RBD_AIO_DISCARD || cmd == RBD_AIO_FLUSH) {
 acb->bounce = NULL;
 } else {
@@ -672,19 +692,20 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 goto failed;
 }
 }
-acb->ret = 0;
-acb->error = 0;
-acb->s = s;
-
 if (cmd == RBD_AIO_WRITE) {
 qemu_iovec_to_buf(acb->qiov, 0, acb->bounce, qiov->size);
 }
-
 buf = acb->bounce;
+#endif
+acb->ret = 0;
+acb->error = 0;
+acb->s = s;
 
 rcb = g_new(RADOSCB, 1);
 rcb->acb = acb;
+#ifndef LIBRBD_SUPPORTS_IOVEC
 rcb->buf = buf;
+#endif
 rcb->s = acb->s;
 rcb->size = size;
 r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
@@ -694,10 +715,18 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
 
 switch (cmd) {
 case RBD_AIO_WRITE:
+#ifndef LIBRBD_SUPPORTS_IOVEC
 r = rbd_aio_write(s->image, off, size, buf, c);
+#else
+r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
+#endif
 break;
 case RBD_AIO_READ:
+#ifndef LIBRBD_SUPPORTS_IOVEC
 r = rbd_aio_read(s->image, off, size, buf, c);
+#else
+r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
+#endif
 break;
 case RBD_AIO_DISCARD:
 r = rbd_aio_discard_wrapper(s->image, off, size, c);
@@ -719,7 +748,9 @@ failed_completion:
 rbd_aio_release(c);
 failed:
 g_free(rcb);
+#ifndef LIBRBD_SUPPORTS_IOVEC
 qemu_vfree(acb->bounce);
+#endif
 qemu_aio_unref(acb);
 return NULL;
 }
-- 
2.10.2




[Qemu-devel] [PATCH] RBD: Add support readv,writev for rbd

2016-11-04 Thread jazeltq
From: tianqing 

Rbd can do readv and writev directly, so we do not need to transform
iov to buf or vice versa any more.

Signed-off-by: tianqing 
---
 block/rbd.c | 124 
 1 file changed, 124 insertions(+)

diff --git a/block/rbd.c b/block/rbd.c
index a57b3e3..a405c02 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -53,6 +53,13 @@
 #undef LIBRBD_SUPPORTS_DISCARD
 #endif
 
+/* rbd_aio_readv, rbd_aio_writev added in 0.1.11 */
+#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 11)
+#define LIBRBD_SUPPORTS_IOV
+#else
+#undef LIBRBD_SUPPORTS_IOV
+#endif
+
 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
 
 #define RBD_MAX_CONF_NAME_SIZE 128
@@ -73,7 +80,10 @@ typedef struct RBDAIOCB {
 BlockAIOCB common;
 int64_t ret;
 QEMUIOVector *qiov;
+#ifdef LIBRBD_SUPPORTS_IOV
+#else
 char *bounce;
+#endif
 RBDAIOCmd cmd;
 int error;
 struct BDRVRBDState *s;
@@ -83,7 +93,10 @@ typedef struct RADOSCB {
 RBDAIOCB *acb;
 struct BDRVRBDState *s;
 int64_t size;
+#ifdef LIBRBD_SUPPORTS_IOV
+#else
 char *buf;
+#endif
 int64_t ret;
 } RADOSCB;
 
@@ -406,6 +419,48 @@ shutdown:
 return ret;
 }
 
+
+#ifdef LIBRBD_SUPPORTS_IOV
+/*
+ * This aio completion is being called from rbd_finish_bh() and runs in qemu
+ * BH context.
+ */
+static void qemu_rbd_complete_aio(RADOSCB *rcb)
+{
+RBDAIOCB *acb = rcb->acb;
+int64_t r;
+
+r = rcb->ret;
+
+if (acb->cmd != RBD_AIO_READ) {
+if (r < 0) {
+acb->ret = r;
+acb->error = 1;
+} else if (!acb->error) {
+acb->ret = rcb->size;
+}
+} else {
+if (r < 0) {
+iov_memset(acb->qiov->iov, acb->qiov->niov, 0, 0, acb->qiov->size);
+acb->ret = r;
+acb->error = 1;
+} else if (r < rcb->size) {
+iov_memset(acb->qiov->iov, acb->qiov->niov,
+   rcb->size - r, 0, acb->qiov->size);
+if (!acb->error) {
+acb->ret = rcb->size;
+}
+} else if (!acb->error) {
+acb->ret = r;
+}
+}
+
+g_free(rcb);
+acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
+
+qemu_aio_unref(acb);
+}
+#else
 /*
  * This aio completion is being called from rbd_finish_bh() and runs in qemu
  * BH context.
@@ -449,6 +504,7 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
 
 qemu_aio_unref(acb);
 }
+#endif
 
 /* TODO Convert to fine grained options */
 static QemuOptsList runtime_opts = {
@@ -644,6 +700,73 @@ static int rbd_aio_flush_wrapper(rbd_image_t image,
 #endif
 }
 
+#ifdef LIBRBD_SUPPORTS_IOV
+static BlockAIOCB *rbd_start_aio_vec(BlockDriverState *bs,
+ int64_t off,
+ QEMUIOVector *qiov,
+ int64_t size,
+ BlockCompletionFunc *cb,
+ void *opaque,
+ RBDAIOCmd cmd)
+{
+RBDAIOCB *acb;
+RADOSCB *rcb = NULL;
+rbd_completion_t c;
+int r;
+
+BDRVRBDState *s = bs->opaque;
+
+acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
+acb->cmd = cmd;
+acb->qiov = qiov;
+assert(!qiov || qiov->size == size);
+
+acb->ret = 0;
+acb->error = 0;
+acb->s = s;
+
+rcb = g_new(RADOSCB, 1);
+rcb->acb = acb;
+rcb->s = acb->s;
+rcb->size = size;
+r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
+if (r < 0) {
+goto failed;
+}
+
+switch (cmd) {
+case RBD_AIO_WRITE:
+r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
+break;
+case RBD_AIO_READ:
+r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
+break;
+case RBD_AIO_DISCARD:
+r = rbd_aio_discard_wrapper(s->image, off, size, c);
+break;
+case RBD_AIO_FLUSH:
+r = rbd_aio_flush_wrapper(s->image, c);
+break;
+default:
+r = -EINVAL;
+}
+
+if (r < 0) {
+goto failed_completion;
+}
+
+return &acb->common;
+
+failed_completion:
+rbd_aio_release(c);
+failed:
+g_free(rcb);
+qemu_aio_unref(acb);
+return NULL;
+
+}
+
+#else
 static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
  int64_t off,
  QEMUIOVector *qiov,
@@ -723,6 +846,7 @@ failed:
 qemu_aio_unref(acb);
 return NULL;
 }
+#endif
 
 static BlockAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
   int64_t sector_num,
-- 
2.10.2




[Qemu-devel] [PATCH] Add support readv,writev for rbd

2016-11-04 Thread jazeltq
From: tianqing 

Rbd can do readv and writev, so we do not need to transform
iov to buf or vice versa any more.

Signed-off-by: tianqing 
---
 block/rbd.c | 124 
 1 file changed, 124 insertions(+)

diff --git a/block/rbd.c b/block/rbd.c
index a57b3e3..a405c02 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -53,6 +53,13 @@
 #undef LIBRBD_SUPPORTS_DISCARD
 #endif
 
+/* rbd_aio_readv, rbd_aio_writev added in 0.1.11 */
+#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 11)
+#define LIBRBD_SUPPORTS_IOV
+#else
+#undef LIBRBD_SUPPORTS_IOV
+#endif
+
 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
 
 #define RBD_MAX_CONF_NAME_SIZE 128
@@ -73,7 +80,10 @@ typedef struct RBDAIOCB {
 BlockAIOCB common;
 int64_t ret;
 QEMUIOVector *qiov;
+#ifdef LIBRBD_SUPPORTS_IOV
+#else
 char *bounce;
+#endif
 RBDAIOCmd cmd;
 int error;
 struct BDRVRBDState *s;
@@ -83,7 +93,10 @@ typedef struct RADOSCB {
 RBDAIOCB *acb;
 struct BDRVRBDState *s;
 int64_t size;
+#ifdef LIBRBD_SUPPORTS_IOV
+#else
 char *buf;
+#endif
 int64_t ret;
 } RADOSCB;
 
@@ -406,6 +419,48 @@ shutdown:
 return ret;
 }
 
+
+#ifdef LIBRBD_SUPPORTS_IOV
+/*
+ * This aio completion is being called from rbd_finish_bh() and runs in qemu
+ * BH context.
+ */
+static void qemu_rbd_complete_aio(RADOSCB *rcb)
+{
+RBDAIOCB *acb = rcb->acb;
+int64_t r;
+
+r = rcb->ret;
+
+if (acb->cmd != RBD_AIO_READ) {
+if (r < 0) {
+acb->ret = r;
+acb->error = 1;
+} else if (!acb->error) {
+acb->ret = rcb->size;
+}
+} else {
+if (r < 0) {
+iov_memset(acb->qiov->iov, acb->qiov->niov, 0, 0, acb->qiov->size);
+acb->ret = r;
+acb->error = 1;
+} else if (r < rcb->size) {
+iov_memset(acb->qiov->iov, acb->qiov->niov,
+   rcb->size - r, 0, acb->qiov->size);
+if (!acb->error) {
+acb->ret = rcb->size;
+}
+} else if (!acb->error) {
+acb->ret = r;
+}
+}
+
+g_free(rcb);
+acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
+
+qemu_aio_unref(acb);
+}
+#else
 /*
  * This aio completion is being called from rbd_finish_bh() and runs in qemu
  * BH context.
@@ -449,6 +504,7 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
 
 qemu_aio_unref(acb);
 }
+#endif
 
 /* TODO Convert to fine grained options */
 static QemuOptsList runtime_opts = {
@@ -644,6 +700,73 @@ static int rbd_aio_flush_wrapper(rbd_image_t image,
 #endif
 }
 
+#ifdef LIBRBD_SUPPORTS_IOV
+static BlockAIOCB *rbd_start_aio_vec(BlockDriverState *bs,
+ int64_t off,
+ QEMUIOVector *qiov,
+ int64_t size,
+ BlockCompletionFunc *cb,
+ void *opaque,
+ RBDAIOCmd cmd)
+{
+RBDAIOCB *acb;
+RADOSCB *rcb = NULL;
+rbd_completion_t c;
+int r;
+
+BDRVRBDState *s = bs->opaque;
+
+acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
+acb->cmd = cmd;
+acb->qiov = qiov;
+assert(!qiov || qiov->size == size);
+
+acb->ret = 0;
+acb->error = 0;
+acb->s = s;
+
+rcb = g_new(RADOSCB, 1);
+rcb->acb = acb;
+rcb->s = acb->s;
+rcb->size = size;
+r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
+if (r < 0) {
+goto failed;
+}
+
+switch (cmd) {
+case RBD_AIO_WRITE:
+r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
+break;
+case RBD_AIO_READ:
+r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
+break;
+case RBD_AIO_DISCARD:
+r = rbd_aio_discard_wrapper(s->image, off, size, c);
+break;
+case RBD_AIO_FLUSH:
+r = rbd_aio_flush_wrapper(s->image, c);
+break;
+default:
+r = -EINVAL;
+}
+
+if (r < 0) {
+goto failed_completion;
+}
+
+return &acb->common;
+
+failed_completion:
+rbd_aio_release(c);
+failed:
+g_free(rcb);
+qemu_aio_unref(acb);
+return NULL;
+
+}
+
+#else
 static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
  int64_t off,
  QEMUIOVector *qiov,
@@ -723,6 +846,7 @@ failed:
 qemu_aio_unref(acb);
 return NULL;
 }
+#endif
 
 static BlockAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
   int64_t sector_num,
-- 
1.8.3.1




[Qemu-devel] [PATCH] Add support readv,writev for rbd

2016-11-04 Thread jazeltq
From: tianqing 

Rbd can do readv and writev, so we do not need to transform
iov to buf or vice versa any more.

Signed-off-by: tianqing 
---
 block/rbd.c | 124 
 1 file changed, 124 insertions(+)

diff --git a/block/rbd.c b/block/rbd.c
index a57b3e3..a405c02 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -53,6 +53,13 @@
 #undef LIBRBD_SUPPORTS_DISCARD
 #endif
 
+/* rbd_aio_readv, rbd_aio_writev added in 0.1.11 */
+#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 11)
+#define LIBRBD_SUPPORTS_IOV
+#else
+#undef LIBRBD_SUPPORTS_IOV
+#endif
+
 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
 
 #define RBD_MAX_CONF_NAME_SIZE 128
@@ -73,7 +80,10 @@ typedef struct RBDAIOCB {
 BlockAIOCB common;
 int64_t ret;
 QEMUIOVector *qiov;
+#ifdef LIBRBD_SUPPORTS_IOV
+#else
 char *bounce;
+#endif
 RBDAIOCmd cmd;
 int error;
 struct BDRVRBDState *s;
@@ -83,7 +93,10 @@ typedef struct RADOSCB {
 RBDAIOCB *acb;
 struct BDRVRBDState *s;
 int64_t size;
+#ifdef LIBRBD_SUPPORTS_IOV
+#else
 char *buf;
+#endif
 int64_t ret;
 } RADOSCB;
 
@@ -406,6 +419,48 @@ shutdown:
 return ret;
 }
 
+
+#ifdef LIBRBD_SUPPORTS_IOV
+/*
+ * This aio completion is being called from rbd_finish_bh() and runs in qemu
+ * BH context.
+ */
+static void qemu_rbd_complete_aio(RADOSCB *rcb)
+{
+RBDAIOCB *acb = rcb->acb;
+int64_t r;
+
+r = rcb->ret;
+
+if (acb->cmd != RBD_AIO_READ) {
+if (r < 0) {
+acb->ret = r;
+acb->error = 1;
+} else if (!acb->error) {
+acb->ret = rcb->size;
+}
+} else {
+if (r < 0) {
+iov_memset(acb->qiov->iov, acb->qiov->niov, 0, 0, acb->qiov->size);
+acb->ret = r;
+acb->error = 1;
+} else if (r < rcb->size) {
+iov_memset(acb->qiov->iov, acb->qiov->niov,
+   rcb->size - r, 0, acb->qiov->size);
+if (!acb->error) {
+acb->ret = rcb->size;
+}
+} else if (!acb->error) {
+acb->ret = r;
+}
+}
+
+g_free(rcb);
+acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
+
+qemu_aio_unref(acb);
+}
+#else
 /*
  * This aio completion is being called from rbd_finish_bh() and runs in qemu
  * BH context.
@@ -449,6 +504,7 @@ static void qemu_rbd_complete_aio(RADOSCB *rcb)
 
 qemu_aio_unref(acb);
 }
+#endif
 
 /* TODO Convert to fine grained options */
 static QemuOptsList runtime_opts = {
@@ -644,6 +700,73 @@ static int rbd_aio_flush_wrapper(rbd_image_t image,
 #endif
 }
 
+#ifdef LIBRBD_SUPPORTS_IOV
+static BlockAIOCB *rbd_start_aio_vec(BlockDriverState *bs,
+ int64_t off,
+ QEMUIOVector *qiov,
+ int64_t size,
+ BlockCompletionFunc *cb,
+ void *opaque,
+ RBDAIOCmd cmd)
+{
+RBDAIOCB *acb;
+RADOSCB *rcb = NULL;
+rbd_completion_t c;
+int r;
+
+BDRVRBDState *s = bs->opaque;
+
+acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
+acb->cmd = cmd;
+acb->qiov = qiov;
+assert(!qiov || qiov->size == size);
+
+acb->ret = 0;
+acb->error = 0;
+acb->s = s;
+
+rcb = g_new(RADOSCB, 1);
+rcb->acb = acb;
+rcb->s = acb->s;
+rcb->size = size;
+r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
+if (r < 0) {
+goto failed;
+}
+
+switch (cmd) {
+case RBD_AIO_WRITE:
+r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
+break;
+case RBD_AIO_READ:
+r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
+break;
+case RBD_AIO_DISCARD:
+r = rbd_aio_discard_wrapper(s->image, off, size, c);
+break;
+case RBD_AIO_FLUSH:
+r = rbd_aio_flush_wrapper(s->image, c);
+break;
+default:
+r = -EINVAL;
+}
+
+if (r < 0) {
+goto failed_completion;
+}
+
+return &acb->common;
+
+failed_completion:
+rbd_aio_release(c);
+failed:
+g_free(rcb);
+qemu_aio_unref(acb);
+return NULL;
+
+}
+
+#else
 static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
  int64_t off,
  QEMUIOVector *qiov,
@@ -723,6 +846,7 @@ failed:
 qemu_aio_unref(acb);
 return NULL;
 }
+#endif
 
 static BlockAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
   int64_t sector_num,
-- 
1.8.3.1