From: MORITA Kazutaka <[email protected]>

This removes sheep specific header files to move work.c to lib.

Signed-off-by: MORITA Kazutaka <[email protected]>
---
 sheep/sheep.c       |   16 +++++++++++++++-
 sheep/trace/trace.h |    1 -
 sheep/work.c        |   37 ++++++++++++++++++-------------------
 sheep/work.h        |    8 +++++++-
 4 files changed, 40 insertions(+), 22 deletions(-)

diff --git a/sheep/sheep.c b/sheep/sheep.c
index 537c86e..0238a9f 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -331,9 +331,23 @@ static void init_io_arg(char *arg)
        }
 }
 
+static size_t get_nr_nodes(void)
+{
+       struct vnode_info *vinfo;
+       size_t nr = 1;
+
+       vinfo = get_vnode_info();
+       if (vinfo != NULL)
+               nr = vinfo->nr_nodes;
+       put_vnode_info(vinfo);
+
+       return nr;
+}
+
 static int create_work_queues(void)
 {
-       if (init_work_queue())
+       if (init_work_queue(get_nr_nodes, trace_register_thread,
+                           trace_unregister_thread))
                return -1;
 
        sys->gateway_wqueue = create_work_queue("gway", WQ_UNLIMITED);
diff --git a/sheep/trace/trace.h b/sheep/trace/trace.h
index 9648834..2a920a5 100644
--- a/sheep/trace/trace.h
+++ b/sheep/trace/trace.h
@@ -10,7 +10,6 @@
 #include "sheep.h"
 #include "list.h"
 #include "util.h"
-#include "../work.h"
 
 struct ipinfo {
        const char *file;           /* Source code filename for EIP */
diff --git a/sheep/work.c b/sheep/work.c
index f79dd1d..1a90561 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -32,8 +32,6 @@
 #include "work.h"
 #include "logger.h"
 #include "event.h"
-#include "trace/trace.h"
-#include "sheep_priv.h"
 
 /*
  * The protection period from shrinking work queue.  This is necessary
@@ -63,11 +61,14 @@ struct worker_info {
        /* we cannot shrink work queue till this time */
        uint64_t tm_end_of_protection;
        enum wq_thread_control tc;
-       size_t nr_nodes;
 };
 
 static int efd;
 static LIST_HEAD(worker_info_list);
+static size_t nr_nodes = 1;
+static size_t (*wq_get_nr_nodes)(void);
+static void (*wq_create_cb)(pthread_t);
+static void (*wq_destroy_cb)(pthread_t);
 
 static void *worker_routine(void *arg);
 
@@ -88,7 +89,7 @@ static inline uint64_t wq_get_roof(struct worker_info *wi)
                break;
        case WQ_DYNAMIC:
                /* FIXME: 2 * nr_nodes threads. No rationale yet. */
-               nr = wi->nr_nodes * 2;
+               nr = nr_nodes * 2;
                break;
        case WQ_UNLIMITED:
                nr = SIZE_MAX;
@@ -139,7 +140,8 @@ static int create_worker_threads(struct worker_info *wi, 
size_t nr_threads)
                        pthread_mutex_unlock(&wi->startup_lock);
                        return -1;
                }
-               trace_register_thread(thread);
+               if (wq_create_cb)
+                       wq_create_cb(thread);
                wi->nr_threads++;
                sd_dprintf("create thread %s %zu", wi->name, wi->nr_threads);
        }
@@ -172,22 +174,15 @@ static void worker_thread_request_done(int fd, int 
events, void *data)
        struct work *work;
        eventfd_t value;
        LIST_HEAD(list);
-       struct vnode_info *vinfo;
-       size_t nr_nodes;
+
+       if (wq_get_nr_nodes)
+               nr_nodes = wq_get_nr_nodes();
 
        ret = eventfd_read(fd, &value);
        if (ret < 0)
                return;
 
-       vinfo = get_vnode_info();
-       if (vinfo != NULL)
-               nr_nodes = vinfo->nr_nodes;
-       else
-               nr_nodes = 1; /* cluster doesn't start yet */
-
        list_for_each_entry(wi, &worker_info_list, worker_info_siblings) {
-               wi->nr_nodes = nr_nodes;
-
                pthread_mutex_lock(&wi->finished_lock);
                list_splice_init(&wi->finished_list, &list);
                pthread_mutex_unlock(&wi->finished_lock);
@@ -199,8 +194,6 @@ static void worker_thread_request_done(int fd, int events, 
void *data)
                        work->done(work);
                }
        }
-
-       put_vnode_info(vinfo);
 }
 
 static void *worker_routine(void *arg)
@@ -225,7 +218,8 @@ static void *worker_routine(void *arg)
                if (wq_need_shrink(wi)) {
                        wi->nr_running--;
                        wi->nr_threads--;
-                       trace_unregister_thread(pthread_self());
+                       if (wq_destroy_cb)
+                               wq_destroy_cb(pthread_self());
                        pthread_mutex_unlock(&wi->pending_lock);
                        pthread_detach(pthread_self());
                        sd_dprintf("destroy thread %s %d, %zu", wi->name,
@@ -260,10 +254,15 @@ retest:
        pthread_exit(NULL);
 }
 
-int init_work_queue(void)
+int init_work_queue(size_t (*get_nr_nodes)(void), void (*create_cb)(pthread_t),
+                   void (*destroy_cb)(pthread_t))
 {
        int ret;
 
+       wq_get_nr_nodes = get_nr_nodes;
+       wq_create_cb = create_cb;
+       wq_destroy_cb = destroy_cb;
+
        efd = eventfd(0, EFD_NONBLOCK);
        if (efd < 0) {
                sd_eprintf("failed to create an event fd: %m");
diff --git a/sheep/work.h b/sheep/work.h
index b036e3a..0317b00 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -24,7 +24,13 @@ enum wq_thread_control {
        WQ_UNLIMITED, /* Unlimited # of threads created */
 };
 
-int init_work_queue(void);
+/*
+ * 'get_nr_nodes' is the function to get the current number of nodes and used
+ * for dynamic work queues.  'create_cb' will be called when worker threads are
+ * created and 'destroy_cb' will be called when worker threads are destroyed.
+ */
+int init_work_queue(size_t (*get_nr_nodes)(void), void (*create_cb)(pthread_t),
+                   void (*destroy_cb)(pthread_t));
 struct work_queue *create_work_queue(const char *name, enum wq_thread_control);
 struct work_queue *create_ordered_work_queue(const char *name);
 void queue_work(struct work_queue *q, struct work *work);
-- 
1.7.9.5

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to