This brings async request handling and block-status-driven chunk sizes
to backup out of the box, improving backup performance.

Signed-off-by: Vladimir Sementsov-Ogievskiy <vsement...@virtuozzo.com>
---
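
A usage sketch (not part of the patch): with the new cb_opaque parameter,
the completion callback receives the caller's own state instead of reusing
the progress-callback opaque. MyState and my_copy_done below are
hypothetical names for illustration; only block_copy_async() itself comes
from this series:

    /* Hypothetical completion callback; 'opaque' is the cb_opaque value. */
    static void my_copy_done(int ret, bool error_is_read, void *opaque)
    {
        MyState *s = opaque;

        s->ret = ret;
        s->error_is_read = error_is_read;
        s->done = true;
    }

    ...
    /*
     * Start one async copy of the whole range.  Passing 0 for max_workers
     * and max_chunk selects the defaults (BLOCK_COPY_MAX_WORKERS, and
     * block-status driven chunk sizes, respectively).
     */
    s->call = block_copy_async(bcs, 0, length, true /* ratelimit */,
                               0 /* max_workers */, 0 /* max_chunk */,
                               my_copy_done, s /* cb_opaque */);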
 include/block/block-copy.h |   9 +--
 block/backup.c             | 145 +++++++++++++++++++------------------
 block/block-copy.c         |  21 ++----
 3 files changed, 83 insertions(+), 92 deletions(-)
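
For reviewers, the new pause behaviour in a nutshell: backup_pause() cancels
the in-flight async call, the job coroutine wakes from job_yield(), and the
retry loop in backup_loop() starts a fresh block_copy_async() once the job
resumes.  Since block-copy clears copied areas from its dirty bitmap, the
restarted call should only copy what is still dirty.  Condensed from the
hunks below:

    while (true) { /* retry loop */
        job->bcs_call = block_copy_async(..., backup_block_copy_callback, job);

        while (job->bcs_call && !job->common.job.cancelled) {
            job_pause_point(&job->common.job); /* may run backup_pause() */
            if (job->bcs_call && !job->common.job.cancelled) {
                job_yield(&job->common.job);   /* woken by the callback */
            }
        }
        /* ... check job->ret, then retry or return ... */
    }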

diff --git a/include/block/block-copy.h b/include/block/block-copy.h
index 370a194d3c..a3e11d3fa2 100644
--- a/include/block/block-copy.h
+++ b/include/block/block-copy.h
@@ -18,7 +18,6 @@
 #include "block/block.h"
 #include "qemu/co-shared-resource.h"
 
-typedef void (*ProgressBytesCallbackFunc)(int64_t bytes, void *opaque);
 typedef void (*BlockCopyAsyncCallbackFunc)(int ret, bool error_is_read,
                                            void *opaque);
 typedef struct BlockCopyState BlockCopyState;
@@ -29,11 +28,6 @@ BlockCopyState *block_copy_state_new(BdrvChild *source, BdrvChild *target,
                                      BdrvRequestFlags write_flags,
                                      Error **errp);
 
-void block_copy_set_progress_callback(
-        BlockCopyState *s,
-        ProgressBytesCallbackFunc progress_bytes_callback,
-        void *progress_opaque);
-
 void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm);
 
 void block_copy_state_free(BlockCopyState *s);
@@ -57,7 +51,8 @@ BlockCopyCallState *block_copy_async(BlockCopyState *s,
                                      int64_t offset, int64_t bytes,
                                      bool ratelimit, int max_workers,
                                      int64_t max_chunk,
-                                     BlockCopyAsyncCallbackFunc cb);
+                                     BlockCopyAsyncCallbackFunc cb,
+                                     void *cb_opaque);
 
 /*
 * Set speed limit for block-copy instance. All block-copy operations related to
diff --git a/block/backup.c b/block/backup.c
index ec2676abc2..59c00f5293 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -44,42 +44,19 @@ typedef struct BackupBlockJob {
     BlockdevOnError on_source_error;
     BlockdevOnError on_target_error;
     uint64_t len;
-    uint64_t bytes_read;
     int64_t cluster_size;
     int max_workers;
     int64_t max_chunk;
 
     BlockCopyState *bcs;
+
+    BlockCopyCallState *bcs_call;
+    int ret;
+    bool error_is_read;
 } BackupBlockJob;
 
 static const BlockJobDriver backup_job_driver;
 
-static void backup_progress_bytes_callback(int64_t bytes, void *opaque)
-{
-    BackupBlockJob *s = opaque;
-
-    s->bytes_read += bytes;
-}
-
-static int coroutine_fn backup_do_cow(BackupBlockJob *job,
-                                      int64_t offset, uint64_t bytes,
-                                      bool *error_is_read)
-{
-    int ret = 0;
-    int64_t start, end; /* bytes */
-
-    start = QEMU_ALIGN_DOWN(offset, job->cluster_size);
-    end = QEMU_ALIGN_UP(bytes + offset, job->cluster_size);
-
-    trace_backup_do_cow_enter(job, start, offset, bytes);
-
-    ret = block_copy(job->bcs, start, end - start, error_is_read);
-
-    trace_backup_do_cow_return(job, offset, bytes, ret);
-
-    return ret;
-}
-
 static void backup_cleanup_sync_bitmap(BackupBlockJob *job, int ret)
 {
     BdrvDirtyBitmap *bm;
@@ -159,54 +136,58 @@ static BlockErrorAction backup_error_action(BackupBlockJob *job,
     }
 }
 
-static bool coroutine_fn yield_and_check(BackupBlockJob *job)
+static void coroutine_fn backup_block_copy_callback(int ret, bool error_is_read,
+                                                    void *opaque)
 {
-    uint64_t delay_ns;
-
-    if (job_is_cancelled(&job->common.job)) {
-        return true;
-    }
-
-    /*
-     * We need to yield even for delay_ns = 0 so that bdrv_drain_all() can
-     * return. Without a yield, the VM would not reboot.
-     */
-    delay_ns = block_job_ratelimit_get_delay(&job->common, job->bytes_read);
-    job->bytes_read = 0;
-    job_sleep_ns(&job->common.job, delay_ns);
-
-    if (job_is_cancelled(&job->common.job)) {
-        return true;
-    }
+    BackupBlockJob *s = opaque;
 
-    return false;
+    s->bcs_call = NULL;
+    s->ret = ret;
+    s->error_is_read = error_is_read;
+    job_enter(&s->common.job);
 }
 
 static int coroutine_fn backup_loop(BackupBlockJob *job)
 {
-    bool error_is_read;
-    int64_t offset;
-    BdrvDirtyBitmapIter *bdbi;
-    int ret = 0;
+    while (true) { /* retry loop */
+        assert(!job->bcs_call);
+        job->bcs_call = block_copy_async(job->bcs, 0,
+                                         QEMU_ALIGN_UP(job->len,
+                                                       job->cluster_size),
+                                         true, job->max_workers, job->max_chunk,
+                                         backup_block_copy_callback, job);
 
-    bdbi = bdrv_dirty_iter_new(block_copy_dirty_bitmap(job->bcs));
-    while ((offset = bdrv_dirty_iter_next(bdbi)) != -1) {
-        do {
-            if (yield_and_check(job)) {
-                goto out;
+        while (job->bcs_call && !job->common.job.cancelled) {
+            /* wait and handle pauses */
+
+            job_pause_point(&job->common.job);
+
+            if (job->bcs_call && !job->common.job.cancelled) {
+                job_yield(&job->common.job);
             }
-            ret = backup_do_cow(job, offset, job->cluster_size, &error_is_read);
-            if (ret < 0 && backup_error_action(job, error_is_read, -ret) ==
-                           BLOCK_ERROR_ACTION_REPORT)
-            {
-                goto out;
+        }
+
+        if (!job->bcs_call && job->ret == 0) {
+            /* Success */
+            return 0;
+        }
+
+        if (job->common.job.cancelled) {
+            if (job->bcs_call) {
+                block_copy_cancel(job->bcs_call);
             }
-        } while (ret < 0);
+            return 0;
+        }
+
+        if (!job->bcs_call && job->ret < 0 &&
+            (backup_error_action(job, job->error_is_read, -job->ret) ==
+             BLOCK_ERROR_ACTION_REPORT))
+        {
+            return job->ret;
+        }
     }
 
- out:
-    bdrv_dirty_iter_free(bdbi);
-    return ret;
+    g_assert_not_reached();
 }
 
 static void backup_init_bcs_bitmap(BackupBlockJob *job)
@@ -246,9 +227,14 @@ static int coroutine_fn backup_run(Job *job, Error **errp)
         int64_t count;
 
         for (offset = 0; offset < s->len; ) {
-            if (yield_and_check(s)) {
-                ret = -ECANCELED;
-                goto out;
+            if (job_is_cancelled(job)) {
+                return -ECANCELED;
+            }
+
+            job_pause_point(job);
+
+            if (job_is_cancelled(job)) {
+                return -ECANCELED;
             }
 
             ret = block_copy_reset_unallocated(s->bcs, offset, &count);
@@ -281,6 +267,25 @@ static int coroutine_fn backup_run(Job *job, Error **errp)
     return ret;
 }
 
+static void coroutine_fn backup_pause(Job *job)
+{
+    BackupBlockJob *s = container_of(job, BackupBlockJob, common.job);
+
+    if (s->bcs_call) {
+        block_copy_cancel(s->bcs_call);
+    }
+}
+
+static void coroutine_fn backup_set_speed(BlockJob *job, int64_t speed)
+{
+    BackupBlockJob *s = container_of(job, BackupBlockJob, common);
+
+    if (s->bcs) {
+        /* bcs is not yet set up at block_job_create() time */
+        block_copy_set_speed(s->bcs, s->bcs_call, speed);
+    }
+}
+
 static const BlockJobDriver backup_job_driver = {
     .job_driver = {
         .instance_size          = sizeof(BackupBlockJob),
@@ -291,7 +296,9 @@ static const BlockJobDriver backup_job_driver = {
         .commit                 = backup_commit,
         .abort                  = backup_abort,
         .clean                  = backup_clean,
-    }
+        .pause                  = backup_pause,
+    },
+    .set_speed = backup_set_speed,
 };
 
 static int64_t backup_calculate_cluster_size(BlockDriverState *target,
@@ -487,8 +494,8 @@ BlockJob *backup_job_create(const char *job_id, BlockDriverState *bs,
     job->max_workers = max_workers;
     job->max_chunk = max_chunk;
 
-    block_copy_set_progress_callback(bcs, backup_progress_bytes_callback, job);
     block_copy_set_progress_meter(bcs, &job->common.job.progress);
+    block_copy_set_speed(bcs, NULL, speed);
 
     /* Required permissions are already taken by backup-top target */
     block_job_add_bdrv(&job->common, "target", target, 0, BLK_PERM_ALL,
diff --git a/block/block-copy.c b/block/block-copy.c
index b551feb6c2..6a9d891b63 100644
--- a/block/block-copy.c
+++ b/block/block-copy.c
@@ -39,6 +39,7 @@ typedef struct BlockCopyCallState {
     int64_t max_chunk;
     bool ratelimit;
     BlockCopyAsyncCallbackFunc cb;
+    void *cb_opaque;
 
     /* State */
     bool failed;
@@ -103,9 +104,6 @@ typedef struct BlockCopyState {
     bool skip_unallocated;
 
     ProgressMeter *progress;
-    /* progress_bytes_callback: called when some copying progress is done. */
-    ProgressBytesCallbackFunc progress_bytes_callback;
-    void *progress_opaque;
 
     SharedResource *mem;
 
@@ -287,15 +285,6 @@ BlockCopyState *block_copy_state_new(BdrvChild *source, BdrvChild *target,
     return s;
 }
 
-void block_copy_set_progress_callback(
-        BlockCopyState *s,
-        ProgressBytesCallbackFunc progress_bytes_callback,
-        void *progress_opaque)
-{
-    s->progress_bytes_callback = progress_bytes_callback;
-    s->progress_opaque = progress_opaque;
-}
-
 void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm)
 {
     s->progress = pm;
@@ -443,7 +432,6 @@ static coroutine_fn int block_copy_task_entry(AioTask *task)
         t->call_state->error_is_read = error_is_read;
     } else {
         progress_work_done(t->s->progress, t->bytes);
-        t->s->progress_bytes_callback(t->bytes, t->s->progress_opaque);
     }
     co_put_to_shres(t->s->mem, t->bytes);
     block_copy_task_end(t, ret);
@@ -712,8 +700,7 @@ static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
     } while (ret > 0 && !call_state->cancelled);
 
     if (call_state->cb) {
-        call_state->cb(ret, call_state->error_is_read,
-                       call_state->s->progress_opaque);
+        call_state->cb(ret, call_state->error_is_read, call_state->cb_opaque);
     }
 
     if (call_state->canceller) {
@@ -754,7 +741,8 @@ BlockCopyCallState *block_copy_async(BlockCopyState *s,
                                      int64_t offset, int64_t bytes,
                                      bool ratelimit, int max_workers,
                                      int64_t max_chunk,
-                                     BlockCopyAsyncCallbackFunc cb)
+                                     BlockCopyAsyncCallbackFunc cb,
+                                     void *cb_opaque)
 {
     BlockCopyCallState *call_state = g_new(BlockCopyCallState, 1);
     Coroutine *co = qemu_coroutine_create(block_copy_async_co_entry,
@@ -766,6 +754,7 @@ BlockCopyCallState *block_copy_async(BlockCopyState *s,
         .bytes = bytes,
         .ratelimit = ratelimit,
         .cb = cb,
+        .cb_opaque = cb_opaque,
         .max_workers = max_workers ?: BLOCK_COPY_MAX_WORKERS,
         .max_chunk = max_chunk,
     };
-- 
2.21.0

