From: MORITA Kazutaka <[email protected]> This removes sheep specific header files to move work.c to lib.
Signed-off-by: MORITA Kazutaka <[email protected]> --- sheep/sheep.c | 16 +++++++++++++++- sheep/trace/trace.h | 1 - sheep/work.c | 37 ++++++++++++++++++------------------- sheep/work.h | 8 +++++++- 4 files changed, 40 insertions(+), 22 deletions(-) diff --git a/sheep/sheep.c b/sheep/sheep.c index 537c86e..0238a9f 100644 --- a/sheep/sheep.c +++ b/sheep/sheep.c @@ -331,9 +331,23 @@ static void init_io_arg(char *arg) } } +static size_t get_nr_nodes(void) +{ + struct vnode_info *vinfo; + size_t nr = 1; + + vinfo = get_vnode_info(); + if (vinfo != NULL) + nr = vinfo->nr_nodes; + put_vnode_info(vinfo); + + return nr; +} + static int create_work_queues(void) { - if (init_work_queue()) + if (init_work_queue(get_nr_nodes, trace_register_thread, + trace_unregister_thread)) return -1; sys->gateway_wqueue = create_work_queue("gway", WQ_UNLIMITED); diff --git a/sheep/trace/trace.h b/sheep/trace/trace.h index 9648834..2a920a5 100644 --- a/sheep/trace/trace.h +++ b/sheep/trace/trace.h @@ -10,7 +10,6 @@ #include "sheep.h" #include "list.h" #include "util.h" -#include "../work.h" struct ipinfo { const char *file; /* Source code filename for EIP */ diff --git a/sheep/work.c b/sheep/work.c index f79dd1d..1a90561 100644 --- a/sheep/work.c +++ b/sheep/work.c @@ -32,8 +32,6 @@ #include "work.h" #include "logger.h" #include "event.h" -#include "trace/trace.h" -#include "sheep_priv.h" /* * The protection period from shrinking work queue. This is necessary @@ -63,11 +61,14 @@ struct worker_info { /* we cannot shrink work queue till this time */ uint64_t tm_end_of_protection; enum wq_thread_control tc; - size_t nr_nodes; }; static int efd; static LIST_HEAD(worker_info_list); +static size_t nr_nodes = 1; +static size_t (*wq_get_nr_nodes)(void); +static void (*wq_create_cb)(pthread_t); +static void (*wq_destroy_cb)(pthread_t); static void *worker_routine(void *arg); @@ -88,7 +89,7 @@ static inline uint64_t wq_get_roof(struct worker_info *wi) break; case WQ_DYNAMIC: /* FIXME: 2 * nr_nodes threads. No rationale yet. */ - nr = wi->nr_nodes * 2; + nr = nr_nodes * 2; break; case WQ_UNLIMITED: nr = SIZE_MAX; @@ -139,7 +140,8 @@ static int create_worker_threads(struct worker_info *wi, size_t nr_threads) pthread_mutex_unlock(&wi->startup_lock); return -1; } - trace_register_thread(thread); + if (wq_create_cb) + wq_create_cb(thread); wi->nr_threads++; sd_dprintf("create thread %s %zu", wi->name, wi->nr_threads); } @@ -172,22 +174,15 @@ static void worker_thread_request_done(int fd, int events, void *data) struct work *work; eventfd_t value; LIST_HEAD(list); - struct vnode_info *vinfo; - size_t nr_nodes; + + if (wq_get_nr_nodes) + nr_nodes = wq_get_nr_nodes(); ret = eventfd_read(fd, &value); if (ret < 0) return; - vinfo = get_vnode_info(); - if (vinfo != NULL) - nr_nodes = vinfo->nr_nodes; - else - nr_nodes = 1; /* cluster doesn't start yet */ - list_for_each_entry(wi, &worker_info_list, worker_info_siblings) { - wi->nr_nodes = nr_nodes; - pthread_mutex_lock(&wi->finished_lock); list_splice_init(&wi->finished_list, &list); pthread_mutex_unlock(&wi->finished_lock); @@ -199,8 +194,6 @@ static void worker_thread_request_done(int fd, int events, void *data) work->done(work); } } - - put_vnode_info(vinfo); } static void *worker_routine(void *arg) @@ -225,7 +218,8 @@ static void *worker_routine(void *arg) if (wq_need_shrink(wi)) { wi->nr_running--; wi->nr_threads--; - trace_unregister_thread(pthread_self()); + if (wq_destroy_cb) + wq_destroy_cb(pthread_self()); pthread_mutex_unlock(&wi->pending_lock); pthread_detach(pthread_self()); sd_dprintf("destroy thread %s %d, %zu", wi->name, @@ -260,10 +254,15 @@ retest: pthread_exit(NULL); } -int init_work_queue(void) +int init_work_queue(size_t (*get_nr_nodes)(void), void (*create_cb)(pthread_t), + void (*destroy_cb)(pthread_t)) { int ret; + wq_get_nr_nodes = get_nr_nodes; + wq_create_cb = create_cb; + wq_destroy_cb = destroy_cb; + efd = eventfd(0, EFD_NONBLOCK); if (efd < 0) { sd_eprintf("failed to create an event fd: %m"); diff --git a/sheep/work.h b/sheep/work.h index b036e3a..0317b00 100644 --- a/sheep/work.h +++ b/sheep/work.h @@ -24,7 +24,13 @@ enum wq_thread_control { WQ_UNLIMITED, /* Unlimited # of threads created */ }; -int init_work_queue(void); +/* + * 'get_nr_nodes' is the function to get the current number of nodes and used + * for dynamic work queues. 'create_cb' will be called when worker threads are + * created and 'destroy_cb' will be called when worker threads are destroyed. + */ +int init_work_queue(size_t (*get_nr_nodes)(void), void (*create_cb)(pthread_t), + void (*destroy_cb)(pthread_t)); struct work_queue *create_work_queue(const char *name, enum wq_thread_control); struct work_queue *create_ordered_work_queue(const char *name); void queue_work(struct work_queue *q, struct work *work); -- 1.7.9.5 -- sheepdog mailing list [email protected] http://lists.wpkg.org/mailman/listinfo/sheepdog
