Re: [Qemu-devel] [PATCH v2 0/3] Make thread pool implementation modular

2013-12-05 Thread Matthias Brugger
2013/11/11 Stefan Hajnoczi <stefa...@redhat.com>:
> On Mon, Nov 11, 2013 at 11:00:45AM +0100, Matthias Brugger wrote:
>> 2013/11/5 Stefan Hajnoczi <stefa...@gmail.com>:
>>> I'd also like to see the thread pool implementation you wish to add
>>> before we add a layer of indirection which has no users yet.
>>
>> Fair enough, I will evaluate whether it makes more sense to implement a
>> new AIO infrastructure instead of trying to reuse the thread pool.
>> My implementation will differ in that there will be several worker
>> threads, each with its own queue. The requests will be distributed
>> between them depending on an identifier. The request function the
>> worker threads call will be the same as with aio=threads, so I'm not
>> quite sure which will be the way to go. Any opinions and hints, like
>> the one you gave, are highly appreciated.

> If I understand the slides you linked to correctly, the guest will pass
> an identifier with each request.  The host has worker threads allowing
> each stream of requests to be serviced independently.  The idea is to
> associate guest processes with unique identifiers.
>
> The guest I/O scheduler is supposed to submit requests in a way that
> meets certain policies (e.g. fairness between processes, deadlines,
> etc).
>
> Why is it necessary to push this task down into the host?  I don't
> understand the advantage of this approach except that maybe it works
> around certain misconfigurations, I/O scheduler quirks, or plain old
> bugs - all of which should be investigated and fixed at the source
> instead of adding another layer of code to mask them.

It is about I/O scheduling. CFQ, the state-of-the-art I/O scheduler,
merges adjacent requests from the same PID before dispatching them to
the disk.
If we can distinguish between the different threads of a virtual
machine that read/write a file, the I/O scheduler in the host can
merge requests effectively for sequential access. Qemu fails at this
because of its architecture. Apart from the fact that at the moment
there is no way to distinguish the guest threads from each other (I'm
working on some kernel patches), Qemu has one big queue from which
several worker threads grab requests and dispatch them to the disk.
Even if you have one large read from just one thread in the guest, the
I/O scheduler in the host will see requests from different PIDs (=
worker threads) and won't be able to merge them.
In former versions, there was some work done to merge requests in
Qemu, but I don't think it was very useful, because you don't know
what the layout of the image file looks like on the physical disk.
Anyway, I think these code parts have been removed.
The only layer where you really know how the blocks of the virtual
disk image are distributed over the disk is the block layer of the
host, so you have to do the block request merging there. With the new
architecture this would come for free, as you can map every thread
from a guest to one worker thread of Qemu.
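
To make this concrete, here is a minimal sketch of the dispatching I
have in mind (code invented for this mail, not taken from the patches;
NUM_WORKERS, Request, WorkerQueue and dispatch_request are made-up
names, and queue/worker setup is omitted):

#include <pthread.h>

#define NUM_WORKERS 4

typedef struct Request {
    unsigned long id;           /* guest-provided stream identifier */
    void (*func)(void *arg);    /* request function, as with aio=threads */
    void *arg;
    struct Request *next;
} Request;

typedef struct WorkerQueue {
    pthread_mutex_t lock;
    pthread_cond_t cond;
    Request *head, *tail;       /* FIFO served by exactly one worker thread */
} WorkerQueue;

static WorkerQueue queues[NUM_WORKERS];

/* Requests with the same identifier always land in the same queue, so
 * each guest stream is serviced by a single host thread (PID) and the
 * host I/O scheduler can merge adjacent requests again. */
static void dispatch_request(Request *req)
{
    WorkerQueue *q = &queues[req->id % NUM_WORKERS];

    pthread_mutex_lock(&q->lock);
    req->next = NULL;
    if (q->tail) {
        q->tail->next = req;
    } else {
        q->head = req;
    }
    q->tail = req;
    pthread_cond_signal(&q->cond);
    pthread_mutex_unlock(&q->lock);
}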

Matthias


> Stefan



-- 
motzblog.wordpress.com



Re: [Qemu-devel] [PATCH v2 0/3] Make thread pool implementation modular

2013-11-11 Thread Matthias Brugger
2013/11/5 Stefan Hajnoczi <stefa...@gmail.com>:
> On Mon, Nov 04, 2013 at 11:28:41AM +0100, Matthias Brugger wrote:
>> v2:
>>  - fix issues found by checkpatch.pl
>>  - change the description of patch 3
>>
>> This patch series makes the thread pool implementation modular.
>> This allows each drive to use a special implementation.
>> The patch series prepares qemu to be able to include thread pools
>> different from the one currently implemented. It will make it possible
>> to implement approaches like paravirtualized block requests [1].
>>
>> [1] http://www.linux-kvm.org/wiki/images/5/53/2012-forum-Brugger-lightningtalk.pdf
>
> -drive aio=threads|native already distinguishes between different AIO
> implementations.  IMO -drive aio= is the logical interface if you want
> to add a new AIO mechanism.

Good point.
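For reference, the existing interface already selects the backend per
drive on the command line, e.g. (disk.img being just a placeholder
image name):

  qemu-system-x86_64 -drive file=disk.img,if=virtio,aio=threads

so a new mechanism would presumably show up as another aio= value.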


> I'd also like to see the thread pool implementation you wish to add
> before we add a layer of indirection which has no users yet.

Fair enough, I will evaluate whether it makes more sense to implement a
new AIO infrastructure instead of trying to reuse the thread pool.
My implementation will differ in that there will be several worker
threads, each with its own queue. The requests will be distributed
between them depending on an identifier. The request function the
worker threads call will be the same as with aio=threads, so I'm not
quite sure which will be the way to go. Any opinions and hints, like
the one you gave, are highly appreciated.


-- 
motzblog.wordpress.com



[Qemu-devel] [PATCH v2 0/3] Make thread pool implementation modular

2013-11-04 Thread Matthias Brugger
v2:
 - fix issues found by checkpatch.pl
 - change the description of patch 3

This patch series makes the thread pool implementation modular.
This allows each drive to use a special implementation.
The patch series prepares qemu to be able to include thread pools
different from the one currently implemented. It will make it possible
to implement approaches like paravirtualized block requests [1].

[1] http://www.linux-kvm.org/wiki/images/5/53/2012-forum-Brugger-lightningtalk.pdf

Matthias Brugger (3):
  Make thread pool implementation modular
  Block layer uses modular thread pool
  Add workerthreads configuration option

 async.c |  4 ++--
 block/raw-posix.c   | 15 +++
 block/raw-win32.c   |  9 +++--
 blockdev.c  | 13 +
 include/block/aio.h |  2 +-
 include/block/thread-pool.h | 11 +++
 include/qemu-common.h   |  2 ++
 qemu-options.hx |  2 +-
 thread-pool.c   | 33 +
 9 files changed, 81 insertions(+), 10 deletions(-)

-- 
1.8.1.2




[Qemu-devel] [PATCH v2 3/3] Add workerthreads configuration option

2013-11-04 Thread Matthias Brugger
This patch allows choosing at the command line level which thread pool
implementation will be used by every block device.
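
For example (test.img is a placeholder image name; "pool", the
existing implementation, is the only value accepted so far):

  qemu-system-x86_64 -drive file=test.img,if=virtio,workerthreads=pool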

Signed-off-by: Matthias Brugger <matthias@gmail.com>
---
 blockdev.c  | 13 +
 qemu-options.hx |  2 +-
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/blockdev.c b/blockdev.c
index b260477..a16cc9a 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -387,6 +387,15 @@ static DriveInfo *blockdev_init(QDict *bs_opts,
 }
 }
 #endif
+    buf = qemu_opt_get(opts, "workerthreads");
+    if (buf != NULL) {
+        if (!strcmp(buf, "pool")) {
+            /* this is the default */
+        } else {
+            error_report("invalid workerthreads option");
+            return NULL;
+        }
+    }
 
     if ((buf = qemu_opt_get(opts, "format")) != NULL) {
         if (is_help_option(buf)) {
@@ -2269,6 +2278,10 @@ QemuOptsList qemu_common_drive_opts = {
             .type = QEMU_OPT_STRING,
             .help = "disk serial number",
         },{
+            .name = "workerthreads",
+            .type = QEMU_OPT_STRING,
+            .help = "type of worker threads (pool)",
+        },{
             .name = "rerror",
             .type = QEMU_OPT_STRING,
             .help = "read error action",
diff --git a/qemu-options.hx b/qemu-options.hx
index 5dc8b75..6f22242 100644
--- a/qemu-options.hx
+++ b/qemu-options.hx
@@ -408,7 +408,7 @@ DEF("drive", HAS_ARG, QEMU_OPTION_drive,
     "       [,cyls=c,heads=h,secs=s[,trans=t]][,snapshot=on|off]\n"
     "       [,cache=writethrough|writeback|none|directsync|unsafe][,format=f]\n"
     "       [,serial=s][,addr=A][,id=name][,aio=threads|native]\n"
-    "       [,readonly=on|off][,copy-on-read=on|off]\n"
+    "       [,workerthreads=pool][,readonly=on|off][,copy-on-read=on|off]\n"
     "       [[,bps=b]|[[,bps_rd=r][,bps_wr=w]]]\n"
     "       [[,iops=i]|[[,iops_rd=r][,iops_wr=w]]]\n"
     "       [[,bps_max=bm]|[[,bps_rd_max=rm][,bps_wr_max=wm]]]\n"
-- 
1.8.1.2




[Qemu-devel] [PATCH v2 1/3] Make thread pool implementation modular

2013-11-04 Thread Matthias Brugger
This patch introduces function pointers for the thread pool, so that
its implementation can be set at run-time.
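
As a rough usage sketch of the new interface (function names taken
from this series, with aio_get_thread_pool() as extended in patch 2;
error handling omitted):

    ThreadPoolFuncArr *tpf = thread_pool_probe();      /* select the implementation */
    ThreadPool *pool = aio_get_thread_pool(ctx, tpf);  /* pool comes from tpf->thread_pool_new() */

    tpf->thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);

    thread_pool_delete(tpf);                           /* release the function table */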

Signed-off-by: Matthias Brugger <matthias@gmail.com>
---
 include/block/thread-pool.h | 11 +++
 thread-pool.c   | 33 +
 2 files changed, 44 insertions(+)

diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index 32afcdd..1f73712 100644
--- a/include/block/thread-pool.h
+++ b/include/block/thread-pool.h
@@ -38,4 +38,15 @@ int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
         ThreadPoolFunc *func, void *arg);
 void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
 
+ThreadPoolFuncArr *thread_pool_probe(void);
+void thread_pool_delete(ThreadPoolFuncArr *tpf);
+
+struct ThreadPoolFuncArr {
+    BlockDriverAIOCB *(*thread_pool_submit_aio)(ThreadPool *pool,
+        ThreadPoolFunc *func, void *arg, BlockDriverCompletionFunc *cb,
+        void *opaque);
+    ThreadPool *(*thread_pool_new)(AioContext *ctx);
+};
+
+
 #endif
diff --git a/thread-pool.c b/thread-pool.c
index 3735fd3..53294a9 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -26,6 +26,7 @@
 #include "qemu/main-loop.h"
 
 static void do_spawn_thread(ThreadPool *pool);
+static void thread_pool_aio_free(ThreadPool *pool);
 
 typedef struct ThreadPoolElement ThreadPoolElement;
 
@@ -77,6 +78,7 @@ struct ThreadPool {
     int pending_threads; /* threads created but not running yet */
     int pending_cancellations; /* whether we need a cond_broadcast */
     bool stopping;
+    void (*thread_pool_free)(ThreadPool *pool);
 };
 
 static void *worker_thread(void *opaque)
@@ -300,6 +302,7 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
     qemu_sem_init(&pool->sem, 0);
     pool->max_threads = 64;
     pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
+    pool->thread_pool_free = thread_pool_aio_free;
 
     QLIST_INIT(&pool->head);
     QTAILQ_INIT(&pool->request_list);
@@ -316,6 +319,11 @@ ThreadPool *thread_pool_new(AioContext *ctx)
 
 void thread_pool_free(ThreadPool *pool)
 {
+    pool->thread_pool_free(pool);
+}
+
+void thread_pool_aio_free(ThreadPool *pool)
+{
     if (!pool) {
         return;
     }
@@ -346,3 +354,28 @@ void thread_pool_free(ThreadPool *pool)
     event_notifier_cleanup(&pool->notifier);
     g_free(pool);
 }
+
+ThreadPoolFuncArr *thread_pool_probe(void)
+{
+    ThreadPoolFuncArr *tpf_pool = NULL;
+
+    if (tpf_pool) {
+        return tpf_pool;
+    }
+
+    tpf_pool = g_new(ThreadPoolFuncArr, 1);
+    if (!tpf_pool) {
+        printf("error allocating thread pool\n");
+        return NULL;
+    }
+
+    tpf_pool->thread_pool_submit_aio = thread_pool_submit_aio;
+    tpf_pool->thread_pool_new = thread_pool_new;
+
+    return tpf_pool;
+}
+
+void thread_pool_delete(ThreadPoolFuncArr *tpf)
+{
+    g_free(tpf);
+}
-- 
1.8.1.2




[Qemu-devel] [PATCH v2 2/3] Block layer uses modular thread pool

2013-11-04 Thread Matthias Brugger
With this patch, the calls to the thread pool functions pass through the
new modular thread pool implementation.
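
To illustrate what a second implementation would plug in, a purely
hypothetical sketch (the paravirt_* functions do not exist in this
series and are assumed to be implemented elsewhere):

    /* hypothetical backend entry points, implemented elsewhere */
    extern ThreadPool *paravirt_thread_pool_new(AioContext *ctx);
    extern BlockDriverAIOCB *paravirt_thread_pool_submit_aio(ThreadPool *pool,
            ThreadPoolFunc *func, void *arg, BlockDriverCompletionFunc *cb,
            void *opaque);

    ThreadPoolFuncArr *thread_pool_probe_paravirt(void)
    {
        ThreadPoolFuncArr *tpf = g_new(ThreadPoolFuncArr, 1);

        tpf->thread_pool_submit_aio = paravirt_thread_pool_submit_aio;
        tpf->thread_pool_new = paravirt_thread_pool_new;
        return tpf;
    }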

Signed-off-by: Matthias Brugger <matthias@gmail.com>
---
 async.c   |  4 ++--
 block/raw-posix.c | 15 +++
 block/raw-win32.c |  9 +++--
 include/block/aio.h   |  2 +-
 include/qemu-common.h |  2 ++
 5 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/async.c b/async.c
index 5fb3fa6..e66f70f 100644
--- a/async.c
+++ b/async.c
@@ -232,10 +232,10 @@ GSource *aio_get_g_source(AioContext *ctx)
     return ctx->source;
 }
 
-ThreadPool *aio_get_thread_pool(AioContext *ctx)
+ThreadPool *aio_get_thread_pool(AioContext *ctx, ThreadPoolFuncArr *tpf)
 {
     if (!ctx->thread_pool) {
-        ctx->thread_pool = thread_pool_new(ctx);
+        ctx->thread_pool = tpf->thread_pool_new(ctx);
     }
     return ctx->thread_pool;
 }
diff --git a/block/raw-posix.c b/block/raw-posix.c
index f6d48bb..f747301 100644
--- a/block/raw-posix.c
+++ b/block/raw-posix.c
@@ -142,6 +142,7 @@ typedef struct BDRVRawState {
     bool is_xfs : 1;
 #endif
     bool has_discard : 1;
+    ThreadPoolFuncArr *tpf;
 } BDRVRawState;
 
 typedef struct BDRVRawReopenState {
@@ -345,6 +346,9 @@ static int raw_open(BlockDriverState *bs, QDict *options, int flags,
     int ret;
 
     s->type = FTYPE_FILE;
+
+    s->tpf = thread_pool_probe();
+
     ret = raw_open_common(bs, options, flags, 0, &local_err);
     if (error_is_set(&local_err)) {
         error_propagate(errp, local_err);
@@ -792,6 +796,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
         BlockDriverCompletionFunc *cb, void *opaque, int type)
 {
+    BDRVRawState *s = bs->opaque;
     RawPosixAIOData *acb = g_slice_new(RawPosixAIOData);
     ThreadPool *pool;
 
@@ -807,8 +812,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
     acb->aio_offset = sector_num * 512;
 
     trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
-    pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
-    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
+    pool = aio_get_thread_pool(bdrv_get_aio_context(bs), s->tpf);
+    return s->tpf->thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
 }
 
 static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs,
@@ -874,6 +879,8 @@ static void raw_close(BlockDriverState *bs)
         qemu_close(s->fd);
         s->fd = -1;
     }
+
+    thread_pool_delete(s->tpf);
 }
 
 static int raw_truncate(BlockDriverState *bs, int64_t offset)
@@ -1490,8 +1497,8 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs,
     acb->aio_offset = 0;
     acb->aio_ioctl_buf = buf;
     acb->aio_ioctl_cmd = req;
-    pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
-    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
+    pool = aio_get_thread_pool(bdrv_get_aio_context(bs), s->tpf);
+    return s->tpf->thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
 }
 
 #elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__)
diff --git a/block/raw-win32.c b/block/raw-win32.c
index 2741e4d..76df266 100644
--- a/block/raw-win32.c
+++ b/block/raw-win32.c
@@ -46,6 +46,7 @@ typedef struct RawWin32AIOData {
     size_t aio_nbytes;
     off64_t aio_offset;
     int aio_type;
+    ThreadPoolFuncArr *tpf;
 } RawWin32AIOData;
 
 typedef struct BDRVRawState {
@@ -159,8 +160,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
     acb->aio_offset = sector_num * 512;
 
     trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
-    pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
-    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
+    pool = aio_get_thread_pool(bdrv_get_aio_context(bs), s->tpf);
+    return s->tpf->thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
 }
 
 int qemu_ftruncate64(int fd, int64_t length)
@@ -248,6 +249,8 @@ static int raw_open(BlockDriverState *bs, QDict *options, int flags,
 
     s->type = FTYPE_FILE;
 
+    s->tpf = thread_pool_probe();
+
     opts = qemu_opts_create_nofail(&raw_runtime_opts);
     qemu_opts_absorb_qdict(opts, options, &local_err);
     if (error_is_set(&local_err)) {
@@ -339,6 +342,8 @@ static void raw_close(BlockDriverState *bs)
 {
     BDRVRawState *s = bs->opaque;
     CloseHandle(s->hfile);
+
+    thread_pool_delete(s->tpf);
 }
 
 static int raw_truncate(BlockDriverState *bs, int64_t offset)
diff --git a/include/block/aio.h b/include/block/aio.h
index 2efdf41..22d6fa0 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -231,7 +231,7 @@ void aio_set_event_notifier(AioContext *ctx,
 GSource *aio_get_g_source(AioContext *ctx);
 
 /* Return the ThreadPool bound to this AioContext */
-struct ThreadPool *aio_get_thread_pool(AioContext *ctx);
+struct ThreadPool *aio_get_thread_pool(AioContext *ctx, ThreadPoolFuncArr *tpf);
 
 /* Functions to operate on the main QEMU

[Qemu-devel] Make thread pool implementation modular

2013-11-01 Thread Matthias Brugger
This patch series makes the thread pool implementation modular.
This allows each drive to use a special implementation.
The patch series prepares qemu to be able to include thread pools
different from the one currently implemented. It will make it possible
to implement approaches like paravirtualized block requests [1].

 async.c |  4 ++--
 block/raw-posix.c   | 15 +++
 block/raw-win32.c   |  9 +++--
 blockdev.c  | 13 +
 include/block/aio.h |  2 +-
 include/block/thread-pool.h |  9 +
 include/qemu-common.h   |  2 ++
 qemu-options.hx |  2 +-
 thread-pool.c   | 32 
 9 files changed, 78 insertions(+), 10 deletions(-)

[1] http://www.linux-kvm.org/wiki/images/5/53/2012-forum-Brugger-lightningtalk.pdf




[Qemu-devel] [PATCH 3/3] Add workerthreads configuration option

2013-11-01 Thread Matthias Brugger
This patch allows defining which thread pool will be used by every
block device. The definition of the workerthreads option makes this
selection possible at the command line level.

At the moment only the thread pool implementation "pool" can be chosen.

Signed-off-by: Matthias Brugger <matthias@gmail.com>
---
 blockdev.c  | 13 +
 qemu-options.hx |  2 +-
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/blockdev.c b/blockdev.c
index b260477..8b83611 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -388,6 +388,15 @@ static DriveInfo *blockdev_init(QDict *bs_opts,
 }
 #endif
 
+    if ((buf = qemu_opt_get(opts, "workerthreads")) != NULL) {
+        if (!strcmp(buf, "pool")) {
+            /* this is the default */
+        } else {
+            error_report("invalid workerthreads option");
+            return NULL;
+        }
+    }
+
     if ((buf = qemu_opt_get(opts, "format")) != NULL) {
         if (is_help_option(buf)) {
             error_printf("Supported formats:");
@@ -2269,6 +2278,10 @@ QemuOptsList qemu_common_drive_opts = {
             .type = QEMU_OPT_STRING,
             .help = "disk serial number",
         },{
+            .name = "workerthreads",
+            .type = QEMU_OPT_STRING,
+            .help = "type of worker threads (pool)",
+        },{
             .name = "rerror",
             .type = QEMU_OPT_STRING,
             .help = "read error action",
diff --git a/qemu-options.hx b/qemu-options.hx
index 5dc8b75..6f22242 100644
--- a/qemu-options.hx
+++ b/qemu-options.hx
@@ -408,7 +408,7 @@ DEF("drive", HAS_ARG, QEMU_OPTION_drive,
     "       [,cyls=c,heads=h,secs=s[,trans=t]][,snapshot=on|off]\n"
    "       [,cache=writethrough|writeback|none|directsync|unsafe][,format=f]\n"
     "       [,serial=s][,addr=A][,id=name][,aio=threads|native]\n"
-    "       [,readonly=on|off][,copy-on-read=on|off]\n"
+    "       [,workerthreads=pool][,readonly=on|off][,copy-on-read=on|off]\n"
     "       [[,bps=b]|[[,bps_rd=r][,bps_wr=w]]]\n"
     "       [[,iops=i]|[[,iops_rd=r][,iops_wr=w]]]\n"
     "       [[,bps_max=bm]|[[,bps_rd_max=rm][,bps_wr_max=wm]]]\n"
-- 
1.8.1.2




[Qemu-devel] [PATCH 1/3] Make thread pool implementation modular

2013-11-01 Thread Matthias Brugger
This patch introduces function pointers for the thread pool, so that
its implementation can be set at run-time.

Signed-off-by: Matthias Brugger <matthias@gmail.com>
---
 include/block/thread-pool.h |  9 +
 thread-pool.c   | 32 
 2 files changed, 41 insertions(+)

diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index 32afcdd..53a779f 100644
--- a/include/block/thread-pool.h
+++ b/include/block/thread-pool.h
@@ -38,4 +38,13 @@ int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
         ThreadPoolFunc *func, void *arg);
 void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
 
+ThreadPoolFuncArr *thread_pool_probe(void);
+void thread_pool_delete(ThreadPoolFuncArr *tpf);
+
+struct ThreadPoolFuncArr {
+    BlockDriverAIOCB *(*thread_pool_submit_aio)(ThreadPool *pool, ThreadPoolFunc *func, void *arg, BlockDriverCompletionFunc *cb, void *opaque);
+    ThreadPool *(*thread_pool_new)(AioContext *ctx);
+};
+
+
 #endif
diff --git a/thread-pool.c b/thread-pool.c
index 3735fd3..8c5d1a2 100644
--- a/thread-pool.c
+++ b/thread-pool.c
@@ -26,6 +26,7 @@
 #include "qemu/main-loop.h"
 
 static void do_spawn_thread(ThreadPool *pool);
+void thread_pool_aio_free(ThreadPool *pool);
 
 typedef struct ThreadPoolElement ThreadPoolElement;
 
@@ -77,6 +78,7 @@ struct ThreadPool {
     int pending_threads; /* threads created but not running yet */
     int pending_cancellations; /* whether we need a cond_broadcast */
     bool stopping;
+    void (*thread_pool_free)(ThreadPool *pool);
 };
 
 static void *worker_thread(void *opaque)
@@ -300,6 +302,7 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
     qemu_sem_init(&pool->sem, 0);
     pool->max_threads = 64;
     pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
+    pool->thread_pool_free = thread_pool_aio_free;
 
     QLIST_INIT(&pool->head);
     QTAILQ_INIT(&pool->request_list);
@@ -316,6 +319,11 @@ ThreadPool *thread_pool_new(AioContext *ctx)
 
 void thread_pool_free(ThreadPool *pool)
 {
+    pool->thread_pool_free(pool);
+}
+
+void thread_pool_aio_free(ThreadPool *pool)
+{
     if (!pool) {
         return;
     }
@@ -346,3 +354,27 @@ void thread_pool_free(ThreadPool *pool)
     event_notifier_cleanup(&pool->notifier);
     g_free(pool);
 }
+
+ThreadPoolFuncArr *thread_pool_probe(void)
+{
+    ThreadPoolFuncArr *tpf_pool = NULL;
+
+    if (tpf_pool)
+        return tpf_pool;
+
+    tpf_pool = g_new(ThreadPoolFuncArr, 1); //TODO right now, this leaks!
+    if (!tpf_pool) {
+        printf("error allocating thread pool\n");
+        return NULL;
+    }
+
+    tpf_pool->thread_pool_submit_aio = thread_pool_submit_aio;
+    tpf_pool->thread_pool_new = thread_pool_new;
+
+    return tpf_pool;
+}
+
+void thread_pool_delete(ThreadPoolFuncArr *tpf)
+{
+    g_free(tpf);
+}
-- 
1.8.1.2




[Qemu-devel] [PATCH 2/3] Block layer uses modular thread pool

2013-11-01 Thread Matthias Brugger
With this patch, the calls to the thread pool functions pass through the
new modular thread pool implementation.

Signed-off-by: Matthias Brugger <matthias@gmail.com>
---
 async.c   |  4 ++--
 block/raw-posix.c | 15 +++
 block/raw-win32.c |  9 +++--
 include/block/aio.h   |  2 +-
 include/qemu-common.h |  2 ++
 5 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/async.c b/async.c
index 5fb3fa6..e66f70f 100644
--- a/async.c
+++ b/async.c
@@ -232,10 +232,10 @@ GSource *aio_get_g_source(AioContext *ctx)
     return ctx->source;
 }
 
-ThreadPool *aio_get_thread_pool(AioContext *ctx)
+ThreadPool *aio_get_thread_pool(AioContext *ctx, ThreadPoolFuncArr *tpf)
 {
     if (!ctx->thread_pool) {
-        ctx->thread_pool = thread_pool_new(ctx);
+        ctx->thread_pool = tpf->thread_pool_new(ctx);
     }
     return ctx->thread_pool;
 }
diff --git a/block/raw-posix.c b/block/raw-posix.c
index 6f03fbf..22842ed 100644
--- a/block/raw-posix.c
+++ b/block/raw-posix.c
@@ -142,6 +142,7 @@ typedef struct BDRVRawState {
     bool is_xfs : 1;
 #endif
     bool has_discard : 1;
+    ThreadPoolFuncArr *tpf;
 } BDRVRawState;
 
 typedef struct BDRVRawReopenState {
@@ -345,6 +346,9 @@ static int raw_open(BlockDriverState *bs, QDict *options, int flags,
     int ret;
 
     s->type = FTYPE_FILE;
+
+    s->tpf = thread_pool_probe();
+
     ret = raw_open_common(bs, options, flags, 0, &local_err);
     if (error_is_set(&local_err)) {
         error_propagate(errp, local_err);
@@ -792,6 +796,7 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
         BlockDriverCompletionFunc *cb, void *opaque, int type)
 {
+    BDRVRawState *s = bs->opaque;
     RawPosixAIOData *acb = g_slice_new(RawPosixAIOData);
     ThreadPool *pool;
 
@@ -807,8 +812,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
     acb->aio_offset = sector_num * 512;
 
     trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
-    pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
-    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
+    pool = aio_get_thread_pool(bdrv_get_aio_context(bs), s->tpf);
+    return s->tpf->thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
 }
 
 static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs,
@@ -874,6 +879,8 @@ static void raw_close(BlockDriverState *bs)
         qemu_close(s->fd);
         s->fd = -1;
     }
+
+    thread_pool_delete(s->tpf);
 }
 
 static int raw_truncate(BlockDriverState *bs, int64_t offset)
@@ -1490,8 +1497,8 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs,
     acb->aio_offset = 0;
     acb->aio_ioctl_buf = buf;
     acb->aio_ioctl_cmd = req;
-    pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
-    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
+    pool = aio_get_thread_pool(bdrv_get_aio_context(bs), s->tpf);
+    return s->tpf->thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
 }
 
 #elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__)
diff --git a/block/raw-win32.c b/block/raw-win32.c
index 676b570..dad1255 100644
--- a/block/raw-win32.c
+++ b/block/raw-win32.c
@@ -46,6 +46,7 @@ typedef struct RawWin32AIOData {
     size_t aio_nbytes;
     off64_t aio_offset;
     int aio_type;
+    ThreadPoolFuncArr *tpf;
 } RawWin32AIOData;
 
 typedef struct BDRVRawState {
@@ -159,8 +160,8 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile,
     acb->aio_offset = sector_num * 512;
 
     trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
-    pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
-    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
+    pool = aio_get_thread_pool(bdrv_get_aio_context(bs), s->tpf);
+    return s->tpf->thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
 }
 
 int qemu_ftruncate64(int fd, int64_t length)
@@ -248,6 +249,8 @@ static int raw_open(BlockDriverState *bs, QDict *options, int flags,
 
     s->type = FTYPE_FILE;
 
+    s->tpf = thread_pool_probe();
+
     opts = qemu_opts_create_nofail(&raw_runtime_opts);
     qemu_opts_absorb_qdict(opts, options, &local_err);
     if (error_is_set(&local_err)) {
@@ -339,6 +342,8 @@ static void raw_close(BlockDriverState *bs)
 {
     BDRVRawState *s = bs->opaque;
     CloseHandle(s->hfile);
+
+    thread_pool_delete(s->tpf);
 }
 
 static int raw_truncate(BlockDriverState *bs, int64_t offset)
diff --git a/include/block/aio.h b/include/block/aio.h
index 2efdf41..22d6fa0 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -231,7 +231,7 @@ void aio_set_event_notifier(AioContext *ctx,
 GSource *aio_get_g_source(AioContext *ctx);
 
 /* Return the ThreadPool bound to this AioContext */
-struct ThreadPool *aio_get_thread_pool(AioContext *ctx);
+struct ThreadPool *aio_get_thread_pool(AioContext *ctx, ThreadPoolFuncArr *tpf);
 
 /* Functions to operate on the main QEMU