Re: [Qemu-devel] [PATCH v2 0/3] Make thread pool implementation modular
2013/11/11 Stefan Hajnoczi stefa...@redhat.com: On Mon, Nov 11, 2013 at 11:00:45AM +0100, Matthias Brugger wrote: 2013/11/5 Stefan Hajnoczi stefa...@gmail.com: I'd also like to see the thread pool implementation you wish to add before we add a layer of indirection which has no users yet. Fair enough, I will evaluate if it will make more sense to implement a new AIO infrastructure instead to try reuse the thread-pool. Actually my implementation will differ in the way, that we will have several workerthreads with everyone of them having its own queue. The requests will be distributed between them depending on an identifier. The request function which the worker_thread call will be the same as using aio=threads, so I'm not quite sure which will be the way to go. Any opinions and hints, like the one you gave are highly appreciated. If I understand the slides you linked to correctly, the guest will pass an identifier with each request. The host has worker threads allowing each stream of requests to be serviced independently. The idea is to associate guest processes with unique identifiers. The guest I/O scheduler is supposed to submit requests in a way that meets certain policies (e.g. fairness between processes, deadlines, etc). Why is it necessary to push this task down into the host? I don't understand the advantage of this approach except that maybe it works around certain misconfigurations, I/O scheduler quirks, or plain old bugs - all of which should be investigated and fixed at the source instead of adding another layer of code to mask them. It is about I/O scheduling. CFQ the state of the art I/O scheduler merges adjacent requests from the same PID before dispatching them to the disk. If we can distinguish between the different threads of a virtual machine that read/write a file, the I/O scheduler in the host can merge requests in an effective way for sequential access. Qemu fails in this, because of its architecture. Apart that at the moment there is no way to distinguish the guest threads from each other (I'm working on some kernel patches), Qemu has one big queue from which several workerthreads grab requests and dispatch them to the disk. Even if you have one large read from just one thread in the guest, the I/O scheduler in the host will get the requests from different PIDs (= workerthreads) and won't be able to merge them. In former versions, there was some work done to merge requests in Qemu, but I don't think they were very useful, because you don't know how the layout of the image file looks like on the physical disk. Anyway I think this code parts have been removed. The only layer where you really know how the blocks of the virtual disk image are distributed over the disk is the block layer of the host. So you have to do the block request merging there. With the new architecture this would come for free, as you can map every thread from a guest to one workerthread of Qemu. Matthias Stefan -- motzblog.wordpress.com
Re: [Qemu-devel] [PATCH v2 0/3] Make thread pool implementation modular
2013/11/5 Stefan Hajnoczi stefa...@gmail.com: On Mon, Nov 04, 2013 at 11:28:41AM +0100, Matthias Brugger wrote: v2: - fix issues found by checkpatch.pl - change the descritpion of patch 3 This patch series makes the thread pool implementation modular. This allows each drive to use a special implementation. The patch series prepares qemu to be able to include thread pools different to the one actually implemented. It will allow to implement approaches like paravirtualized block requests [1]. [1] http://www.linux-kvm.org/wiki/images/5/53/2012-forum-Brugger-lightningtalk.pdf -drive aio=threads|native already distinguishes between different AIO implementations. IMO -drive aio= is the logical interface if you want to add a new AIO mechanism. Good point. I'd also like to see the thread pool implementation you wish to add before we add a layer of indirection which has no users yet. Fair enough, I will evaluate if it will make more sense to implement a new AIO infrastructure instead to try reuse the thread-pool. Actually my implementation will differ in the way, that we will have several workerthreads with everyone of them having its own queue. The requests will be distributed between them depending on an identifier. The request function which the worker_thread call will be the same as using aio=threads, so I'm not quite sure which will be the way to go. Any opinions and hints, like the one you gave are highly appreciated. -- motzblog.wordpress.com
[Qemu-devel] [PATCH v2 0/3] Make thread pool implementation modular
v2: - fix issues found by checkpatch.pl - change the descritpion of patch 3 This patch series makes the thread pool implementation modular. This allows each drive to use a special implementation. The patch series prepares qemu to be able to include thread pools different to the one actually implemented. It will allow to implement approaches like paravirtualized block requests [1]. [1] http://www.linux-kvm.org/wiki/images/5/53/2012-forum-Brugger-lightningtalk.pdf Matthias Brugger (3): Make thread pool implementation modular Block layer uses modular thread pool Add workerthreads configuration option async.c | 4 ++-- block/raw-posix.c | 15 +++ block/raw-win32.c | 9 +++-- blockdev.c | 13 + include/block/aio.h | 2 +- include/block/thread-pool.h | 11 +++ include/qemu-common.h | 2 ++ qemu-options.hx | 2 +- thread-pool.c | 33 + 9 files changed, 81 insertions(+), 10 deletions(-) -- 1.8.1.2
[Qemu-devel] [PATCH v2 3/3] Add workerthreads configuration option
This patch allows to choose at the command line level which thread pool implementation will be used by every block device. Signed-off-by: Matthias Brugger matthias@gmail.com --- blockdev.c | 13 + qemu-options.hx | 2 +- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/blockdev.c b/blockdev.c index b260477..a16cc9a 100644 --- a/blockdev.c +++ b/blockdev.c @@ -387,6 +387,15 @@ static DriveInfo *blockdev_init(QDict *bs_opts, } } #endif +buf = qemu_opt_get(opts, workerthreads); +if (buf != NULL) { +if (!strcmp(buf, pool)) { +/* this is the default */ +} else { +error_report(invalid workerthreads option); +return NULL; +} +} if ((buf = qemu_opt_get(opts, format)) != NULL) { if (is_help_option(buf)) { @@ -2269,6 +2278,10 @@ QemuOptsList qemu_common_drive_opts = { .type = QEMU_OPT_STRING, .help = disk serial number, },{ +.name = workerthreads, +.type = QEMU_OPT_STRING, +.help = type of worker threads (pool), +},{ .name = rerror, .type = QEMU_OPT_STRING, .help = read error action, diff --git a/qemu-options.hx b/qemu-options.hx index 5dc8b75..6f22242 100644 --- a/qemu-options.hx +++ b/qemu-options.hx @@ -408,7 +408,7 @@ DEF(drive, HAS_ARG, QEMU_OPTION_drive, [,cyls=c,heads=h,secs=s[,trans=t]][,snapshot=on|off]\n [,cache=writethrough|writeback|none|directsync|unsafe][,format=f]\n [,serial=s][,addr=A][,id=name][,aio=threads|native]\n - [,readonly=on|off][,copy-on-read=on|off]\n + [,workerthreads=pool][,readonly=on|off][,copy-on-read=on|off]\n [[,bps=b]|[[,bps_rd=r][,bps_wr=w]]]\n [[,iops=i]|[[,iops_rd=r][,iops_wr=w]]]\n [[,bps_max=bm]|[[,bps_rd_max=rm][,bps_wr_max=wm]]]\n -- 1.8.1.2
[Qemu-devel] [PATCH v2 1/3] Make thread pool implementation modular
This patch introduces function pointers for the thread pool, so that it's implementation can be set at run-time. Signed-off-by: Matthias Brugger matthias@gmail.com --- include/block/thread-pool.h | 11 +++ thread-pool.c | 33 + 2 files changed, 44 insertions(+) diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h index 32afcdd..1f73712 100644 --- a/include/block/thread-pool.h +++ b/include/block/thread-pool.h @@ -38,4 +38,15 @@ int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, void *arg); void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg); +ThreadPoolFuncArr *thread_pool_probe(void); +void thread_pool_delete(ThreadPoolFuncArr *tpf); + +struct ThreadPoolFuncArr { +BlockDriverAIOCB *(*thread_pool_submit_aio)(ThreadPool *pool, +ThreadPoolFunc *func, void *arg, BlockDriverCompletionFunc *cb, +void *opaque); +ThreadPool *(*thread_pool_new)(AioContext *ctx); +}; + + #endif diff --git a/thread-pool.c b/thread-pool.c index 3735fd3..53294a9 100644 --- a/thread-pool.c +++ b/thread-pool.c @@ -26,6 +26,7 @@ #include qemu/main-loop.h static void do_spawn_thread(ThreadPool *pool); +static void thread_pool_aio_free(ThreadPool *pool); typedef struct ThreadPoolElement ThreadPoolElement; @@ -77,6 +78,7 @@ struct ThreadPool { int pending_threads; /* threads created but not running yet */ int pending_cancellations; /* whether we need a cond_broadcast */ bool stopping; +void (*thread_pool_free)(ThreadPool *pool); }; static void *worker_thread(void *opaque) @@ -300,6 +302,7 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) qemu_sem_init(pool-sem, 0); pool-max_threads = 64; pool-new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool); +pool-thread_pool_free = thread_pool_aio_free; QLIST_INIT(pool-head); QTAILQ_INIT(pool-request_list); @@ -316,6 +319,11 @@ ThreadPool *thread_pool_new(AioContext *ctx) void thread_pool_free(ThreadPool *pool) { +pool-thread_pool_free(pool); +} + +void thread_pool_aio_free(ThreadPool *pool) +{ if (!pool) { return; } @@ -346,3 +354,28 @@ void thread_pool_free(ThreadPool *pool) event_notifier_cleanup(pool-notifier); g_free(pool); } + +ThreadPoolFuncArr *thread_pool_probe(void) +{ +ThreadPoolFuncArr *tpf_pool = NULL; + +if (tpf_pool) { +return tpf_pool; +} + +tpf_pool = g_new(ThreadPoolFuncArr, 1); +if (!tpf_pool) { +printf(error allocating thread pool\n); +return NULL; +} + +tpf_pool-thread_pool_submit_aio = thread_pool_submit_aio; +tpf_pool-thread_pool_new = thread_pool_new; + +return tpf_pool; +} + +void thread_pool_delete(ThreadPoolFuncArr *tpf) +{ +g_free(tpf); +} -- 1.8.1.2
[Qemu-devel] [PATCH v2 2/3] Block layer uses modular thread pool
With this patch, the calls to the thread pool functions pass through the new modular thread pool implementation. Signed-off-by: Matthias Brugger matthias@gmail.com --- async.c | 4 ++-- block/raw-posix.c | 15 +++ block/raw-win32.c | 9 +++-- include/block/aio.h | 2 +- include/qemu-common.h | 2 ++ 5 files changed, 23 insertions(+), 9 deletions(-) diff --git a/async.c b/async.c index 5fb3fa6..e66f70f 100644 --- a/async.c +++ b/async.c @@ -232,10 +232,10 @@ GSource *aio_get_g_source(AioContext *ctx) return ctx-source; } -ThreadPool *aio_get_thread_pool(AioContext *ctx) +ThreadPool *aio_get_thread_pool(AioContext *ctx, ThreadPoolFuncArr *tpf) { if (!ctx-thread_pool) { -ctx-thread_pool = thread_pool_new(ctx); +ctx-thread_pool = tpf-thread_pool_new(ctx); } return ctx-thread_pool; } diff --git a/block/raw-posix.c b/block/raw-posix.c index f6d48bb..f747301 100644 --- a/block/raw-posix.c +++ b/block/raw-posix.c @@ -142,6 +142,7 @@ typedef struct BDRVRawState { bool is_xfs : 1; #endif bool has_discard : 1; +ThreadPoolFuncArr *tpf; } BDRVRawState; typedef struct BDRVRawReopenState { @@ -345,6 +346,9 @@ static int raw_open(BlockDriverState *bs, QDict *options, int flags, int ret; s-type = FTYPE_FILE; + +s-tpf = thread_pool_probe(); + ret = raw_open_common(bs, options, flags, 0, local_err); if (error_is_set(local_err)) { error_propagate(errp, local_err); @@ -792,6 +796,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd, int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, BlockDriverCompletionFunc *cb, void *opaque, int type) { +BDRVRawState *s = bs-opaque; RawPosixAIOData *acb = g_slice_new(RawPosixAIOData); ThreadPool *pool; @@ -807,8 +812,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd, acb-aio_offset = sector_num * 512; trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); -pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); -return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); +pool = aio_get_thread_pool(bdrv_get_aio_context(bs), s-tpf); +return s-tpf-thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); } static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs, @@ -874,6 +879,8 @@ static void raw_close(BlockDriverState *bs) qemu_close(s-fd); s-fd = -1; } + +thread_pool_delete(s-tpf); } static int raw_truncate(BlockDriverState *bs, int64_t offset) @@ -1490,8 +1497,8 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs, acb-aio_offset = 0; acb-aio_ioctl_buf = buf; acb-aio_ioctl_cmd = req; -pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); -return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); +pool = aio_get_thread_pool(bdrv_get_aio_context(bs), s-tpf); +return s-tpf-thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); } #elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__) diff --git a/block/raw-win32.c b/block/raw-win32.c index 2741e4d..76df266 100644 --- a/block/raw-win32.c +++ b/block/raw-win32.c @@ -46,6 +46,7 @@ typedef struct RawWin32AIOData { size_t aio_nbytes; off64_t aio_offset; int aio_type; +ThreadPoolFuncArr *tpf; } RawWin32AIOData; typedef struct BDRVRawState { @@ -159,8 +160,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile, acb-aio_offset = sector_num * 512; trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); -pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); -return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); +pool = aio_get_thread_pool(bdrv_get_aio_context(bs), s-tpf); +return s-tpf-thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); } int qemu_ftruncate64(int fd, int64_t length) @@ -248,6 +249,8 @@ static int raw_open(BlockDriverState *bs, QDict *options, int flags, s-type = FTYPE_FILE; +s-tpf = thread_pool_probe(); + opts = qemu_opts_create_nofail(raw_runtime_opts); qemu_opts_absorb_qdict(opts, options, local_err); if (error_is_set(local_err)) { @@ -339,6 +342,8 @@ static void raw_close(BlockDriverState *bs) { BDRVRawState *s = bs-opaque; CloseHandle(s-hfile); + +thread_pool_delete(s-tpf); } static int raw_truncate(BlockDriverState *bs, int64_t offset) diff --git a/include/block/aio.h b/include/block/aio.h index 2efdf41..22d6fa0 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -231,7 +231,7 @@ void aio_set_event_notifier(AioContext *ctx, GSource *aio_get_g_source(AioContext *ctx); /* Return the ThreadPool bound to this AioContext */ -struct ThreadPool *aio_get_thread_pool(AioContext *ctx); +struct ThreadPool *aio_get_thread_pool(AioContext *ctx, ThreadPoolFuncArr *tpf); /* Functions to operate on the main QEMU
[Qemu-devel] Make thread pool implementation modular
This patch series makes the thread pool implementation modular. This allows each drive to use a special implementation. The patch series prepares qemu to be able to include thread pools different the one actually implemented. It will allow to implement approaches like paravirtualized block requests [1]. async.c | 4 ++-- block/raw-posix.c | 15 +++ block/raw-win32.c | 9 +++-- blockdev.c | 13 + include/block/aio.h | 2 +- include/block/thread-pool.h | 9 + include/qemu-common.h | 2 ++ qemu-options.hx | 2 +- thread-pool.c | 32 9 files changed, 78 insertions(+), 10 deletions(-) [1] http://www.linux-kvm.org/wiki/images/5/53/2012-forum-Brugger-lightningtalk.pdf
[Qemu-devel] [PATCH 3/3] Add workerthreads configuration option
This patch allows the definition which thread pool will be used by every block device. The defintion of the workerthreads option allows this at the command line level. At the moment only the thread pool implementation pool can be chosen. Signed-off-by: Matthias Brugger matthias@gmail.com --- blockdev.c | 13 + qemu-options.hx | 2 +- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/blockdev.c b/blockdev.c index b260477..8b83611 100644 --- a/blockdev.c +++ b/blockdev.c @@ -388,6 +388,15 @@ static DriveInfo *blockdev_init(QDict *bs_opts, } #endif +if ((buf = qemu_opt_get(opts, workerthreads)) != NULL) { +if (!strcmp(buf, pool)) { +/* this is the default */ +} else { +error_report(invalid workerthreads option); +return NULL; +} +} + if ((buf = qemu_opt_get(opts, format)) != NULL) { if (is_help_option(buf)) { error_printf(Supported formats:); @@ -2269,6 +2278,10 @@ QemuOptsList qemu_common_drive_opts = { .type = QEMU_OPT_STRING, .help = disk serial number, },{ +.name = workerthreads, +.type = QEMU_OPT_STRING, +.help = type of worker threads (pool), +},{ .name = rerror, .type = QEMU_OPT_STRING, .help = read error action, diff --git a/qemu-options.hx b/qemu-options.hx index 5dc8b75..6f22242 100644 --- a/qemu-options.hx +++ b/qemu-options.hx @@ -408,7 +408,7 @@ DEF(drive, HAS_ARG, QEMU_OPTION_drive, [,cyls=c,heads=h,secs=s[,trans=t]][,snapshot=on|off]\n [,cache=writethrough|writeback|none|directsync|unsafe][,format=f]\n [,serial=s][,addr=A][,id=name][,aio=threads|native]\n - [,readonly=on|off][,copy-on-read=on|off]\n + [,workerthreads=pool][,readonly=on|off][,copy-on-read=on|off]\n [[,bps=b]|[[,bps_rd=r][,bps_wr=w]]]\n [[,iops=i]|[[,iops_rd=r][,iops_wr=w]]]\n [[,bps_max=bm]|[[,bps_rd_max=rm][,bps_wr_max=wm]]]\n -- 1.8.1.2
[Qemu-devel] [PATCH 1/3] Make thread pool implementation modular
This patch introduces function pointers for the thread pool, so that it's implementation can be set at run-time. Signed-off-by: Matthias Brugger matthias@gmail.com --- include/block/thread-pool.h | 9 + thread-pool.c | 32 2 files changed, 41 insertions(+) diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h index 32afcdd..53a779f 100644 --- a/include/block/thread-pool.h +++ b/include/block/thread-pool.h @@ -38,4 +38,13 @@ int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, void *arg); void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg); +ThreadPoolFuncArr *thread_pool_probe(void); +void thread_pool_delete(ThreadPoolFuncArr *tpf); + +struct ThreadPoolFuncArr { +BlockDriverAIOCB *(*thread_pool_submit_aio)(ThreadPool *pool, ThreadPoolFunc *func, void *arg, BlockDriverCompletionFunc *cb, void *opaque); +ThreadPool *(*thread_pool_new)(AioContext *ctx); +}; + + #endif diff --git a/thread-pool.c b/thread-pool.c index 3735fd3..8c5d1a2 100644 --- a/thread-pool.c +++ b/thread-pool.c @@ -26,6 +26,7 @@ #include qemu/main-loop.h static void do_spawn_thread(ThreadPool *pool); +void thread_pool_aio_free(ThreadPool *pool); typedef struct ThreadPoolElement ThreadPoolElement; @@ -77,6 +78,7 @@ struct ThreadPool { int pending_threads; /* threads created but not running yet */ int pending_cancellations; /* whether we need a cond_broadcast */ bool stopping; +void (*thread_pool_free)(ThreadPool *pool); }; static void *worker_thread(void *opaque) @@ -300,6 +302,7 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) qemu_sem_init(pool-sem, 0); pool-max_threads = 64; pool-new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool); +pool-thread_pool_free = thread_pool_aio_free; QLIST_INIT(pool-head); QTAILQ_INIT(pool-request_list); @@ -316,6 +319,11 @@ ThreadPool *thread_pool_new(AioContext *ctx) void thread_pool_free(ThreadPool *pool) { +pool-thread_pool_free(pool); +} + +void thread_pool_aio_free(ThreadPool *pool) +{ if (!pool) { return; } @@ -346,3 +354,27 @@ void thread_pool_free(ThreadPool *pool) event_notifier_cleanup(pool-notifier); g_free(pool); } + +ThreadPoolFuncArr *thread_pool_probe(void) +{ +ThreadPoolFuncArr *tpf_pool = NULL; + +if (tpf_pool) +return tpf_pool; + +tpf_pool = g_new(ThreadPoolFuncArr, 1); //TODO right now, this leaks! +if (!tpf_pool) { +printf(error allocating thread pool\n); +return NULL; +} + +tpf_pool-thread_pool_submit_aio = thread_pool_submit_aio; +tpf_pool-thread_pool_new = thread_pool_new; + +return tpf_pool; +} + +void thread_pool_delete(ThreadPoolFuncArr *tpf) +{ +g_free(tpf); +} -- 1.8.1.2
[Qemu-devel] [PATCH 2/3] Block layer uses modular thread pool
With this patch, the calls to the thread pool functions pass through the new modular thread pool implementation. Signed-off-by: Matthias Brugger matthias@gmail.com --- async.c | 4 ++-- block/raw-posix.c | 15 +++ block/raw-win32.c | 9 +++-- include/block/aio.h | 2 +- include/qemu-common.h | 2 ++ 5 files changed, 23 insertions(+), 9 deletions(-) diff --git a/async.c b/async.c index 5fb3fa6..e66f70f 100644 --- a/async.c +++ b/async.c @@ -232,10 +232,10 @@ GSource *aio_get_g_source(AioContext *ctx) return ctx-source; } -ThreadPool *aio_get_thread_pool(AioContext *ctx) +ThreadPool *aio_get_thread_pool(AioContext *ctx, ThreadPoolFuncArr *tpf) { if (!ctx-thread_pool) { -ctx-thread_pool = thread_pool_new(ctx); +ctx-thread_pool = tpf-thread_pool_new(ctx); } return ctx-thread_pool; } diff --git a/block/raw-posix.c b/block/raw-posix.c index 6f03fbf..22842ed 100644 --- a/block/raw-posix.c +++ b/block/raw-posix.c @@ -142,6 +142,7 @@ typedef struct BDRVRawState { bool is_xfs : 1; #endif bool has_discard : 1; +ThreadPoolFuncArr *tpf; } BDRVRawState; typedef struct BDRVRawReopenState { @@ -345,6 +346,9 @@ static int raw_open(BlockDriverState *bs, QDict *options, int flags, int ret; s-type = FTYPE_FILE; + +s-tpf = thread_pool_probe(); + ret = raw_open_common(bs, options, flags, 0, local_err); if (error_is_set(local_err)) { error_propagate(errp, local_err); @@ -792,6 +796,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd, int64_t sector_num, QEMUIOVector *qiov, int nb_sectors, BlockDriverCompletionFunc *cb, void *opaque, int type) { +BDRVRawState *s = bs-opaque; RawPosixAIOData *acb = g_slice_new(RawPosixAIOData); ThreadPool *pool; @@ -807,8 +812,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd, acb-aio_offset = sector_num * 512; trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); -pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); -return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); +pool = aio_get_thread_pool(bdrv_get_aio_context(bs), s-tpf); +return s-tpf-thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); } static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs, @@ -874,6 +879,8 @@ static void raw_close(BlockDriverState *bs) qemu_close(s-fd); s-fd = -1; } + +thread_pool_delete(s-tpf); } static int raw_truncate(BlockDriverState *bs, int64_t offset) @@ -1490,8 +1497,8 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs, acb-aio_offset = 0; acb-aio_ioctl_buf = buf; acb-aio_ioctl_cmd = req; -pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); -return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); +pool = aio_get_thread_pool(bdrv_get_aio_context(bs), s-tpf); +return s-tpf-thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); } #elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__) diff --git a/block/raw-win32.c b/block/raw-win32.c index 676b570..dad1255 100644 --- a/block/raw-win32.c +++ b/block/raw-win32.c @@ -46,6 +46,7 @@ typedef struct RawWin32AIOData { size_t aio_nbytes; off64_t aio_offset; int aio_type; +ThreadPoolFuncArr *tpf; } RawWin32AIOData; typedef struct BDRVRawState { @@ -159,8 +160,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile, acb-aio_offset = sector_num * 512; trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); -pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); -return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); +pool = aio_get_thread_pool(bdrv_get_aio_context(bs), s-tpf); +return s-tpf-thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); } int qemu_ftruncate64(int fd, int64_t length) @@ -248,6 +249,8 @@ static int raw_open(BlockDriverState *bs, QDict *options, int flags, s-type = FTYPE_FILE; +s-tpf = thread_pool_probe(); + opts = qemu_opts_create_nofail(raw_runtime_opts); qemu_opts_absorb_qdict(opts, options, local_err); if (error_is_set(local_err)) { @@ -339,6 +342,8 @@ static void raw_close(BlockDriverState *bs) { BDRVRawState *s = bs-opaque; CloseHandle(s-hfile); + +thread_pool_delete(s-tpf); } static int raw_truncate(BlockDriverState *bs, int64_t offset) diff --git a/include/block/aio.h b/include/block/aio.h index 2efdf41..22d6fa0 100644 --- a/include/block/aio.h +++ b/include/block/aio.h @@ -231,7 +231,7 @@ void aio_set_event_notifier(AioContext *ctx, GSource *aio_get_g_source(AioContext *ctx); /* Return the ThreadPool bound to this AioContext */ -struct ThreadPool *aio_get_thread_pool(AioContext *ctx); +struct ThreadPool *aio_get_thread_pool(AioContext *ctx, ThreadPoolFuncArr *tpf); /* Functions to operate on the main QEMU