This patch moves the threadlet queue API code to qemu-threadlets.c where these APIs can be used by other subsystems.
Signed-off-by: Arun R Bharadwaj <a...@linux.vnet.ibm.com> --- Makefile.objs | 1 posix-aio-compat.c | 144 ---------------------------------------------------- qemu-thread.h | 1 qemu-threadlets.c | 142 +++++++++++++++++++++++++++++++++++++++++++++++++++ qemu-threadlets.h | 43 ++++++++++++++++ vl.c | 2 - 6 files changed, 188 insertions(+), 145 deletions(-) create mode 100644 qemu-threadlets.c create mode 100644 qemu-threadlets.h diff --git a/Makefile.objs b/Makefile.objs index 3b7ec27..2cf8aba 100644 --- a/Makefile.objs +++ b/Makefile.objs @@ -10,6 +10,7 @@ qobject-obj-y += qerror.o block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o block-obj-$(CONFIG_POSIX) += qemu-thread.o +block-obj-$(CONFIG_POSIX) += qemu-threadlets.o block-obj-$(CONFIG_POSIX) += posix-aio-compat.o block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o diff --git a/posix-aio-compat.c b/posix-aio-compat.c index 94dd007..df55ce6 100644 --- a/posix-aio-compat.c +++ b/posix-aio-compat.c @@ -26,33 +26,13 @@ #include "qemu-common.h" #include "trace.h" #include "block_int.h" -#include "qemu-thread.h" +#include "qemu-threadlets.h" #include "block/raw-posix-aio.h" -#define MAX_GLOBAL_THREADS 64 -#define MIN_GLOBAL_THREADS 8 - static QemuMutex aiocb_mutex; static QemuCond aiocb_completion; -typedef struct ThreadletQueue -{ - QemuMutex lock; - QemuCond cond; - int max_threads; - int min_threads; - int cur_threads; - int idle_threads; - QTAILQ_HEAD(, ThreadletWork) request_list; -} ThreadletQueue; - -typedef struct ThreadletWork -{ - QTAILQ_ENTRY(ThreadletWork) node; - void (*func)(struct ThreadletWork *work); -} ThreadletWork; - struct qemu_paiocb { BlockDriverAIOCB common; int aio_fildes; @@ -79,10 +59,6 @@ typedef struct PosixAioState { struct qemu_paiocb *first_aio; } PosixAioState; -/* Default ThreadletQueue */ -static ThreadletQueue globalqueue; -static int globalqueue_init; - #ifdef CONFIG_PREADV static int preadv_present = 1; #else @@ -283,50 +259,6 @@ static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb) return nbytes; } -static void *threadlet_worker(void *data) -{ - ThreadletQueue *queue = data; - - qemu_mutex_lock(&queue->lock); - while (1) { - ThreadletWork *work; - int ret = 0; - - while (QTAILQ_EMPTY(&queue->request_list) && - (ret != ETIMEDOUT)) { - /* wait for cond to be signalled or broadcast for 1000s */ - ret = qemu_cond_timedwait((&queue->cond), - &(queue->lock), 10*100000); - } - - assert(queue->idle_threads != 0); - if (QTAILQ_EMPTY(&queue->request_list)) { - if (queue->cur_threads > queue->min_threads) { - /* We retain the minimum number of threads */ - break; - } - } else { - work = QTAILQ_FIRST(&queue->request_list); - QTAILQ_REMOVE(&queue->request_list, work, node); - - queue->idle_threads--; - qemu_mutex_unlock(&queue->lock); - - /* execute the work function */ - work->func(work); - - qemu_mutex_lock(&queue->lock); - queue->idle_threads++; - } - } - - queue->idle_threads--; - queue->cur_threads--; - qemu_mutex_unlock(&queue->lock); - - return NULL; -} - static PosixAioState *posix_aio_state; static void handle_work(ThreadletWork *work) @@ -373,67 +305,6 @@ static void handle_work(ThreadletWork *work) } } -static void threadlet_io_completion_signal_handler(int signum) -{ - qemu_service_io(); -} - -static void threadlet_register_signal_handler(void) -{ - struct sigaction act; - sigfillset(&act.sa_mask); - act.sa_flags = 0; /* do not restart syscalls to interrupt select() */ - act.sa_handler = threadlet_io_completion_signal_handler; - sigaction(SIGUSR2, &act, NULL); -} - -void threadlet_init(void) -{ - threadlet_register_signal_handler(); -} - -static void spawn_threadlet(ThreadletQueue *queue) -{ - QemuThread thread; - - queue->cur_threads++; - queue->idle_threads++; - - qemu_thread_create(&thread, threadlet_worker, queue); -} - -/** - * submit_work: Submit to the global queue a new task to be executed - * asynchronously. - * @work: Contains information about the task that needs to be submitted. - */ -static void submit_work(ThreadletWork *work) -{ - qemu_mutex_lock(&globalqueue.lock); - - if (!globalqueue_init) { - globalqueue.cur_threads = 0; - globalqueue.idle_threads = 0; - globalqueue.max_threads = MAX_GLOBAL_THREADS; - globalqueue.min_threads = MIN_GLOBAL_THREADS; - QTAILQ_INIT(&globalqueue.request_list); - qemu_mutex_init(&globalqueue.lock); - qemu_cond_init(&globalqueue.cond); - - globalqueue_init = 1; - } - - if (globalqueue.idle_threads == 0 && - globalqueue.cur_threads < globalqueue.max_threads) { - spawn_threadlet(&globalqueue); - - } else { - qemu_cond_signal(&globalqueue.cond); - } - QTAILQ_INSERT_TAIL(&globalqueue.request_list, work, node); - qemu_mutex_unlock(&globalqueue.lock); -} - static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb) { ssize_t ret; @@ -566,19 +437,6 @@ static void paio_remove(struct qemu_paiocb *acb) } } -/** - * dequeue_work: Cancel a task queued on the global queue. - * @work: Contains the information of the task that needs to be cancelled. - */ -static int dequeue_work(ThreadletWork *work) -{ - qemu_mutex_lock(&globalqueue.lock); - QTAILQ_REMOVE(&globalqueue.request_list, work, node); - qemu_mutex_unlock(&globalqueue.lock); - - return 0; -} - static void paio_cancel(BlockDriverAIOCB *blockacb) { struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb; diff --git a/qemu-thread.h b/qemu-thread.h index c5b579c..19bb30c 100644 --- a/qemu-thread.h +++ b/qemu-thread.h @@ -40,6 +40,5 @@ void qemu_thread_signal(QemuThread *thread, int sig); void qemu_thread_self(QemuThread *thread); int qemu_thread_equal(QemuThread *thread1, QemuThread *thread2); void qemu_thread_exit(void *retval); -void threadlet_init(void); #endif diff --git a/qemu-threadlets.c b/qemu-threadlets.c new file mode 100644 index 0000000..42dd3d1 --- /dev/null +++ b/qemu-threadlets.c @@ -0,0 +1,142 @@ +/* + * Threadlet support for offloading tasks to be executed asynchronously + * + * Copyright IBM, Corp. 2008 + * Copyright IBM, Corp. 2010 + * + * Authors: + * Anthony Liguori <aligu...@us.ibm.com> + * Aneesh Kumar K.V <aneesh.ku...@linux.vnet.ibm.com> + * Gautham R Shenoy <gautham.she...@gmail.com> + * Arun R Bharadwaj <a...@linux.vnet.ibm.com> + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + */ + +#include <signal.h> + +#include "qemu-threadlets.h" +#include "osdep.h" + +#define MAX_GLOBAL_THREADS 64 +#define MIN_GLOBAL_THREADS 8 + +static ThreadletQueue globalqueue; +static int globalqueue_init; + +static void threadlet_io_completion_signal_handler(int signum) +{ + qemu_service_io(); +} + +static void threadlet_register_signal_handler(void) +{ + struct sigaction act; + sigfillset(&act.sa_mask); + act.sa_flags = 0; /* do not restart syscalls to interrupt select() */ + act.sa_handler = threadlet_io_completion_signal_handler; + sigaction(SIGUSR2, &act, NULL); +} + +void threadlet_init(void) +{ + threadlet_register_signal_handler(); +} + +static void *threadlet_worker(void *data) +{ + ThreadletQueue *queue = data; + + qemu_mutex_lock(&queue->lock); + while (1) { + ThreadletWork *work; + int ret = 0; + + while (QTAILQ_EMPTY(&queue->request_list) && + (ret != ETIMEDOUT)) { + /* wait for cond to be signalled or broadcast for 1000s */ + ret = qemu_cond_timedwait((&queue->cond), + &(queue->lock), 10*100000); + } + + assert(queue->idle_threads != 0); + if (QTAILQ_EMPTY(&queue->request_list)) { + if (queue->cur_threads > queue->min_threads) { + /* We retain the minimum number of threads */ + break; + } + } else { + work = QTAILQ_FIRST(&queue->request_list); + QTAILQ_REMOVE(&queue->request_list, work, node); + + queue->idle_threads--; + qemu_mutex_unlock(&queue->lock); + + /* execute the work function */ + work->func(work); + + qemu_mutex_lock(&queue->lock); + queue->idle_threads++; + } + } + + queue->idle_threads--; + queue->cur_threads--; + qemu_mutex_unlock(&queue->lock); + + return NULL; +} + +static void spawn_threadlet(ThreadletQueue *queue) +{ + QemuThread thread; + + queue->cur_threads++; + queue->idle_threads++; + + qemu_thread_create(&thread, threadlet_worker, queue); +} + +/** + * submit_work: Submit to the global queue a new task to be executed + * asynchronously. + * @work: Contains information about the task that needs to be submitted. + */ +void submit_work(ThreadletWork *work) +{ + if (!globalqueue_init) { + globalqueue.cur_threads = 0; + globalqueue.idle_threads = 0; + globalqueue.max_threads = MAX_GLOBAL_THREADS; + globalqueue.min_threads = MIN_GLOBAL_THREADS; + QTAILQ_INIT(&globalqueue.request_list); + qemu_mutex_init(&globalqueue.lock); + qemu_cond_init(&globalqueue.cond); + + globalqueue_init = 1; + } + + qemu_mutex_lock(&globalqueue.lock); + if (globalqueue.idle_threads == 0 && + globalqueue.cur_threads < globalqueue.max_threads) { + spawn_threadlet(&globalqueue); + } else { + qemu_cond_signal(&globalqueue.cond); + } + QTAILQ_INSERT_TAIL(&globalqueue.request_list, work, node); + qemu_mutex_unlock(&globalqueue.lock); +} + +/** + * dequeue_work: Cancel a task queued on the global queue. + * @work: Contains the information of the task that needs to be cancelled. + */ +int dequeue_work(ThreadletWork *work) +{ + qemu_mutex_lock(&globalqueue.lock); + QTAILQ_REMOVE(&globalqueue.request_list, work, node); + qemu_mutex_unlock(&globalqueue.lock); + + return 0; +} diff --git a/qemu-threadlets.h b/qemu-threadlets.h new file mode 100644 index 0000000..03bb86b --- /dev/null +++ b/qemu-threadlets.h @@ -0,0 +1,43 @@ +/* + * Threadlet support for offloading tasks to be executed asynchronously + * + * Copyright IBM, Corp. 2008 + * Copyright IBM, Corp. 2010 + * + * Authors: + * Anthony Liguori <aligu...@us.ibm.com> + * Aneesh Kumar K.V <aneesh.ku...@linux.vnet.ibm.com> + * Gautham R Shenoy <gautham.she...@gmail.com> + * + * This work is licensed under the terms of the GNU GPL, version 2. See + * the COPYING file in the top-level directory. + */ + +#ifndef QEMU_ASYNC_WORK_H +#define QEMU_ASYNC_WORK_H + +#include "qemu-queue.h" +#include "qemu-common.h" +#include "qemu-thread.h" + +typedef struct ThreadletQueue +{ + QemuMutex lock; + QemuCond cond; + int max_threads; + int min_threads; + int cur_threads; + int idle_threads; + QTAILQ_HEAD(, ThreadletWork) request_list; +} ThreadletQueue; + +typedef struct ThreadletWork +{ + QTAILQ_ENTRY(ThreadletWork) node; + void (*func)(struct ThreadletWork *work); +} ThreadletWork; + +void submit_work(ThreadletWork *work); +int dequeue_work(ThreadletWork *work); +void threadlet_init(void); +#endif diff --git a/vl.c b/vl.c index aba805f..7b9a425 100644 --- a/vl.c +++ b/vl.c @@ -148,7 +148,7 @@ int main(int argc, char **argv) #include "qemu-config.h" #include "qemu-objects.h" #include "qemu-options.h" -#include "qemu-thread.h" +#include "qemu-threadlets.h" #ifdef CONFIG_VIRTFS #include "fsdev/qemu-fsdev.h" #endif