The IOThread's AioContext is currently created in iothread_init() where it's easy to propagate errors before spawning the thread that runs iothread_run(). However, this means that aio_context_new() is called from the main loop thread rather than from the IOThread.
Linux io_uring's IORING_SETUP_SINGLE_ISSUER feature, which the next commit will use, requires that only one thread use the io_uring context. Therefore iothread.c must call aio_context_new() from iothread_run() instead of iothread_init().

Extract the iothread_run() arguments into an IOThreadRunArgs struct whose Error *error field is used to report initialization errors back to iothread_init(). This works well because iothread_init() already uses the init_done_sem semaphore to wait for iothread_run() to finish initializing.

Move iothread_run() further down in the file for proximity with iothread_init() and to avoid adding a function prototype for iothread_set_aio_context_params().

Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com>
---
 include/system/iothread.h |   1 -
 iothread.c                | 140 +++++++++++++++++++++-----------------
 2 files changed, 78 insertions(+), 63 deletions(-)

diff --git a/include/system/iothread.h b/include/system/iothread.h
index d95c17a645..ec4e798d5e 100644
--- a/include/system/iothread.h
+++ b/include/system/iothread.h
@@ -29,7 +29,6 @@ struct IOThread {
     bool run_gcontext;          /* whether we should run gcontext */
     GMainContext *worker_context;
     GMainLoop *main_loop;
-    QemuSemaphore init_done_sem; /* is thread init done? */
     bool stopping;              /* has iothread_stop() been called? */
     bool running;               /* should iothread_run() continue? */
     int thread_id;
diff --git a/iothread.c b/iothread.c
index 8810376dce..c6547779d0 100644
--- a/iothread.c
+++ b/iothread.c
@@ -36,46 +36,6 @@
 #define IOTHREAD_POLL_MAX_NS_DEFAULT 0ULL
 #endif
 
-static void *iothread_run(void *opaque)
-{
-    IOThread *iothread = opaque;
-
-    rcu_register_thread();
-    /*
-     * g_main_context_push_thread_default() must be called before anything
-     * in this new thread uses glib.
-     */
-    g_main_context_push_thread_default(iothread->worker_context);
-    qemu_set_current_aio_context(iothread->ctx);
-    iothread->thread_id = qemu_get_thread_id();
-    qemu_sem_post(&iothread->init_done_sem);
-
-    while (iothread->running) {
-        /*
-         * Note: from functional-wise the g_main_loop_run() below can
-         * already cover the aio_poll() events, but we can't run the
-         * main loop unconditionally because explicit aio_poll() here
-         * is faster than g_main_loop_run() when we do not need the
-         * gcontext at all (e.g., pure block layer iothreads). In
-         * other words, when we want to run the gcontext with the
-         * iothread we need to pay some performance for functionality.
-         */
-        aio_poll(iothread->ctx, true);
-
-        /*
-         * We must check the running state again in case it was
-         * changed in previous aio_poll()
-         */
-        if (iothread->running && qatomic_read(&iothread->run_gcontext)) {
-            g_main_loop_run(iothread->main_loop);
-        }
-    }
-
-    g_main_context_pop_thread_default(iothread->worker_context);
-    rcu_unregister_thread();
-    return NULL;
-}
-
 /* Runs in iothread_run() thread */
 static void iothread_stop_bh(void *opaque)
 {
@@ -104,7 +64,6 @@ static void iothread_instance_init(Object *obj)
 
     iothread->poll_max_ns = IOTHREAD_POLL_MAX_NS_DEFAULT;
     iothread->thread_id = -1;
-    qemu_sem_init(&iothread->init_done_sem, 0);
     /* By default, we don't run gcontext */
     qatomic_set(&iothread->run_gcontext, 0);
 }
@@ -135,7 +94,6 @@ static void iothread_instance_finalize(Object *obj)
         g_main_loop_unref(iothread->main_loop);
         iothread->main_loop = NULL;
     }
-    qemu_sem_destroy(&iothread->init_done_sem);
 }
 
 static void iothread_init_gcontext(IOThread *iothread, const char *thread_name)
@@ -176,47 +134,105 @@ static void iothread_set_aio_context_params(EventLoopBase *base, Error **errp)
                                        base->thread_pool_max, errp);
 }
 
+typedef struct {
+    IOThread *iothread;
+    const char *thread_name;
+    QemuSemaphore init_done_sem; /* is thread init done? */
+    Error *error; /* filled in before init_done_sem is posted */
+} IOThreadRunArgs;
 
-static void iothread_init(EventLoopBase *base, Error **errp)
+static void *iothread_run(void *opaque)
 {
-    Error *local_error = NULL;
-    IOThread *iothread = IOTHREAD(base);
-    g_autofree char *thread_name = NULL;
+    IOThreadRunArgs *args = opaque;
+    IOThread *iothread = args->iothread;
 
-    iothread->stopping = false;
-    iothread->running = true;
-    iothread->ctx = aio_context_new(errp);
+    rcu_register_thread();
+
+    iothread->ctx = aio_context_new(&args->error);
     if (!iothread->ctx) {
-        return;
+        goto out;
     }
 
-    thread_name = g_strdup_printf("IO %s",
-                        object_get_canonical_path_component(OBJECT(base)));
+    iothread_set_aio_context_params(EVENT_LOOP_BASE(iothread), &args->error);
+    if (args->error) {
+        aio_context_unref(iothread->ctx);
+        iothread->ctx = NULL;
+        goto out;
+    }
 
     /*
      * Init one GMainContext for the iothread unconditionally, even if
      * it's not used
      */
-    iothread_init_gcontext(iothread, thread_name);
+    iothread_init_gcontext(iothread, args->thread_name);
 
-    iothread_set_aio_context_params(base, &local_error);
-    if (local_error) {
-        error_propagate(errp, local_error);
-        aio_context_unref(iothread->ctx);
-        iothread->ctx = NULL;
-        return;
+    /*
+     * g_main_context_push_thread_default() must be called before anything
+     * in this new thread uses glib.
+     */
+    g_main_context_push_thread_default(iothread->worker_context);
+    qemu_set_current_aio_context(iothread->ctx);
+
+    iothread->stopping = false;
+    iothread->running = true;
+
+    iothread->thread_id = qemu_get_thread_id();
+    qemu_sem_post(&args->init_done_sem);
+
+    while (iothread->running) {
+        /*
+         * Note: from functional-wise the g_main_loop_run() below can
+         * already cover the aio_poll() events, but we can't run the
+         * main loop unconditionally because explicit aio_poll() here
+         * is faster than g_main_loop_run() when we do not need the
+         * gcontext at all (e.g., pure block layer iothreads). In
+         * other words, when we want to run the gcontext with the
+         * iothread we need to pay some performance for functionality.
+         */
+        aio_poll(iothread->ctx, true);
+
+        /*
+         * We must check the running state again in case it was
+         * changed in previous aio_poll()
+         */
+        if (iothread->running && qatomic_read(&iothread->run_gcontext)) {
+            g_main_loop_run(iothread->main_loop);
+        }
     }
 
+    g_main_context_pop_thread_default(iothread->worker_context);
+out:
+    rcu_unregister_thread();
+    return NULL;
+}
+
+static void iothread_init(EventLoopBase *base, Error **errp)
+{
+    IOThread *iothread = IOTHREAD(base);
+    g_autofree char *thread_name = NULL;
+    IOThreadRunArgs args = {
+        .iothread = iothread,
+    };
+
+    qemu_sem_init(&args.init_done_sem, 0);
+
+    thread_name = g_strdup_printf("IO %s",
+                        object_get_canonical_path_component(OBJECT(base)));
+    args.thread_name = thread_name;
+
     /* This assumes we are called from a thread with useful CPU affinity for us
      * to inherit.
      */
-    qemu_thread_create(&iothread->thread, thread_name, iothread_run,
-                       iothread, QEMU_THREAD_JOINABLE);
+    qemu_thread_create(&iothread->thread, thread_name, iothread_run, &args,
+                       QEMU_THREAD_JOINABLE);
 
     /* Wait for initialization to complete */
     while (iothread->thread_id == -1) {
-        qemu_sem_wait(&iothread->init_done_sem);
+        qemu_sem_wait(&args.init_done_sem);
     }
+
+    qemu_sem_destroy(&args.init_done_sem);
+    error_propagate(errp, args.error);
 }
 
 typedef struct {
-- 
2.50.1
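
The init handoff pattern used above is not QEMU-specific. Below is a minimal standalone sketch of the same idea using plain POSIX threads and semaphores instead of QEMU's qemu_thread_create()/QemuSemaphore/Error APIs; the RunArgs struct and worker_run() names are illustrative and do not appear in the patch. The parent allocates the argument struct on its own stack, the worker records an error indicator before posting the semaphore, and neither side touches the struct after the handoff completes.

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct {
    const char *thread_name;
    sem_t init_done_sem;   /* posted once initialization has finished */
    const char *error;     /* NULL on success, error message on failure */
} RunArgs;

static void *worker_run(void *opaque)
{
    RunArgs *args = opaque;

    /* Simulate an initialization step that can fail */
    if (args->thread_name == NULL) {
        args->error = "missing thread name";
        sem_post(&args->init_done_sem);
        return NULL; /* args must not be touched after posting */
    }

    /* Initialization succeeded: unblock the parent, then do the real work */
    sem_post(&args->init_done_sem);

    /* ... the event loop would run here ... */
    return NULL;
}

int main(void)
{
    pthread_t thread;
    RunArgs args = { .thread_name = "IO worker" };

    sem_init(&args.init_done_sem, 0, 0);
    pthread_create(&thread, NULL, worker_run, &args);

    /* Wait for the worker to finish initializing, then check for an error */
    sem_wait(&args.init_done_sem);
    sem_destroy(&args.init_done_sem);

    if (args.error != NULL) {
        fprintf(stderr, "worker init failed: %s\n", args.error);
        pthread_join(thread, NULL);
        return EXIT_FAILURE;
    }

    pthread_join(thread, NULL);
    return EXIT_SUCCESS;
}

Built with cc -pthread. As in the patch, the semaphore lives inside the stack-allocated argument struct, so the parent can destroy it as soon as the wait returns and no long-lived per-object state is needed for the handshake.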