Run block_copy iterations in parallel in aio tasks. Changes: - BlockCopyTask becomes aio task structure. Add zeroes field to pass it to block_copy_do_copy - add call state - it's a state of one call of block_copy(), shared between parallel tasks. For now used only to keep information about first error: is it read or not. - convert block_copy_dirty_clusters to aio-task loop.
Signed-off-by: Vladimir Sementsov-Ogievskiy <vsement...@virtuozzo.com> --- block/block-copy.c | 104 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 91 insertions(+), 13 deletions(-) diff --git a/block/block-copy.c b/block/block-copy.c index 910947cb43..9994598eb7 100644 --- a/block/block-copy.c +++ b/block/block-copy.c @@ -19,15 +19,27 @@ #include "block/block-copy.h" #include "sysemu/block-backend.h" #include "qemu/units.h" +#include "qemu/coroutine.h" +#include "block/aio_task.h" #define BLOCK_COPY_MAX_COPY_RANGE (16 * MiB) #define BLOCK_COPY_MAX_BUFFER (1 * MiB) #define BLOCK_COPY_MAX_MEM (128 * MiB) +#define BLOCK_COPY_MAX_WORKERS 64 + +typedef struct BlockCopyCallState { + bool failed; + bool error_is_read; +} BlockCopyCallState; typedef struct BlockCopyTask { + AioTask task; + BlockCopyState *s; + BlockCopyCallState *call_state; int64_t offset; int64_t bytes; + bool zeroes; QLIST_ENTRY(BlockCopyTask) list; CoQueue wait_queue; /* coroutines blocked on this task */ } BlockCopyTask; @@ -225,6 +237,30 @@ void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm) s->progress = pm; } +/* Takes ownership on @task */ +static coroutine_fn int block_copy_task_run(AioTaskPool *pool, + BlockCopyTask *task) +{ + if (!pool) { + int ret = task->task.func(&task->task); + + g_free(task); + return ret; + } + + aio_task_pool_wait_slot(pool); + if (aio_task_pool_status(pool) < 0) { + co_put_to_shres(task->s->mem, task->bytes); + block_copy_task_end(task, -EAGAIN); + g_free(task); + return aio_task_pool_status(pool); + } + + aio_task_pool_start_task(pool, &task->task); + + return 0; +} + /* * block_copy_do_copy * @@ -328,8 +364,32 @@ out: return ret; } +static coroutine_fn int block_copy_task_entry(AioTask *task) +{ + BlockCopyTask *t = container_of(task, BlockCopyTask, task); + bool error_is_read; + int ret; + + ret = block_copy_do_copy(t->s, t->offset, t->bytes, t->zeroes, + &error_is_read); + if (ret < 0 && !t->call_state->failed) { + t->call_state->failed = true; + t->call_state->error_is_read = error_is_read; + } else { + progress_work_done(t->s->progress, t->bytes); + t->s->progress_bytes_callback(t->bytes, t->s->progress_opaque); + } + co_put_to_shres(t->s->mem, t->bytes); + block_copy_task_end(t, ret); + + return ret; +} + +/* Called only on full-dirty region */ static BlockCopyTask *block_copy_task_create(BlockCopyState *s, - int64_t offset, int64_t bytes) + BlockCopyCallState *call_state, + int64_t offset, + int64_t bytes) { int64_t next_zero; BlockCopyTask *task = g_new(BlockCopyTask, 1); @@ -351,7 +411,9 @@ static BlockCopyTask *block_copy_task_create(BlockCopyState *s, s->in_flight_bytes += bytes; *task = (BlockCopyTask) { + .task.func = block_copy_task_entry, .s = s, + .call_state = call_state, .offset = offset, .bytes = bytes, }; @@ -478,6 +540,8 @@ static int coroutine_fn block_copy_dirty_clusters(BlockCopyState *s, { int ret = 0; bool found_dirty = false; + AioTaskPool *aio = NULL; + BlockCopyCallState call_state = {false, false}; /* * block_copy() user is responsible for keeping source and target in same @@ -489,8 +553,8 @@ static int coroutine_fn block_copy_dirty_clusters(BlockCopyState *s, assert(QEMU_IS_ALIGNED(offset, s->cluster_size)); assert(QEMU_IS_ALIGNED(bytes, s->cluster_size)); - while (bytes) { - g_autofree BlockCopyTask *task = NULL; + while (bytes && aio_task_pool_status(aio) == 0) { + BlockCopyTask *task; int64_t status_bytes; if (!bdrv_dirty_bitmap_get(s->copy_bitmap, offset)) { @@ -502,7 +566,7 @@ static int coroutine_fn block_copy_dirty_clusters(BlockCopyState *s, found_dirty = true; - task = block_copy_task_create(s, offset, bytes); + task = block_copy_task_create(s, &call_state, offset, bytes); ret = block_copy_block_status(s, offset, task->bytes, &status_bytes); assert(ret >= 0); /* never fail */ @@ -511,6 +575,7 @@ static int coroutine_fn block_copy_dirty_clusters(BlockCopyState *s, } if (s->skip_unallocated && !(ret & BDRV_BLOCK_ALLOCATED)) { block_copy_task_end(task, 0); + g_free(task); progress_set_remaining(s->progress, bdrv_get_dirty_count(s->copy_bitmap) + s->in_flight_bytes); @@ -519,25 +584,38 @@ static int coroutine_fn block_copy_dirty_clusters(BlockCopyState *s, bytes -= status_bytes; continue; } + task->zeroes = ret & BDRV_BLOCK_ZERO; trace_block_copy_process(s, offset); co_get_from_shres(s->mem, task->bytes); - ret = block_copy_do_copy(s, offset, task->bytes, ret & BDRV_BLOCK_ZERO, - error_is_read); - co_put_to_shres(s->mem, task->bytes); - block_copy_task_end(task, ret); - if (ret < 0) { - return ret; + + if (!aio && task->bytes != bytes) { + aio = aio_task_pool_new(BLOCK_COPY_MAX_WORKERS); } - progress_work_done(s->progress, task->bytes); - s->progress_bytes_callback(task->bytes, s->progress_opaque); offset += task->bytes; bytes -= task->bytes; + + ret = block_copy_task_run(aio, task); + if (ret < 0) { + goto out; + } + } + +out: + if (aio) { + aio_task_pool_wait_all(aio); + if (ret == 0) { + ret = aio_task_pool_status(aio); + } + g_free(aio); + } + if (error_is_read && ret < 0) { + *error_is_read = call_state.error_is_read; } - return found_dirty; + return ret < 0 ? ret : found_dirty; } /* -- 2.21.0