It can be used to implement, for example, threaded glCompileShader and glLinkProgram.
v2: allow tasks to "complete" other tasks Signed-off-by: Chia-I Wu <o...@lunarg.com> --- src/glsl/Makefile.am | 12 +- src/glsl/Makefile.sources | 3 +- src/glsl/tests/threadpool_test.cpp | 137 +++++++++++ src/glsl/threadpool.c | 476 +++++++++++++++++++++++++++++++++++++ src/glsl/threadpool.h | 67 ++++++ 5 files changed, 693 insertions(+), 2 deletions(-) create mode 100644 src/glsl/tests/threadpool_test.cpp create mode 100644 src/glsl/threadpool.c create mode 100644 src/glsl/threadpool.h diff --git a/src/glsl/Makefile.am b/src/glsl/Makefile.am index 00261fd..3d07af3 100644 --- a/src/glsl/Makefile.am +++ b/src/glsl/Makefile.am @@ -35,6 +35,7 @@ TESTS = glcpp/tests/glcpp-test \ tests/general-ir-test \ tests/optimization-test \ tests/ralloc-test \ + tests/threadpool-test \ tests/sampler-types-test \ tests/uniform-initializer-test @@ -48,6 +49,7 @@ check_PROGRAMS = \ glsl_test \ tests/general-ir-test \ tests/ralloc-test \ + tests/threadpool-test \ tests/sampler-types-test \ tests/uniform-initializer-test @@ -95,6 +97,14 @@ tests_ralloc_test_LDADD = \ $(top_builddir)/src/gtest/libgtest.la \ $(PTHREAD_LIBS) +tests_threadpool_test_SOURCES = \ + tests/threadpool_test.cpp \ + $(top_builddir)/src/glsl/threadpool.c +tests_threadpool_test_CFLAGS = $(PTHREAD_CFLAGS) +tests_threadpool_test_LDADD = \ + $(top_builddir)/src/gtest/libgtest.la \ + $(PTHREAD_LIBS) + tests_sampler_types_test_SOURCES = \ $(top_srcdir)/src/mesa/program/prog_hash_table.c\ $(top_srcdir)/src/mesa/program/symbol_table.c \ @@ -120,7 +130,7 @@ glcpp_glcpp_LDADD = \ libglcpp.la \ -lm -libglsl_la_LIBADD = libglcpp.la +libglsl_la_LIBADD = libglcpp.la $(PTHREAD_LIBS) libglsl_la_SOURCES = \ glsl_lexer.cpp \ glsl_parser.cpp \ diff --git a/src/glsl/Makefile.sources b/src/glsl/Makefile.sources index 6fc94d6..bab2358 100644 --- a/src/glsl/Makefile.sources +++ b/src/glsl/Makefile.sources @@ -103,7 +103,8 @@ LIBGLSL_FILES = \ $(GLSL_SRCDIR)/opt_tree_grafting.cpp \ $(GLSL_SRCDIR)/opt_vectorize.cpp \ $(GLSL_SRCDIR)/s_expression.cpp \ - $(GLSL_SRCDIR)/strtod.cpp + $(GLSL_SRCDIR)/strtod.cpp \ + $(GLSL_SRCDIR)/threadpool.c # glsl_compiler diff --git a/src/glsl/tests/threadpool_test.cpp b/src/glsl/tests/threadpool_test.cpp new file mode 100644 index 0000000..63f55c5 --- /dev/null +++ b/src/glsl/tests/threadpool_test.cpp @@ -0,0 +1,137 @@ +/* + * Copyright © 2014 LunarG, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice (including the next + * paragraph) shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER + * DEALINGS IN THE SOFTWARE. + */ +#include <gtest/gtest.h> +#include <string.h> +#include <stdbool.h> +#include <stdlib.h> +#include <time.h> +#include <unistd.h> +#include "c11/threads.h" + +#include "threadpool.h" + +#define NUM_THREADS 10 +#define OPS_PER_THREAD 100 +#define MAX_TASKS 10 + +static void +race_cb(void *data) +{ + usleep(1000 * 5); +} + +static int +race_random_op(void *data) +{ + struct _mesa_threadpool *pool = (struct _mesa_threadpool *) data; + struct _mesa_threadpool_task *tasks[MAX_TASKS]; + int num_tasks = 0; + int num_ops = 0; + int i; + + while (num_ops < OPS_PER_THREAD) { + int op = (random() % 1000); + + if (op < 480) { /* 48% */ + if (num_tasks < MAX_TASKS) { + tasks[num_tasks++] = + _mesa_threadpool_queue_task(pool, race_cb, NULL); + } + } + else if (op < 980) { /* 50% */ + if (num_tasks) + _mesa_threadpool_complete_task(pool, tasks[--num_tasks]); + } + else if (op < 995) { /* 1.5% */ + for (i = 0; i < num_tasks; i++) + _mesa_threadpool_complete_task(pool, tasks[i]); + num_tasks = 0; + } + else { /* 0.5% */ + _mesa_threadpool_join(pool, (op < 998)); + } + + num_ops++; + } + + for (i = 0; i < num_tasks; i++) + _mesa_threadpool_complete_task(pool, tasks[i]); + + _mesa_threadpool_unref(pool); + + return 0; +} + +/** + * \name Thread safety + */ +/*@{*/ +TEST(threadpool_test, race) +{ + struct _mesa_threadpool *pool; + thrd_t threads[NUM_THREADS]; + int i; + + srandom(time(NULL)); + pool = _mesa_threadpool_create(4); + for (i = 0; i < NUM_THREADS; i++) { + thrd_create(&threads[i], race_random_op, + (void *) _mesa_threadpool_ref(pool)); + } + _mesa_threadpool_unref(pool); + + for (i = 0; i < NUM_THREADS; i++) + thrd_join(threads[i], NULL); + + /* this is not really a unit test */ + EXPECT_TRUE(true); +} + +static void +basic_cb(void *data) +{ + int *val = (int *) data; + + usleep(1000 * 5); + *val = 1; +} + +TEST(threadpool_test, basic) +{ + struct _mesa_threadpool *pool; + struct _mesa_threadpool_task *tasks[2]; + int vals[2]; + + pool = _mesa_threadpool_create(2); + + vals[0] = vals[1] = 0; + tasks[0] = _mesa_threadpool_queue_task(pool, basic_cb, (void *) &vals[0]); + tasks[1] = _mesa_threadpool_queue_task(pool, basic_cb, (void *) &vals[1]); + _mesa_threadpool_complete_task(pool, tasks[0]); + _mesa_threadpool_complete_task(pool, tasks[1]); + EXPECT_TRUE(vals[0] == 1 && vals[1] == 1); + + _mesa_threadpool_unref(pool); +} + +/*@}*/ diff --git a/src/glsl/threadpool.c b/src/glsl/threadpool.c new file mode 100644 index 0000000..c069fd3 --- /dev/null +++ b/src/glsl/threadpool.c @@ -0,0 +1,476 @@ +/* + * Mesa 3-D graphics library + * + * Copyright (C) 2014 LunarG, Inc. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS + * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR + * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, + * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +#include <stdio.h> +#include <stdbool.h> +#include "c11/threads.h" +#include "main/compiler.h" +#include "main/simple_list.h" +#include "threadpool.h" + +enum _mesa_threadpool_control { + MESA_THREADPOOL_NORMAL, /* threads wait when there is no task */ + MESA_THREADPOOL_QUIT, /* threads quit when there is no task */ + MESA_THREADPOOL_QUIT_NOW /* threads quit as soon as possible */ +}; + +enum _mesa_threadpool_task_state { + MESA_THREADPOOL_TASK_PENDING, /* task is on the pending list */ + MESA_THREADPOOL_TASK_ACTIVE, /* task is being worked on */ + MESA_THREADPOOL_TASK_COMPLETED, /* task has been completed */ + MESA_THREADPOOL_TASK_CANCELLED /* task is cancelled */ +}; + +struct _mesa_threadpool_task { + /* these are protected by the pool's mutex */ + struct simple_node link; /* must be the first */ + enum _mesa_threadpool_task_state state; + cnd_t completed; + + void (*func)(void *); + void *data; +}; + +struct _mesa_threadpool { + mtx_t mutex; + int refcnt; + + enum _mesa_threadpool_control thread_control; + thrd_t *threads; + int num_threads, max_threads; + int idle_threads; /* number of threads that are idle */ + cnd_t thread_wakeup; + cnd_t thread_joined; + + struct simple_node pending_tasks; + int num_pending_tasks; + int num_tasks; +}; + +static struct _mesa_threadpool_task * +task_create(void) +{ + struct _mesa_threadpool_task *task; + + task = malloc(sizeof(*task)); + if (!task) + return NULL; + + if (cnd_init(&task->completed)) { + free(task); + return NULL; + } + + task->state = MESA_THREADPOOL_TASK_PENDING; + + return task; +} + +static void +task_destroy(struct _mesa_threadpool_task *task) +{ + cnd_destroy(&task->completed); + free(task); +} + +static void +pool_exec_task(struct _mesa_threadpool *pool, + struct _mesa_threadpool_task *task) +{ + assert(task->state == MESA_THREADPOOL_TASK_PENDING); + + remove_from_list(&task->link); + pool->num_pending_tasks--; + + task->state = MESA_THREADPOOL_TASK_ACTIVE; + + /* do the work! */ + mtx_unlock(&pool->mutex); + task->func(task->data); + mtx_lock(&pool->mutex); + + task->state = MESA_THREADPOOL_TASK_COMPLETED; +} + +static int +_mesa_threadpool_worker(void *arg) +{ + struct _mesa_threadpool *pool = (struct _mesa_threadpool *) arg; + + mtx_lock(&pool->mutex); + + while (true) { + struct _mesa_threadpool_task *task; + + /* wait until there are tasks */ + while (is_empty_list(&pool->pending_tasks) && + pool->thread_control == MESA_THREADPOOL_NORMAL) { + pool->idle_threads++; + cnd_wait(&pool->thread_wakeup, &pool->mutex); + pool->idle_threads--; + } + + if (pool->thread_control != MESA_THREADPOOL_NORMAL) { + if (pool->thread_control == MESA_THREADPOOL_QUIT_NOW || + is_empty_list(&pool->pending_tasks)) + break; + } + + assert(!is_empty_list(&pool->pending_tasks)); + task = (struct _mesa_threadpool_task *) + first_elem(&pool->pending_tasks); + + pool_exec_task(pool, task); + cnd_signal(&task->completed); + } + + mtx_unlock(&pool->mutex); + + return 0; +} + +/** + * Queue a new task. + */ +struct _mesa_threadpool_task * +_mesa_threadpool_queue_task(struct _mesa_threadpool *pool, + void (*func)(void *), void *data) +{ + struct _mesa_threadpool_task *task; + + task = task_create(); + if (!task) + return NULL; + + task->func = func; + task->data = data; + + mtx_lock(&pool->mutex); + + /* someone is joining with the threads */ + while (unlikely(pool->thread_control != MESA_THREADPOOL_NORMAL)) + cnd_wait(&pool->thread_joined, &pool->mutex); + + /* spawn threads as needed */ + if (pool->idle_threads <= pool->num_pending_tasks && + pool->num_threads < pool->max_threads) { + int err; + + err = thrd_create(&pool->threads[pool->num_threads], + _mesa_threadpool_worker, (void *) pool); + if (!err) + pool->num_threads++; + + if (!pool->num_threads) { + mtx_unlock(&pool->mutex); + task_destroy(task); + return NULL; + } + } + + insert_at_tail(&pool->pending_tasks, &task->link); + pool->num_tasks++; + pool->num_pending_tasks++; + cnd_signal(&pool->thread_wakeup); + + mtx_unlock(&pool->mutex); + + return task; +} + +/** + * Wait and destroy the tasks. + */ +static bool +pool_wait_tasks(struct _mesa_threadpool *pool, + struct _mesa_threadpool_task **tasks, + int num_tasks) +{ + bool all_completed = true; + int i; + + for (i = 0; i < num_tasks; i++) { + struct _mesa_threadpool_task *task = tasks[i]; + + while (task->state != MESA_THREADPOOL_TASK_COMPLETED && + task->state != MESA_THREADPOOL_TASK_CANCELLED) + cnd_wait(&task->completed, &pool->mutex); + + if (task->state != MESA_THREADPOOL_TASK_COMPLETED) + all_completed = false; + + task_destroy(task); + } + + pool->num_tasks -= num_tasks; + + return all_completed; +} + +/** + * Wait for \p tasks to complete, and destroy them. If some of \p tasks + * cannot not be completed, return false. + * + * This function can be called from within the worker threads. + */ +bool +_mesa_threadpool_complete_tasks(struct _mesa_threadpool *pool, + struct _mesa_threadpool_task **tasks, + int num_tasks) +{ + bool prioritized = false, completed; + int i; + + mtx_lock(&pool->mutex); + + /* we need to do something about tasks that are pending */ + for (i = 0; i < num_tasks; i++) { + struct _mesa_threadpool_task *task = tasks[i]; + + if (task->state != MESA_THREADPOOL_TASK_PENDING) + continue; + + /* move them to the head so that they are executed next */ + if (!prioritized) { + int j; + + for (j = i + 1; j < num_tasks; j++) { + if (task->state == MESA_THREADPOOL_TASK_PENDING) + move_to_head(&pool->pending_tasks, &task->link); + } + prioritized = true; + } + + /* + * Execute right away for we would have to wait for the worker threads + * otherwise, which is no faster. More importantly, when this is called + * from within worker threads, there may be no idle thread available to + * execute them. + */ + pool_exec_task(pool, task); + } + + completed = pool_wait_tasks(pool, tasks, num_tasks); + + mtx_unlock(&pool->mutex); + + return completed; +} + +/** + * This is equivalent to calling \p _mesa_threadpool_complete_tasks with one + * task. + */ +bool +_mesa_threadpool_complete_task(struct _mesa_threadpool *pool, + struct _mesa_threadpool_task *task) +{ + bool completed; + + mtx_lock(&pool->mutex); + + if (task->state == MESA_THREADPOOL_TASK_PENDING) + pool_exec_task(pool, task); + + completed = pool_wait_tasks(pool, &task, 1); + + mtx_unlock(&pool->mutex); + + return completed; +} + +static void +pool_cancel_pending_tasks(struct _mesa_threadpool *pool) +{ + struct simple_node *node, *temp; + + if (is_empty_list(&pool->pending_tasks)) + return; + + foreach_s(node, temp, &pool->pending_tasks) { + struct _mesa_threadpool_task *task = + (struct _mesa_threadpool_task *) node; + + remove_from_list(&task->link); + task->state = MESA_THREADPOOL_TASK_CANCELLED; + + /* in case some thread is already waiting */ + cnd_signal(&task->completed); + } + + pool->num_pending_tasks = 0; +} + +static void +pool_join_threads(struct _mesa_threadpool *pool, bool graceful) +{ + int joined_threads = 0; + + if (!pool->num_threads) + return; + + pool->thread_control = (graceful) ? + MESA_THREADPOOL_QUIT : MESA_THREADPOOL_QUIT_NOW; + + while (joined_threads < pool->num_threads) { + int i = joined_threads, num_threads = pool->num_threads; + + cnd_broadcast(&pool->thread_wakeup); + mtx_unlock(&pool->mutex); + while (i < num_threads) + thrd_join(pool->threads[i++], NULL); + mtx_lock(&pool->mutex); + + joined_threads = num_threads; + } + + pool->thread_control = MESA_THREADPOOL_NORMAL; + pool->num_threads = 0; + assert(!pool->idle_threads); +} + +/** + * Join with all pool threads. When \p graceful is true, wait for the pending + * tasks to be completed. + */ +void +_mesa_threadpool_join(struct _mesa_threadpool *pool, bool graceful) +{ + mtx_lock(&pool->mutex); + + /* someone is already joining with the threads */ + while (unlikely(pool->thread_control != MESA_THREADPOOL_NORMAL)) + cnd_wait(&pool->thread_joined, &pool->mutex); + + if (pool->num_threads) { + pool_join_threads(pool, graceful); + /* wake up whoever is waiting */ + cnd_broadcast(&pool->thread_joined); + } + + if (!graceful) + pool_cancel_pending_tasks(pool); + + assert(pool->num_threads == 0); + assert(is_empty_list(&pool->pending_tasks) && !pool->num_pending_tasks); + + mtx_unlock(&pool->mutex); +} + +/** + * Decrease the reference count. Destroy \p pool when the reference count + * reaches zero. + */ +void +_mesa_threadpool_unref(struct _mesa_threadpool *pool) +{ + bool destroy = false; + + mtx_lock(&pool->mutex); + pool->refcnt--; + destroy = (pool->refcnt == 0); + mtx_unlock(&pool->mutex); + + if (destroy) { + _mesa_threadpool_join(pool, false); + + if (pool->num_tasks) { + fprintf(stderr, "thread pool destroyed with %d tasks\n", + pool->num_tasks); + } + + free(pool->threads); + cnd_destroy(&pool->thread_joined); + cnd_destroy(&pool->thread_wakeup); + mtx_destroy(&pool->mutex); + free(pool); + } +} + +/** + * Increase the reference count. + */ +struct _mesa_threadpool * +_mesa_threadpool_ref(struct _mesa_threadpool *pool) +{ + mtx_lock(&pool->mutex); + pool->refcnt++; + mtx_unlock(&pool->mutex); + + return pool; +} + +/** + * Create a thread pool. As threads are spawned as needed, this is + * inexpensive. + */ +struct _mesa_threadpool * +_mesa_threadpool_create(int max_threads) +{ + struct _mesa_threadpool *pool; + + if (max_threads < 1) + return NULL; + + pool = calloc(1, sizeof(*pool)); + if (!pool) + return NULL; + + if (mtx_init(&pool->mutex, mtx_plain)) { + free(pool); + return NULL; + } + + pool->refcnt = 1; + + if (cnd_init(&pool->thread_wakeup)) { + mtx_destroy(&pool->mutex); + free(pool); + return NULL; + } + + if (cnd_init(&pool->thread_joined)) { + cnd_destroy(&pool->thread_wakeup); + mtx_destroy(&pool->mutex); + free(pool); + return NULL; + } + + pool->thread_control = MESA_THREADPOOL_NORMAL; + + pool->threads = malloc(sizeof(pool->threads[0]) * max_threads); + if (!pool->threads) { + cnd_destroy(&pool->thread_joined); + cnd_destroy(&pool->thread_wakeup); + mtx_destroy(&pool->mutex); + free(pool); + return NULL; + } + + pool->max_threads = max_threads; + + make_empty_list(&pool->pending_tasks); + + return pool; +} diff --git a/src/glsl/threadpool.h b/src/glsl/threadpool.h new file mode 100644 index 0000000..48e4a47 --- /dev/null +++ b/src/glsl/threadpool.h @@ -0,0 +1,67 @@ +/* + * Mesa 3-D graphics library + * + * Copyright (C) 2014 LunarG, Inc. All Rights Reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the "Software"), + * to deal in the Software without restriction, including without limitation + * the rights to use, copy, modify, merge, publish, distribute, sublicense, + * and/or sell copies of the Software, and to permit persons to whom the + * Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS + * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR + * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, + * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + + +#ifndef THREADPOOL_H +#define THREADPOOL_H + +#include <stdbool.h> + +#ifdef __cplusplus +extern "C" { +#endif + +struct _mesa_threadpool; +struct _mesa_threadpool_task; + +struct _mesa_threadpool * +_mesa_threadpool_create(int max_threads); + +struct _mesa_threadpool * +_mesa_threadpool_ref(struct _mesa_threadpool *pool); + +void +_mesa_threadpool_unref(struct _mesa_threadpool *pool); + +void +_mesa_threadpool_join(struct _mesa_threadpool *pool, bool graceful); + +struct _mesa_threadpool_task * +_mesa_threadpool_queue_task(struct _mesa_threadpool *pool, + void (*func)(void *), void *data); + +bool +_mesa_threadpool_complete_tasks(struct _mesa_threadpool *pool, + struct _mesa_threadpool_task **tasks, + int num_tasks); + +bool +_mesa_threadpool_complete_task(struct _mesa_threadpool *pool, + struct _mesa_threadpool_task *task); + +#ifdef __cplusplus +} +#endif + +#endif /* THREADPOOL_H */ -- 2.0.0.rc2 _______________________________________________ mesa-dev mailing list mesa-dev@lists.freedesktop.org http://lists.freedesktop.org/mailman/listinfo/mesa-dev