This patch modifies thread pool to allow replaying asynchronous thread tasks synchronously in replay mode.
Signed-off-by: Pavel Dovgalyuk <pavel.dovga...@ispras.ru> --- block/raw-posix.c | 6 +++-- block/raw-win32.c | 4 ++- include/block/thread-pool.h | 4 ++- replay/replay-events.c | 11 +++++++++ replay/replay-internal.h | 3 ++ replay/replay.h | 2 ++ stubs/replay.c | 4 +++ tests/test-thread-pool.c | 7 +++--- thread-pool.c | 53 +++++++++++++++++++++++++++++-------------- 9 files changed, 69 insertions(+), 25 deletions(-) diff --git a/block/raw-posix.c b/block/raw-posix.c index a857def..f600736 100644 --- a/block/raw-posix.c +++ b/block/raw-posix.c @@ -1025,7 +1025,9 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd, trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); - return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque, + qiov ? qiov->replay : false, + qiov ? qiov->replay_step : 0); } static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs, @@ -1853,7 +1855,7 @@ static BlockDriverAIOCB *hdev_aio_ioctl(BlockDriverState *bs, acb->aio_ioctl_buf = buf; acb->aio_ioctl_cmd = req; pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); - return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque, false, 0); } #elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__) diff --git a/block/raw-win32.c b/block/raw-win32.c index 902eab6..212307c 100644 --- a/block/raw-win32.c +++ b/block/raw-win32.c @@ -158,7 +158,9 @@ static BlockDriverAIOCB *paio_submit(BlockDriverState *bs, HANDLE hfile, trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); - return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); + return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque, + qiov ? qiov->replay : false, + qiov ? qiov->replay_step : 0); } int qemu_ftruncate64(int fd, int64_t length) diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h index 32afcdd..df74f8d 100644 --- a/include/block/thread-pool.h +++ b/include/block/thread-pool.h @@ -33,9 +33,11 @@ void thread_pool_free(ThreadPool *pool); BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool, ThreadPoolFunc *func, void *arg, - BlockDriverCompletionFunc *cb, void *opaque); + BlockDriverCompletionFunc *cb, void *opaque, + bool replay, uint64_t replay_step); int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, void *arg); void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg); +void thread_pool_work(ThreadPool *pool, void *r); #endif diff --git a/replay/replay-events.c b/replay/replay-events.c index 81a5a6b..f39889d 100755 --- a/replay/replay-events.c +++ b/replay/replay-events.c @@ -11,6 +11,7 @@ #include "replay.h" #include "replay-internal.h" +#include "block/thread-pool.h" typedef struct Event { int event_kind; @@ -38,6 +39,9 @@ static void replay_run_event(Event *event) case REPLAY_ASYNC_EVENT_BH: aio_bh_call(event->opaque); break; + case REPLAY_ASYNC_EVENT_THREAD: + thread_pool_work((ThreadPool *)event->opaque, event->opaque2); + break; default: fprintf(stderr, "Replay: invalid async event ID (%d) in the queue\n", event->event_kind); @@ -125,6 +129,11 @@ void replay_add_bh_event(void *bh, uint64_t id) replay_add_event_internal(REPLAY_ASYNC_EVENT_BH, bh, NULL, id); } +void replay_add_thread_event(void *opaque, void *opaque2, uint64_t id) +{ + replay_add_event_internal(REPLAY_ASYNC_EVENT_THREAD, opaque, opaque2, id); +} + void replay_save_events(int opt) { qemu_mutex_lock(&lock); @@ -143,6 +152,7 @@ void replay_save_events(int opt) /* save event-specific data */ switch (event->event_kind) { case REPLAY_ASYNC_EVENT_BH: + case REPLAY_ASYNC_EVENT_THREAD: replay_put_qword(event->id); break; } @@ -175,6 +185,7 @@ void replay_read_events(int opt) /* Execute some events without searching them in the queue */ switch (read_event_kind) { case REPLAY_ASYNC_EVENT_BH: + case REPLAY_ASYNC_EVENT_THREAD: if (read_id == -1) { read_id = replay_get_qword(); } diff --git a/replay/replay-internal.h b/replay/replay-internal.h index bbb117e..3f97fd7 100755 --- a/replay/replay-internal.h +++ b/replay/replay-internal.h @@ -40,7 +40,8 @@ /* Asynchronous events IDs */ #define REPLAY_ASYNC_EVENT_BH 0 -#define REPLAY_ASYNC_COUNT 1 +#define REPLAY_ASYNC_EVENT_THREAD 1 +#define REPLAY_ASYNC_COUNT 2 typedef struct ReplayState { /*! Cached clock values. */ diff --git a/replay/replay.h b/replay/replay.h index 3040b6b..e96c74f 100755 --- a/replay/replay.h +++ b/replay/replay.h @@ -100,5 +100,7 @@ int replay_checkpoint(unsigned int checkpoint); void replay_disable_events(void); /*! Adds BH event to the queue */ void replay_add_bh_event(void *bh, uint64_t id); +/*! Adds thread event to the queue */ +void replay_add_thread_event(void *pool, void *req, uint64_t id); #endif diff --git a/stubs/replay.c b/stubs/replay.c index 1dea1f6..c269b59 100755 --- a/stubs/replay.c +++ b/stubs/replay.c @@ -36,3 +36,7 @@ uint64_t replay_get_current_step(void) { return 0; } + +void replay_add_thread_event(void *opaque, void *opaque2, uint64_t id) +{ +} diff --git a/tests/test-thread-pool.c b/tests/test-thread-pool.c index aa156bc..6dfbb55 100644 --- a/tests/test-thread-pool.c +++ b/tests/test-thread-pool.c @@ -55,7 +55,7 @@ static void test_submit_aio(void) { WorkerTestData data = { .n = 0, .ret = -EINPROGRESS }; data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data, - done_cb, &data); + done_cb, &data, false, 0); /* The callbacks are not called until after the first wait. */ active = 1; @@ -119,7 +119,8 @@ static void test_submit_many(void) for (i = 0; i < 100; i++) { data[i].n = 0; data[i].ret = -EINPROGRESS; - thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]); + thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i], + false, 0); } active = 100; @@ -148,7 +149,7 @@ static void test_cancel(void) data[i].n = 0; data[i].ret = -EINPROGRESS; data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i], - done_cb, &data[i]); + done_cb, &data[i], false, 0); } /* Starting the threads may be left to a bottom half. Let it diff --git a/thread-pool.c b/thread-pool.c index dfb699d..25cff25 100644 --- a/thread-pool.c +++ b/thread-pool.c @@ -24,6 +24,7 @@ #include "qemu/event_notifier.h" #include "block/thread-pool.h" #include "qemu/main-loop.h" +#include "replay/replay.h" static void do_spawn_thread(ThreadPool *pool); @@ -79,6 +80,30 @@ struct ThreadPool { bool stopping; }; +void thread_pool_work(ThreadPool *pool, void *r) +{ + ThreadPoolElement *req = (ThreadPoolElement *)r; + int ret; + if (replay_mode == REPLAY_NONE) { + qemu_mutex_unlock(&pool->lock); + } + + ret = req->func(req->arg); + req->ret = ret; + /* Write ret before state. */ + smp_wmb(); + req->state = THREAD_DONE; + + if (replay_mode == REPLAY_NONE) { + qemu_mutex_lock(&pool->lock); + } + if (pool->pending_cancellations) { + qemu_cond_broadcast(&pool->check_cancel); + } + + event_notifier_set(&pool->notifier); +} + static void *worker_thread(void *opaque) { ThreadPool *pool = opaque; @@ -105,21 +130,12 @@ static void *worker_thread(void *opaque) req = QTAILQ_FIRST(&pool->request_list); QTAILQ_REMOVE(&pool->request_list, req, reqs); req->state = THREAD_ACTIVE; - qemu_mutex_unlock(&pool->lock); - - ret = req->func(req->arg); - req->ret = ret; - /* Write ret before state. */ - smp_wmb(); - req->state = THREAD_DONE; - - qemu_mutex_lock(&pool->lock); - if (pool->pending_cancellations) { - qemu_cond_broadcast(&pool->check_cancel); + if (replay_mode != REPLAY_NONE && req->common.replay) { + replay_add_thread_event(pool, req, req->common.replay_step); + } else { + thread_pool_work(pool, req); } - - event_notifier_set(&pool->notifier); } pool->cur_threads--; @@ -234,7 +250,8 @@ static const AIOCBInfo thread_pool_aiocb_info = { BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool, ThreadPoolFunc *func, void *arg, - BlockDriverCompletionFunc *cb, void *opaque) + BlockDriverCompletionFunc *cb, void *opaque, + bool replay, uint64_t replay_step) { ThreadPoolElement *req; @@ -243,6 +260,8 @@ BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool, req->arg = arg; req->state = THREAD_QUEUED; req->pool = pool; + req->common.replay = replay; + req->common.replay_step = replay_step; QLIST_INSERT_HEAD(&pool->head, req, all); @@ -253,8 +272,8 @@ BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool, spawn_thread(pool); } QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs); - qemu_mutex_unlock(&pool->lock); qemu_sem_post(&pool->sem); + qemu_mutex_unlock(&pool->lock); return &req->common; } @@ -276,14 +295,14 @@ int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, { ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS }; assert(qemu_in_coroutine()); - thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc); + thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc, false, 0); qemu_coroutine_yield(); return tpc.ret; } void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg) { - thread_pool_submit_aio(pool, func, arg, NULL, NULL); + thread_pool_submit_aio(pool, func, arg, NULL, NULL, false, 0); } static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)