The patch adds the ability for QEMUFile to write data asynchronously,
to improve write performance. Previously, only synchronous writing
was supported.

The asynchronous mode is enabled by the new "enable_buffered"
callback.

Signed-off-by: Denis Plotnikov <dplotni...@virtuozzo.com>
---
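A quick sketch of how a writer might opt in to the new mode; the
backend name and the other callbacks here are hypothetical, shown
only to illustrate the wiring (snapshot_writev_buffer and
snapshot_close are assumed to exist elsewhere):

    static bool snapshot_enable_buffered(void *opaque)
    {
        /* buffer and write asynchronously for this backend */
        return true;
    }

    static const QEMUFileOps snapshot_write_ops = {
        .writev_buffer   = snapshot_writev_buffer,
        .close           = snapshot_close,
        .enable_buffered = snapshot_enable_buffered,
    };
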
 include/qemu/typedefs.h |   1 +
 migration/qemu-file.c   | 351 +++++++++++++++++++++++++++++++++++++++++++++---
 migration/qemu-file.h   |   9 ++
 3 files changed, 339 insertions(+), 22 deletions(-)
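
For illustration, the buffered write path amounts to the following
double-buffering loop (a simplified sketch of copy_buf()/flush_buffer(),
not literal code from the patch):

    while (size > 0) {
        if (buf_is_full(f)) {
            /*
             * start an async write of current_buf via the task pool
             * and switch to a buffer from the free list, waiting if
             * all IO_BUF_NUM buffers are in flight
             */
            flush_buffer(f);
        }
        chunk = MIN(get_buf_free_size(f), size);
        memcpy(get_buf_ptr(f), src, chunk); /* copy into aligned buffer */
        advance_buf_ptr(f, chunk);
        src += chunk;
        size -= chunk;
    }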

diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
index 88dce54..9b388c8 100644
--- a/include/qemu/typedefs.h
+++ b/include/qemu/typedefs.h
@@ -98,6 +98,7 @@ typedef struct QEMUBH QEMUBH;
 typedef struct QemuConsole QemuConsole;
 typedef struct QEMUFile QEMUFile;
 typedef struct QEMUFileBuffer QEMUFileBuffer;
+typedef struct QEMUFileAioTask QEMUFileAioTask;
 typedef struct QemuLockable QemuLockable;
 typedef struct QemuMutex QemuMutex;
 typedef struct QemuOpt QemuOpt;
diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index 285c6ef..f42f949 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -29,19 +29,25 @@
 #include "qemu-file.h"
 #include "trace.h"
 #include "qapi/error.h"
+#include "block/aio_task.h"
 
-#define IO_BUF_SIZE 32768
+#define IO_BUF_SIZE (1024 * 1024)
 #define MAX_IOV_SIZE MIN(IOV_MAX, 64)
+#define IO_BUF_NUM 2
+#define IO_BUF_ALIGNMENT 512
 
-QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, 512));
+QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, IO_BUF_ALIGNMENT));
+QEMU_BUILD_BUG_ON(IO_BUF_SIZE > INT_MAX);
+QEMU_BUILD_BUG_ON(IO_BUF_NUM <= 0);
 
 struct QEMUFileBuffer {
     int buf_index;
-    int buf_size; /* 0 when writing */
+    int buf_size; /* 0 when writing in non-buffered mode */
     uint8_t *buf;
     unsigned long *may_free;
     struct iovec *iov;
     unsigned int iovcnt;
+    QLIST_ENTRY(QEMUFileBuffer) link;
 };
 
 struct QEMUFile {
@@ -60,6 +66,22 @@ struct QEMUFile {
     bool shutdown;
     /* currently used buffer */
     QEMUFileBuffer *current_buf;
+    /*
+     * With buffered_mode enabled, all data, including iov data,
+     * is copied into a 512-byte-aligned buffer. The buffer is then
+     * passed to the writev_buffer callback.
+     */
+    bool buffered_mode;
+    /* for async buffer writing */
+    AioTaskPool *pool;
+    /* the list of free buffers; the currently used one is NOT on it */
+    QLIST_HEAD(, QEMUFileBuffer) free_buffers;
+};
+
+struct QEMUFileAioTask {
+    AioTask task;
+    QEMUFile *f;
+    QEMUFileBuffer *fb;
 };
 
 /*
@@ -115,10 +137,42 @@ QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops)
     f->opaque = opaque;
     f->ops = ops;
 
-    f->current_buf = g_new0(QEMUFileBuffer, 1);
-    f->current_buf->buf = g_malloc(IO_BUF_SIZE);
-    f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
-    f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
+    if (f->ops->enable_buffered) {
+        f->buffered_mode = f->ops->enable_buffered(f->opaque);
+    }
+
+    if (f->buffered_mode && qemu_file_is_writable(f)) {
+        int i;
+        /*
+         * In buffered_mode we don't use the internal iovecs and the
+         * may_free bitmap, because the data to be written is copied
+         * straight into the aligned buffer
+         */
+        f->pool = aio_task_pool_new(IO_BUF_NUM);
+
+        /* allocate io buffers */
+        for (i = 0; i < IO_BUF_NUM; i++) {
+            QEMUFileBuffer *fb = g_new0(QEMUFileBuffer, 1);
+
+            fb->buf = qemu_memalign(IO_BUF_ALIGNMENT, IO_BUF_SIZE);
+            fb->buf_size = IO_BUF_SIZE;
+
+            /*
+             * use the first buffer as the current one and put the
+             * rest on the list of free buffers
+             */
+            if (i == 0) {
+                f->current_buf = fb;
+            } else {
+                QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
+            }
+        }
+    } else {
+        f->current_buf = g_new0(QEMUFileBuffer, 1);
+        f->current_buf->buf = g_malloc(IO_BUF_SIZE);
+        f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE);
+        f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE);
+    }
 
     return f;
 }
@@ -190,6 +244,8 @@ static void qemu_iovec_release_ram(QEMUFile *f)
     unsigned long idx;
     QEMUFileBuffer *fb = f->current_buf;
 
+    assert(!f->buffered_mode);
+
     /* Find and release all the contiguous memory ranges marked as may_free. */
     idx = find_next_bit(fb->may_free, fb->iovcnt, 0);
     if (idx >= fb->iovcnt) {
@@ -221,6 +277,147 @@ static void qemu_iovec_release_ram(QEMUFile *f)
     bitmap_zero(fb->may_free, MAX_IOV_SIZE);
 }
 
+static void advance_buf_ptr(QEMUFile *f, size_t size)
+{
+    QEMUFileBuffer *fb = f->current_buf;
+    /* must not advance by 0 */
+    assert(size);
+    /* must not overflow buf_index (int) */
+    assert(fb->buf_index + size <= INT_MAX);
+    /* must not exceed buf_size */
+    assert(fb->buf_index + size <= fb->buf_size);
+
+    fb->buf_index += size;
+}
+
+static size_t get_buf_free_size(QEMUFile *f)
+{
+    QEMUFileBuffer *fb = f->current_buf;
+    /* buf_index can't be greater than buf_size */
+    assert(fb->buf_size >= fb->buf_index);
+    return fb->buf_size - fb->buf_index;
+}
+
+static size_t get_buf_used_size(QEMUFile *f)
+{
+    QEMUFileBuffer *fb = f->current_buf;
+    return fb->buf_index;
+}
+
+static uint8_t *get_buf_ptr(QEMUFile *f)
+{
+    QEMUFileBuffer *fb = f->current_buf;
+    /* protect against out-of-bounds access */
+    assert(fb->buf_index <= IO_BUF_SIZE);
+    return fb->buf + fb->buf_index;
+}
+
+static bool buf_is_full(QEMUFile *f)
+{
+    return get_buf_free_size(f) == 0;
+}
+
+static void reset_buf(QEMUFile *f)
+{
+    QEMUFileBuffer *fb = f->current_buf;
+    fb->buf_index = 0;
+}
+
+static int write_task_fn(AioTask *task)
+{
+    int ret;
+    Error *local_error = NULL;
+    QEMUFileAioTask *t = (QEMUFileAioTask *) task;
+    QEMUFile *f = t->f;
+    QEMUFileBuffer *fb = t->fb;
+    uint64_t pos = f->pos;
+    struct iovec v = (struct iovec) {
+        .iov_base = fb->buf,
+        .iov_len = fb->buf_index,
+    };
+
+    assert(f->buffered_mode);
+
+    /*
+     * Increment the file position.
+     * This needs to happen before calling writev_buffer, because
+     * writev_buffer is asynchronous and more than one writev_buffer
+     * may be started simultaneously. Each writev_buffer must use
+     * its own file position to write to. writev_buffer may write
+     * less than buf_index bytes, but we treat that as an error:
+     * once an error occurs, further use of the file is meaningless.
+     * We expect that most of the time the full buffer is written
+     * (buf_size == buf_index). The only case when a non-full
+     * buffer is written (buf_size != buf_index) is file close,
+     * when we need to flush the rest of the buffer content.
+     */
+    f->pos += fb->buf_index;
+
+    ret = f->ops->writev_buffer(f->opaque, &v, 1, pos, &local_error);
+
+    /* return the just written buffer to the free list */
+    QLIST_INSERT_HEAD(&f->free_buffers, fb, link);
+
+    /* check that we have written everything */
+    if (ret != fb->buf_index) {
+        qemu_file_set_error_obj(f, ret < 0 ? ret : -EIO, local_error);
+    }
+
+    /*
+     * always return 0 - don't use the task error handling, rely on
+     * the QEMUFile error handling instead
+     */
+    return 0;
+}
+
+static void qemu_file_switch_current_buf(QEMUFile *f)
+{
+    /*
+     * if the list is empty, wait until some task returns a buffer
+     * to the list of free buffers.
+     */
+    if (QLIST_EMPTY(&f->free_buffers)) {
+        aio_task_pool_wait_slot(f->pool);
+    }
+
+    /*
+     * sanity check that the list isn't empty: if the free list was
+     * empty, we waited for a task completion, and the completed task
+     * must have returned a buffer to the list of free buffers
+     */
+    assert(!QLIST_EMPTY(&f->free_buffers));
+
+    /* take the new current buffer from the free list */
+    f->current_buf = QLIST_FIRST(&f->free_buffers);
+    reset_buf(f);
+
+    QLIST_REMOVE(f->current_buf, link);
+}
+
+/**
+ * Asynchronously flushes the QEMUFile buffer
+ *
+ * This will flush all pending data. If the data was only partially
+ * flushed, an error state is set. The function may return before
+ * the data is actually written.
+ */
+static void flush_buffer(QEMUFile *f)
+{
+    QEMUFileAioTask *t = g_new(QEMUFileAioTask, 1);
+
+    *t = (QEMUFileAioTask) {
+        .task.func = &write_task_fn,
+        .f = f,
+        .fb = f->current_buf,
+    };
+
+    /* aio_task_pool should free t for us */
+    aio_task_pool_start_task(f->pool, (AioTask *) t);
+
+    /* if no error occurred, this switches to a free buffer */
+    qemu_file_switch_current_buf(f);
+}
+
 /**
  * Flushes QEMUFile buffer
  *
@@ -241,7 +438,13 @@ void qemu_fflush(QEMUFile *f)
     if (f->shutdown) {
         return;
     }
+
+    if (f->buffered_mode) {
+        return;
+    }
+
     if (fb->iovcnt > 0) {
+        /* this is non-buffered mode */
         expect = iov_size(fb->iov, fb->iovcnt);
         ret = f->ops->writev_buffer(f->opaque, fb->iov, fb->iovcnt, f->pos,
                                     &local_error);
@@ -378,6 +581,7 @@ static ssize_t qemu_fill_buffer(QEMUFile *f)
 
 void qemu_update_position(QEMUFile *f, size_t size)
 {
+    assert(!f->buffered_mode);
     f->pos += size;
 }
 
@@ -392,7 +596,18 @@ void qemu_update_position(QEMUFile *f, size_t size)
 int qemu_fclose(QEMUFile *f)
 {
     int ret;
-    qemu_fflush(f);
+
+    if (qemu_file_is_writable(f) && f->buffered_mode) {
+        ret = qemu_file_get_error(f);
+        if (!ret) {
+            flush_buffer(f);
+        }
+        /* wait until all tasks are done */
+        aio_task_pool_wait_all(f->pool);
+    } else {
+        qemu_fflush(f);
+    }
+
     ret = qemu_file_get_error(f);
 
     if (f->ops->close) {
@@ -408,16 +623,77 @@ int qemu_fclose(QEMUFile *f)
         ret = f->last_error;
     }
     error_free(f->last_error_obj);
-    g_free(f->current_buf->buf);
-    g_free(f->current_buf->iov);
-    g_free(f->current_buf->may_free);
-    g_free(f->current_buf);
+
+    if (f->buffered_mode) {
+        QEMUFileBuffer *fb, *next;
+        /*
+         * put the current buffer back on the free list so that
+         * all the buffers can be destroyed in one loop
+         */
+        QLIST_INSERT_HEAD(&f->free_buffers, f->current_buf, link);
+
+        /* destroy all the buffers */
+        QLIST_FOREACH_SAFE(fb, &f->free_buffers, link, next) {
+            QLIST_REMOVE(fb, link);
+            /* qemu_vfree pairs with qemu_memalign */
+            qemu_vfree(fb->buf);
+            g_free(fb);
+        }
+        aio_task_pool_free(f->pool);
+    } else {
+        g_free(f->current_buf->buf);
+        g_free(f->current_buf->iov);
+        g_free(f->current_buf->may_free);
+        g_free(f->current_buf);
+    }
+
     g_free(f);
     trace_qemu_file_fclose();
     return ret;
 }
 
 /*
+ * Copy an external buffer to the internal current buffer.
+ */
+static void copy_buf(QEMUFile *f, const uint8_t *buf, size_t size,
+                     bool may_free)
+{
+    size_t data_size = size;
+    const uint8_t *src_ptr = buf;
+
+    assert(f->buffered_mode);
+    assert(size <= INT_MAX);
+
+    while (data_size > 0) {
+        size_t chunk_size;
+
+        if (buf_is_full(f)) {
+            flush_buffer(f);
+            if (qemu_file_get_error(f)) {
+                return;
+            }
+        }
+
+        chunk_size = MIN(get_buf_free_size(f), data_size);
+
+        memcpy(get_buf_ptr(f), src_ptr, chunk_size);
+
+        advance_buf_ptr(f, chunk_size);
+
+        src_ptr += chunk_size;
+        data_size -= chunk_size;
+        f->bytes_xfer += chunk_size;
+    }
+
+    if (may_free) {
+        if (qemu_madvise((void *) buf, size, QEMU_MADV_DONTNEED) < 0) {
+            error_report("migrate: madvise DONTNEED failed %p %zd: %s",
+                         buf, size, strerror(errno));
+        }
+    }
+}
+
+/*
  * Add buf to iovec. Do flush if iovec is full.
  *
  * Return values:
@@ -454,6 +730,9 @@ static int add_to_iovec(QEMUFile *f, const uint8_t *buf, size_t size,
 static void add_buf_to_iovec(QEMUFile *f, size_t len)
 {
     QEMUFileBuffer *fb = f->current_buf;
+
+    assert(!f->buffered_mode);
+
     if (!add_to_iovec(f, fb->buf + fb->buf_index, len, false)) {
         fb->buf_index += len;
         if (fb->buf_index == IO_BUF_SIZE) {
@@ -469,8 +748,12 @@ void qemu_put_buffer_async(QEMUFile *f, const uint8_t *buf, size_t size,
         return;
     }
 
-    f->bytes_xfer += size;
-    add_to_iovec(f, buf, size, may_free);
+    if (f->buffered_mode) {
+        copy_buf(f, buf, size, may_free);
+    } else {
+        f->bytes_xfer += size;
+        add_to_iovec(f, buf, size, may_free);
+    }
 }
 
 void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
@@ -482,6 +765,11 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size)
         return;
     }
 
+    if (f->buffered_mode) {
+        copy_buf(f, buf, size, false);
+        return;
+    }
+
     while (size > 0) {
         l = IO_BUF_SIZE - fb->buf_index;
         if (l > size) {
@@ -506,15 +794,21 @@ void qemu_put_byte(QEMUFile *f, int v)
         return;
     }
 
-    fb->buf[fb->buf_index] = v;
-    f->bytes_xfer++;
-    add_buf_to_iovec(f, 1);
+    if (f->buffered_mode) {
+        copy_buf(f, (const uint8_t *) &v, 1, false);
+    } else {
+        fb->buf[fb->buf_index] = v;
+        add_buf_to_iovec(f, 1);
+        f->bytes_xfer++;
+    }
 }
 
 void qemu_file_skip(QEMUFile *f, int size)
 {
     QEMUFileBuffer *fb = f->current_buf;
 
+    assert(!f->buffered_mode);
+
     if (fb->buf_index + size <= fb->buf_size) {
         fb->buf_index += size;
     }
@@ -672,10 +966,14 @@ int64_t qemu_ftell_fast(QEMUFile *f)
 {
     int64_t ret = f->pos;
     int i;
-    QEMUFileBuffer *fb = f->current_buf;
 
-    for (i = 0; i < fb->iovcnt; i++) {
-        ret += fb->iov[i].iov_len;
+    if (f->buffered_mode) {
+        ret += get_buf_used_size(f);
+    } else {
+        QEMUFileBuffer *fb = f->current_buf;
+        for (i = 0; i < fb->iovcnt; i++) {
+            ret += fb->iov[i].iov_len;
+        }
     }
 
     return ret;
@@ -683,8 +981,12 @@ int64_t qemu_ftell_fast(QEMUFile *f)
 
 int64_t qemu_ftell(QEMUFile *f)
 {
-    qemu_fflush(f);
-    return f->pos;
+    if (f->buffered_mode) {
+        return qemu_ftell_fast(f);
+    } else {
+        qemu_fflush(f);
+        return f->pos;
+    }
 }
 
 int qemu_file_rate_limit(QEMUFile *f)
@@ -803,6 +1105,8 @@ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
     QEMUFileBuffer *fb = f->current_buf;
     ssize_t blen = IO_BUF_SIZE - fb->buf_index - sizeof(int32_t);
 
+    assert(!f->buffered_mode);
+
     if (blen < compressBound(size)) {
         return -1;
     }
@@ -827,6 +1131,9 @@ int qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src)
     int len = 0;
     QEMUFileBuffer *fb_src = f_src->current_buf;
 
+    assert(!f_des->buffered_mode);
+    assert(!f_src->buffered_mode);
+
     if (fb_src->buf_index > 0) {
         len = fb_src->buf_index;
         qemu_put_buffer(f_des, fb_src->buf, fb_src->buf_index);
diff --git a/migration/qemu-file.h b/migration/qemu-file.h
index a9b6d6c..08655d2 100644
--- a/migration/qemu-file.h
+++ b/migration/qemu-file.h
@@ -103,6 +103,14 @@ typedef QEMUFile *(QEMURetPathFunc)(void *opaque);
 typedef int (QEMUFileShutdownFunc)(void *opaque, bool rd, bool wr,
                                    Error **errp);
 
+/*
+ * Enables or disables buffered mode.
+ * Any existing blocking reads/writes must be woken up beforehand.
+ * Returns true if buffered mode is to be enabled,
+ * false if it is to be disabled.
+ */
+typedef bool (QEMUFileEnableBufferedFunc)(void *opaque);
+
 typedef struct QEMUFileOps {
     QEMUFileGetBufferFunc *get_buffer;
     QEMUFileCloseFunc *close;
@@ -110,6 +118,7 @@ typedef struct QEMUFileOps {
     QEMUFileWritevBufferFunc *writev_buffer;
     QEMURetPathFunc *get_return_path;
     QEMUFileShutdownFunc *shut_down;
+    QEMUFileEnableBufferedFunc *enable_buffered;
 } QEMUFileOps;
 
 typedef struct QEMUFileHooks {
-- 
1.8.3.1

