On 04/28/2011 09:40 PM, Sasha Levin wrote:
> This patch adds a generic pool to create a common interface for working with
> threads within the kvm tool.
> Main idea here is using this threadpool for all I/O threads instead of having
> every I/O module write it's own thread code.
>
> The process of working with the thread pool is supposed to be very simple.
> During initialization, Each module which is interested in working with the
> threadpool will call threadpool__add_jobtype with the callback function and a
> void* parameter. For example, virtio modules will register every virt_queue
> as a new job type.
> During operation, When theres work to do for a specific job, the module will
> signal it to the queue and would expect the callback to be called with proper
> parameters. It is assured that the callback will be called once for every
> signal action and each callback will be called only once at a time (i.e.
> callback functions themselves don't need to handle threading).
>
> Signed-off-by: Sasha Levin <[email protected]>
> ---
> tools/kvm/Makefile | 1 +
> tools/kvm/include/kvm/threadpool.h | 16 ++++
> tools/kvm/kvm-run.c | 5 +
> tools/kvm/threadpool.c | 171
> ++++++++++++++++++++++++++++++++++++
> 4 files changed, 193 insertions(+), 0 deletions(-)
> create mode 100644 tools/kvm/include/kvm/threadpool.h
> create mode 100644 tools/kvm/threadpool.c
>
> diff --git a/tools/kvm/Makefile b/tools/kvm/Makefile
> index 1b0c76e..fbce14d 100644
> --- a/tools/kvm/Makefile
> +++ b/tools/kvm/Makefile
> @@ -36,6 +36,7 @@ OBJS += kvm-cmd.o
> OBJS += kvm-run.o
> OBJS += qcow.o
> OBJS += mptable.o
> +OBJS += threadpool.o
>
> DEPS := $(patsubst %.o,%.d,$(OBJS))
>
> diff --git a/tools/kvm/include/kvm/threadpool.h
> b/tools/kvm/include/kvm/threadpool.h
> new file mode 100644
> index 0000000..25b5eb8
> --- /dev/null
> +++ b/tools/kvm/include/kvm/threadpool.h
> @@ -0,0 +1,16 @@
> +#ifndef KVM__THREADPOOL_H
> +#define KVM__THREADPOOL_H
> +
> +#include <stdint.h>
> +
> +struct kvm;
> +
> +typedef void (*kvm_thread_callback_fn_t)(struct kvm *kvm, void *data);
> +
> +int thread_pool__init(unsigned long thread_count);
> +
> +void *thread_pool__add_jobtype(struct kvm *kvm, kvm_thread_callback_fn_t
> callback, void *data);
> +
> +void thread_pool__signal_work(void *job);
> +
> +#endif
> diff --git a/tools/kvm/kvm-run.c b/tools/kvm/kvm-run.c
> index 071157a..97a17dd 100644
> --- a/tools/kvm/kvm-run.c
> +++ b/tools/kvm/kvm-run.c
> @@ -24,6 +24,7 @@
> #include <kvm/pci.h>
> #include <kvm/term.h>
> #include <kvm/ioport.h>
> +#include <kvm/threadpool.h>
>
> /* header files for gitish interface */
> #include <kvm/kvm-run.h>
> @@ -312,6 +313,7 @@ int kvm_cmd_run(int argc, const char **argv, const char
> *prefix)
> int i;
> struct virtio_net_parameters net_params;
> char *hi;
> + unsigned int nr_online_cpus;
>
> signal(SIGALRM, handle_sigalrm);
> signal(SIGQUIT, handle_sigquit);
> @@ -457,6 +459,9 @@ int kvm_cmd_run(int argc, const char **argv, const char
> *prefix)
>
> kvm__init_ram(kvm);
>
> + nr_online_cpus = sysconf(_SC_NPROCESSORS_ONLN);
> + thread_pool__init(nr_online_cpus);
We may benefit from more threads than the number of hardware threads we
have. Currently, virtio_console consumes two, virtio_net consumes two,
and virtio_blk consumes one. Can we adjust the thread pool size when
devices register to use the thread pool?
> +
> for (i = 0; i < nrcpus; i++) {
> if (pthread_create(&kvm_cpus[i]->thread, NULL, kvm_cpu_thread,
> kvm_cpus[i]) != 0)
> die("unable to create KVM VCPU thread");
> diff --git a/tools/kvm/threadpool.c b/tools/kvm/threadpool.c
> new file mode 100644
> index 0000000..e78db3a
> --- /dev/null
> +++ b/tools/kvm/threadpool.c
> @@ -0,0 +1,171 @@
> +#include "kvm/threadpool.h"
> +#include "kvm/mutex.h"
> +
> +#include <linux/kernel.h>
> +#include <linux/list.h>
> +#include <pthread.h>
> +#include <stdbool.h>
> +
> +struct thread_pool__job_info {
> + kvm_thread_callback_fn_t callback;
> + struct kvm *kvm;
> + void *data;
> +
> + int signalcount;
> + pthread_mutex_t mutex;
> +
> + struct list_head queue;
> +};
Does 'struct thread_pool__job' sound better?
> +static pthread_mutex_t job_mutex =
> PTHREAD_MUTEX_INITIALIZER;
> +static pthread_mutex_t thread_mutex = PTHREAD_MUTEX_INITIALIZER;
> +static pthread_cond_t job_cond =
> PTHREAD_COND_INITIALIZER;
These mutexes and the condition variable are global. As the number of
threads/jobs grows, there may be a lot of contention.
> +
> +static LIST_HEAD(head);
> +
> +static pthread_t *threads;
> +static long threadcount;
> +
> +static struct thread_pool__job_info *thread_pool__job_info_pop(void)
> +{
> + struct thread_pool__job_info *job;
> +
> + if (list_empty(&head))
> + return NULL;
> +
> + job = list_first_entry(&head, struct thread_pool__job_info, queue);
> + list_del(&job->queue);
> +
> + return job;
> +}
> +
> +static void thread_pool__job_info_push(struct thread_pool__job_info *job)
> +{
> + list_add_tail(&job->queue, &head);
> +}
> +
> +static struct thread_pool__job_info *thread_pool__job_info_pop_locked(void)
> +{
> + struct thread_pool__job_info *job;
> +
> + mutex_lock(&job_mutex);
> + job = thread_pool__job_info_pop();
> + mutex_unlock(&job_mutex);
> + return job;
> +}
> +
> +static void thread_pool__job_info_push_locked(struct thread_pool__job_info
> *job)
> +{
> + mutex_lock(&job_mutex);
> + thread_pool__job_info_push(job);
> + mutex_unlock(&job_mutex);
> +}
> +
> +static void thread_pool__handle_job(struct thread_pool__job_info *job)
> +{
> + while (job) {
> + job->callback(job->kvm, job->data);
> +
> + mutex_lock(&job->mutex);
> +
> + if (--job->signalcount > 0)
> + /* If the job was signaled again while we were working
> */
> + thread_pool__job_info_push_locked(job);
> +
> + mutex_unlock(&job->mutex);
> +
> + job = thread_pool__job_info_pop_locked();
> + }
> +}
> +
> +static void thread_pool__threadfunc_cleanup(void *param)
> +{
> + mutex_unlock(&job_mutex);
> +}
> +
> +static void *thread_pool__threadfunc(void *param)
> +{
> + pthread_cleanup_push(thread_pool__threadfunc_cleanup, NULL);
> +
> + for (;;) {
> + struct thread_pool__job_info *curjob;
> +
> + mutex_lock(&job_mutex);
> + pthread_cond_wait(&job_cond, &job_mutex);
> + curjob = thread_pool__job_info_pop();
> + mutex_unlock(&job_mutex);
> +
> + if (curjob)
> + thread_pool__handle_job(curjob);
> + }
> +
> + pthread_cleanup_pop(0);
> +
> + return NULL;
> +}
> +
> +static int thread_pool__addthread(void)
> +{
> + int res;
> + void *newthreads;
> +
> + mutex_lock(&thread_mutex);
> + newthreads = realloc(threads, (threadcount + 1) * sizeof(pthread_t));
> + if (newthreads == NULL) {
> + mutex_unlock(&thread_mutex);
> + return -1;
> + }
> +
> + threads = newthreads;
> +
> + res = pthread_create(threads + threadcount, NULL,
> +
> thread_pool__threadfunc, NULL);
> +
> + if (res == 0)
> + threadcount++;
> + mutex_unlock(&thread_mutex);
> +
> + return res;
> +}
> +
> +int thread_pool__init(unsigned long thread_count)
> +{
> + unsigned long i;
> +
> + for (i = 0 ; i < thread_count ; i++)
> + if (thread_pool__addthread() < 0)
> + return i;
> +
> + return i;
> +}
> +
> +void *thread_pool__add_jobtype(struct kvm *kvm,
> +
> kvm_thread_callback_fn_t callback,
> + void *data)
Would thread_pool__add_job() be a better name?
> +{
> + struct thread_pool__job_info *job = calloc(1, sizeof(*job));
> +
> + *job = (struct thread_pool__job_info) {
> + .kvm = kvm,
> + .data = data,
> + .callback = callback,
> + .mutex = PTHREAD_MUTEX_INITIALIZER
> + };
> +
> + return job;
> +}
> +
> +void thread_pool__signal_work(void *job)
I think thread_pool__signal_job() or thread_pool__do_job()
would be more consistent.
Consumers of this API could then simply use thread_pool__{add,do}_job().
> +{
> + struct thread_pool__job_info *jobinfo = job;
> +
> + if (jobinfo == NULL)
> + return;
> +
> + mutex_lock(&jobinfo->mutex);
> + if (jobinfo->signalcount++ == 0)
> + thread_pool__job_info_push_locked(job);
> + mutex_unlock(&jobinfo->mutex);
> +
> + pthread_cond_signal(&job_cond);
> +}
--
Best Regards,
Asias He
--
To unsubscribe from this list: send the line "unsubscribe kvm" in
the body of a message to [email protected]
More majordomo info at http://vger.kernel.org/majordomo-info.html