On Thu, Jun 3, 2010 at 10:56 AM, Gautham R Shenoy <e...@in.ibm.com> wrote: > From: Aneesh Kumar K.V <aneesh.ku...@linux.vnet.ibm.com> > > This patch creates a generic asynchronous-task-offloading infrastructure. It's > extracted out of the threading framework that is being used by paio. > > The reason for extracting out this generic infrastructure of the > posix-aio-compat.c is so that other subsystems, such as virtio-9p could make > use > of it for offloading tasks that could block. > > [...@in.ibm.com: work_item_pool, async_work_init, async_work_release, > async_cancel_work] > > Signed-off-by: Aneesh Kumar K.V <aneesh.ku...@linux.vnet.ibm.com> > Signed-off-by: Gautham R Shenoy <e...@in.ibm.com> > --- > Makefile.objs | 3 + > async-work.c | 136 > +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ > async-work.h | 85 ++++++++++++++++++++++++++++++++++++ > 3 files changed, 223 insertions(+), 1 deletions(-) > create mode 100644 async-work.c > create mode 100644 async-work.h > > diff --git a/Makefile.objs b/Makefile.objs > index ecdd53e..fd5ea4d 100644 > --- a/Makefile.objs > +++ b/Makefile.objs > @@ -9,6 +9,8 @@ qobject-obj-y += qerror.o > > block-obj-y = cutils.o cache-utils.o qemu-malloc.o qemu-option.o module.o > block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o > +block-obj-y += qemu-thread.o > +block-obj-y += async-work.o > block-obj-$(CONFIG_POSIX) += posix-aio-compat.o > block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o > > @@ -108,7 +110,6 @@ common-obj-y += iov.o > common-obj-$(CONFIG_VNC_TLS) += vnc-tls.o vnc-auth-vencrypt.o > common-obj-$(CONFIG_VNC_SASL) += vnc-auth-sasl.o > common-obj-$(CONFIG_COCOA) += cocoa.o > -common-obj-$(CONFIG_IOTHREAD) += qemu-thread.o > common-obj-y += notify.o event_notifier.o > common-obj-y += qemu-timer.o > > diff --git a/async-work.c b/async-work.c > new file mode 100644 > index 0000000..0675732 > --- /dev/null > +++ b/async-work.c > @@ -0,0 +1,136 @@ > +/* > + * Async work support > + * > + * Copyright IBM, 
Corp. 2010 > + * > + * Authors: > + * Aneesh Kumar K.V <aneesh.ku...@linux.vnet.ibm.com> > + * > + * This work is licensed under the terms of the GNU GPL, version 2. See > + * the COPYING file in the top-level directory. > + * > + */ > +#include <stdio.h> > +#include <errno.h> > +#include <string.h> > +#include <stdlib.h> > +#include <signal.h> > +#include "async-work.h" > +#include "osdep.h" > + > +static void async_abort(int err, const char *what) > +{ > + fprintf(stderr, "%s failed: %s\n", what, strerror(err)); > + abort(); > +} > + > +static void *async_worker_thread(void *data) > +{ > + struct async_queue *queue = data; > + > + while (1) { > + struct work_item *work; > + int ret = 0; > + qemu_mutex_lock(&(queue->lock)); > + > + while (QTAILQ_EMPTY(&(queue->request_list)) && > + (ret != ETIMEDOUT)) { > + ret = qemu_cond_timedwait(&(queue->cond), > + &(queue->lock), 10*100000); > + } > + > + if (QTAILQ_EMPTY(&(queue->request_list))) > + goto check_exit; > + > + work = QTAILQ_FIRST(&(queue->request_list)); > + QTAILQ_REMOVE(&(queue->request_list), work, node); > + queue->idle_threads--; > + qemu_mutex_unlock(&(queue->lock)); > + > + /* execute the work function */ > + work->func(work);
Here the queue is empty, but there is a job still running. In the VNC server I need to be able to "join" jobs (before a resize, a disconnection, etc.). What do you think about adding something like qemu_async_join(queue, work) (if work is NULL, join all jobs)? > + async_work_release(queue, work); > + > + qemu_mutex_lock(&(queue->lock)); > + queue->idle_threads++; > + > +check_exit: > + if ((queue->idle_threads > 0) && > + (queue->cur_threads > queue->min_threads)) { > + /* we retain minimum number of threads */ > + break; > + } > + qemu_mutex_unlock(&(queue->lock)); > + } > + > + queue->idle_threads--; > + queue->cur_threads--; > + qemu_mutex_unlock(&(queue->lock)); > + > + return NULL; > +} > + > +static void spawn_async_thread(struct async_queue *queue) > +{ > + QemuThreadAttr attr; > + QemuThread thread; > + sigset_t set, oldset; > + > + queue->cur_threads++; > + queue->idle_threads++; > + > + qemu_thread_attr_init(&attr); > + > + /* create a detached thread so that we don't need to wait on it */ > + qemu_thread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); > + > + /* block all signals */ > + if (sigfillset(&set)) { > + async_abort(errno, "sigfillset"); > + } > + > + if (sigprocmask(SIG_SETMASK, &set, &oldset)) { > + async_abort(errno, "sigprocmask"); > + } > + > + qemu_thread_create_attr(&thread, &attr, async_worker_thread, queue); > + > + if (sigprocmask(SIG_SETMASK, &oldset, NULL)) { > + async_abort(errno, "sigprocmask restore"); > + } Using PTHREAD_CREATE_DETACHED and the signal handling here doesn't look really portable. Can't we abstract that into qemu-thread (then we would just need to port qemu-thread to Windows)? 
> +} > + > +void qemu_async_submit(struct async_queue *queue, struct work_item *work) > +{ > + qemu_mutex_lock(&(queue->lock)); > + if (queue->idle_threads == 0 && queue->cur_threads < queue->max_threads) > { > + spawn_async_thread(queue); > + } > + QTAILQ_INSERT_TAIL(&(queue->request_list), work, node); > + qemu_mutex_unlock(&(queue->lock)); > + qemu_cond_signal(&(queue->cond)); > +} > + > +int qemu_async_cancel_work(struct async_queue *queue, struct work_item *work) > +{ > + struct work_item *ret_work; > + int found = 0; > + > + qemu_mutex_lock(&(queue->lock)); > + QTAILQ_FOREACH(ret_work, &(queue->request_list), node) { > + if (ret_work == work) { > + QTAILQ_REMOVE(&(queue->request_list), ret_work, node); > + found = 1; > + break; > + } > + } > + qemu_mutex_unlock(&(queue->lock)); > + > + if (found) { > + async_work_release(queue, work); > + return 0; > + } > + > + return 1; > +} > + > diff --git a/async-work.h b/async-work.h > new file mode 100644 > index 0000000..8389f56 > --- /dev/null > +++ b/async-work.h > @@ -0,0 +1,85 @@ > +/* > + * Async work support > + * > + * Copyright IBM, Corp. 2010 > + * > + * Authors: > + * Aneesh Kumar K.V <aneesh.ku...@linux.vnet.ibm.com> > + * > + * This work is licensed under the terms of the GNU GPL, version 2. See > + * the COPYING file in the top-level directory. 
> + * > + */ > +#ifndef QEMU_ASYNC_WORK_H > +#define QEMU_ASYNC_WORK_H > + > +#include "qemu-queue.h" > +#include "qemu-common.h" > +#include "qemu-thread.h" > + > +struct async_queue > +{ > + QemuMutex lock; > + QemuCond cond; > + int max_threads; > + int min_threads; > + int cur_threads; > + int idle_threads; > + QTAILQ_HEAD(, work_item) request_list; > + QTAILQ_HEAD(, work_item) work_item_pool; > +}; > + > +struct work_item > +{ > + QTAILQ_ENTRY(work_item) node; > + void (*func)(struct work_item *work); > + void *private; > +}; > + > +static inline void async_queue_init(struct async_queue *queue, > + int max_threads, int min_threads) > +{ > + queue->cur_threads = 0; > + queue->idle_threads = 0; > + queue->max_threads = max_threads; > + queue->min_threads = min_threads; > + QTAILQ_INIT(&(queue->request_list)); > + QTAILQ_INIT(&(queue->work_item_pool)); > + qemu_mutex_init(&(queue->lock)); > + qemu_cond_init(&(queue->cond)); > +} > + > +static inline struct work_item *async_work_init(struct async_queue *queue, > + void (*func)(struct work_item *), > + void *data) > +{ > + struct work_item *work; > + qemu_mutex_lock(&(queue->lock)); > + if (QTAILQ_EMPTY(&(queue->work_item_pool))) { > + work = qemu_mallocz(sizeof(*work)); > + } else { > + work = QTAILQ_FIRST(&(queue->work_item_pool)); > + QTAILQ_REMOVE(&(queue->work_item_pool), work, node); > + } > + > + work->func = func; > + work->private = data; > + qemu_mutex_unlock(&(queue->lock)); > + > + return work; > +} > + > +static inline void async_work_release(struct async_queue *queue, > + struct work_item *work) > +{ > + qemu_mutex_lock(&(queue->lock)); > + QTAILQ_INSERT_TAIL(&(queue->work_item_pool), work, node); > + qemu_mutex_unlock(&(queue->lock)); > +} > + > +extern void qemu_async_submit(struct async_queue *queue, > + struct work_item *work); > + > +extern int qemu_async_cancel_work(struct async_queue *queue, > + struct work_item *work); > +#endif > > -- Corentin Chary http://xf.iksaif.net