This adds functionality to do work in a parallel threaded
fashion while the boiler plate code for setting up threads
and tearing them down as well as queuing up tasks is hidden
behind the new API.

Signed-off-by: Stefan Beller <sbel...@google.com>
---
 run-command.c  |  39 +++++++-----
 run-command.h  |   3 +
 thread-utils.c | 192 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 thread-utils.h |  35 +++++++++++
 4 files changed, 253 insertions(+), 16 deletions(-)

diff --git a/run-command.c b/run-command.c
index 28e1d55..3d37f8c 100644
--- a/run-command.c
+++ b/run-command.c
@@ -610,10 +610,12 @@ static NORETURN void die_async(const char *err, va_list 
params)
 
        if (!pthread_equal(main_thread, pthread_self())) {
                struct async *async = pthread_getspecific(async_key);
-               if (async->proc_in >= 0)
-                       close(async->proc_in);
-               if (async->proc_out >= 0)
-                       close(async->proc_out);
+               if (async) {
+                       if (async->proc_in >= 0)
+                               close(async->proc_in);
+                       if (async->proc_out >= 0)
+                               close(async->proc_out);
+               }
                pthread_exit((void *)128);
        }
 
@@ -668,6 +670,22 @@ int git_atexit(void (*handler)(void))
 
 #endif
 
+void setup_main_thread(void)
+{
+       if (!main_thread_set) {
+               /*
+                * We assume that the first time that start_async is called
+                * it is from the main thread.
+                */
+               main_thread_set = 1;
+               main_thread = pthread_self();
+               pthread_key_create(&async_key, NULL);
+               pthread_key_create(&async_die_counter, NULL);
+               set_die_routine(die_async);
+               set_die_is_recursing_routine(async_die_is_recursing);
+       }
+}
+
 int start_async(struct async *async)
 {
        int need_in, need_out;
@@ -740,18 +758,7 @@ int start_async(struct async *async)
        else if (async->out)
                close(async->out);
 #else
-       if (!main_thread_set) {
-               /*
-                * We assume that the first time that start_async is called
-                * it is from the main thread.
-                */
-               main_thread_set = 1;
-               main_thread = pthread_self();
-               pthread_key_create(&async_key, NULL);
-               pthread_key_create(&async_die_counter, NULL);
-               set_die_routine(die_async);
-               set_die_is_recursing_routine(async_die_is_recursing);
-       }
+       setup_main_thread();
 
        if (proc_in >= 0)
                set_cloexec(proc_in);
diff --git a/run-command.h b/run-command.h
index 5b4425a..176a5b2 100644
--- a/run-command.h
+++ b/run-command.h
@@ -119,4 +119,7 @@ struct async {
 int start_async(struct async *async);
 int finish_async(struct async *async);
 
+/* die gracefully from within threads */
+void setup_main_thread(void);
+
 #endif
diff --git a/thread-utils.c b/thread-utils.c
index a2135e0..30ccd79 100644
--- a/thread-utils.c
+++ b/thread-utils.c
@@ -1,5 +1,7 @@
 #include "cache.h"
 #include "thread-utils.h"
+#include "run-command.h"
+#include "git-compat-util.h"
 
 #if defined(hpux) || defined(__hpux) || defined(_hpux)
 #  include <sys/pstat.h>
@@ -75,3 +77,193 @@ int init_recursive_mutex(pthread_mutex_t *m)
        }
        return ret;
 }
+
+#ifndef NO_PTHREADS
+struct job_list {
+       int (*fct)(struct task_queue *tq, void *task);
+       void *task;
+       struct job_list *next;
+};
+
+struct task_queue {
+       pthread_mutex_t mutex;
+       pthread_cond_t cond_non_empty;
+
+       int queued_tasks;
+       struct job_list *first;
+       struct job_list *last;
+
+       pthread_t *threads;
+       unsigned max_threads;
+       unsigned max_tasks;
+
+       void (*finish_function)(struct task_queue *tq);
+       int early_return;
+};
+
+static void next_task(struct task_queue *tq,
+                     int (**fct)(struct task_queue *tq, void *task),
+                     void **task,
+                     int *early_return)
+{
+       struct job_list *job = NULL;
+
+       pthread_mutex_lock(&tq->mutex);
+       while (tq->queued_tasks == 0)
+               pthread_cond_wait(&tq->cond_non_empty, &tq->mutex);
+
+       tq->early_return |= *early_return;
+
+       if (!tq->early_return) {
+               job = tq->first;
+               tq->first = job->next;
+               if (!tq->first)
+                       tq->last = NULL;
+               tq->queued_tasks--;
+       }
+
+       pthread_mutex_unlock(&tq->mutex);
+
+       if (job) {
+               *fct = job->fct;
+               *task = job->task;
+       } else {
+               *fct = NULL;
+               *task = NULL;
+       }
+
+       free(job);
+}
+
+static void *dispatcher(void *args)
+{
+       void *task;
+       int (*fct)(struct task_queue *tq, void *task);
+       int early_return = 0;
+       struct task_queue *tq = args;
+
+       next_task(tq, &fct, &task, &early_return);
+       while (fct && !early_return) {
+               early_return = fct(tq, task);
+               next_task(tq, &fct, &task, &early_return);
+       }
+
+       if (tq->finish_function)
+               tq->finish_function(tq);
+
+       pthread_exit(0);
+}
+
+struct task_queue *create_task_queue(unsigned max_threads)
+{
+       struct task_queue *tq = xmalloc(sizeof(*tq));
+
+       int i, ret;
+       if (!max_threads)
+               tq->max_threads = online_cpus();
+       else
+               tq->max_threads = max_threads;
+
+       pthread_mutex_init(&tq->mutex, NULL);
+       pthread_cond_init(&tq->cond_non_empty, NULL);
+
+       tq->threads = xmalloc(tq->max_threads * sizeof(pthread_t));
+
+       tq->queued_tasks = 0;
+       tq->first = NULL;
+       tq->last = NULL;
+
+       setup_main_thread();
+
+       for (i = 0; i < tq->max_threads; i++) {
+               ret = pthread_create(&tq->threads[i], 0, &dispatcher, tq);
+               if (ret)
+                       die("unable to create thread: %s", strerror(ret));
+       }
+
+       tq->early_return = 0;
+
+       return tq;
+}
+
+void add_task(struct task_queue *tq,
+             int (*fct)(struct task_queue *tq, void *task),
+             void *task)
+{
+       struct job_list *job_list;
+
+       job_list = xmalloc(sizeof(*job_list));
+       job_list->task = task;
+       job_list->fct = fct;
+       job_list->next = NULL;
+
+       pthread_mutex_lock(&tq->mutex);
+
+       if (!tq->last) {
+               tq->last = job_list;
+               tq->first = tq->last;
+       } else {
+               tq->last->next = job_list;
+               tq->last = tq->last->next;
+       }
+       tq->queued_tasks++;
+
+       pthread_mutex_unlock(&tq->mutex);
+       pthread_cond_signal(&tq->cond_non_empty);
+}
+
+int finish_task_queue(struct task_queue *tq, void (*fct)(struct task_queue 
*tq))
+{
+       int ret;
+       int i;
+
+       tq->finish_function = fct;
+
+       for (i = 0; i < tq->max_threads; i++)
+               add_task(tq, NULL, NULL);
+
+       for (i = 0; i < tq->max_threads; i++)
+               pthread_join(tq->threads[i], 0);
+
+       pthread_mutex_destroy(&tq->mutex);
+       pthread_cond_destroy(&tq->cond_non_empty);
+
+       if (tq->first)
+               die("BUG: internal error with queuing jobs for threads");
+
+       free(tq->threads);
+       ret = tq->early_return;
+
+       free(tq);
+       return ret;
+}
+#else /* NO_PTHREADS */
+
+struct task_queue {
+       int early_return;
+};
+
+struct task_queue *create_task_queue(unsigned max_threads)
+{
+       struct task_queue *tq = xmalloc(sizeof(*tq));
+
+       tq->early_return = 0;
+}
+
+void add_task(struct task_queue *tq,
+             int (*fct)(struct task_queue *tq, void *task),
+             void *task)
+{
+       if (tq->early_return)
+               return;
+
+       tq->early_return |= fct(tq, task);
+}
+
+int finish_task_queue(struct task_queue *tq, void (*fct)(struct task_queue 
*tq))
+{
+       int ret = tq->early_return;
+       free(tq);
+       return ret;
+}
+#endif
diff --git a/thread-utils.h b/thread-utils.h
index d9a769d..f41cfb1 100644
--- a/thread-utils.h
+++ b/thread-utils.h
@@ -12,4 +12,39 @@ extern int init_recursive_mutex(pthread_mutex_t*);
 #define online_cpus() 1
 
 #endif
+
+/*
+ * Creates a struct `task_queue`, which holds a list of tasks. Up to
+ * `max_threads` threads are active to process the enqueued tasks
+ * processing the tasks in a first in first out order.
+ *
+ * If `max_threads` is zero the number of cores available will be used.
+ *
+ * Currently this only works in environments with pthreads, in other
+ * environments, the task will be processed sequentially in `add_task`.
+ */
+struct task_queue *create_task_queue(unsigned max_threads);
+
+/*
+ * The function and data are put into the task queue.
+ *
+ * The function `fct` must not be NULL, as that's used internally
+ * in `finish_task_queue` to signal shutdown. If the return code
+ * of `fct` is unequal to 0, the tasks will stop eventually,
+ * the current parallel tasks will be flushed out.
+ */
+void add_task(struct task_queue *tq,
+             int (*fct)(struct task_queue *tq, void *task),
+             void *task);
+
+/*
+ * Waits for all tasks to be done and frees the object. The return code
+ * is zero if all enqueued tasks were processed.
+ *
+ * The function `fct` is called once in each thread after the last task
+ * for that thread was processed. If no thread local cleanup needs to be
+ * performed, pass NULL.
+ */
+int finish_task_queue(struct task_queue *tq, void (*fct)(struct task_queue 
*tq));
+
 #endif /* THREAD_COMPAT_H */
-- 
2.5.0.264.g5e52b0d

--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to