From: Liu Yuan <[email protected]>

Signed-off-by: Liu Yuan <[email protected]>
---
 sheep/trace/trace.c |  148 ++++++++++++++++++---------------------------------
 sheep/trace/trace.h |   10 ++--
 sheep/work.c        |    6 +--
 sheep/work.h        |    7 +--
 4 files changed, 62 insertions(+), 109 deletions(-)

diff --git a/sheep/trace/trace.c b/sheep/trace/trace.c
index 377c4f2..dc0af97 100644
--- a/sheep/trace/trace.c
+++ b/sheep/trace/trace.c
@@ -16,15 +16,12 @@
 #include <unistd.h>
 #include <pthread.h>
 #include <signal.h>
-#include <sys/eventfd.h>
 
 #include "trace.h"
 #include "logger.h"
 #include "list.h"
-#include "work.h"
 #include "sheepdog_proto.h"
 #include "strbuf.h"
-#include "event.h"
 
 #define TRACE_HASH_BITS       7
 #define TRACE_HASH_SIZE       (1 << TRACE_HASH_BITS)
@@ -35,9 +32,7 @@ static pthread_mutex_t trace_lock = PTHREAD_MUTEX_INITIALIZER;
 
 static trace_func_t trace_func = trace_call;
 
-static int trace_efd;
-static int nr_short_thread;
-static bool trace_in_patch;
+static int total_nr_workers;
 
 static pthread_mutex_t suspend_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t suspend_cond = PTHREAD_COND_INITIALIZER;
@@ -45,6 +40,12 @@ static int suspend_count;
 
 static struct strbuf *buffer;
 static int nr_cpu;
+static LIST_HEAD(worker_list);
+
+struct worker {
+       struct list_head list;
+       pthread_t id;
+};
 
 union instruction {
        unsigned char start[INSN_SIZE];
@@ -180,17 +181,17 @@ notrace int register_trace_function(trace_func_t func)
 
 static notrace void suspend_worker_threads(void)
 {
-       struct worker_info *wi;
-       suspend_count = total_ordered_workers;
+       struct worker *w;
 
-       /* Hold the lock, then all other worker can sleep on it */
-       list_for_each_entry(wi, &worker_info_list, worker_info_siblings) {
-               if (wi->ordered &&
-                   pthread_kill(wi->worker_thread, SIGUSR2) != 0)
-                       sd_dprintf("%m");
+       pthread_mutex_lock(&trace_lock);
+       suspend_count = total_nr_workers;
+       list_for_each_entry(w, &worker_list, list) {
+               if (pthread_kill(w->id, SIGUSR2) != 0)
+                       panic("%m");
        }
 
 wait_for_worker_suspend:
+       /* Hold the lock, then all other worker can sleep on it */
        pthread_mutex_lock(&suspend_lock);
        if (suspend_count > 0) {
                pthread_mutex_unlock(&suspend_lock);
@@ -198,6 +199,7 @@ wait_for_worker_suspend:
                goto wait_for_worker_suspend;
        }
        pthread_mutex_unlock(&suspend_lock);
+       pthread_mutex_unlock(&trace_lock);
 }
 
 static notrace void resume_worker_threads(void)
@@ -231,76 +233,27 @@ static notrace void nop_all_sites(void)
        pthread_mutex_unlock(&trace_lock);
 }
 
-static inline bool short_thread_running(void)
-{
-       return !!nr_short_thread;
-}
-
-static notrace void enable_tracer(int fd, int events, void *data)
+notrace int trace_enable(void)
 {
-       eventfd_t value;
-       int ret;
-
-       ret = eventfd_read(trace_efd, &value);
-       /*
-        * In error case we can't retry read in main thread, simply return and
-        * expected to be waken up by epoll again.
-        */
-       if (ret < 0) {
-               sd_eprintf("%m");
-               return;
+       if (trace_func == trace_call) {
+               sd_dprintf("no tracer available");
+               return SD_RES_NO_TAG;
        }
 
-       if (short_thread_running())
-               return;
-
        suspend_worker_threads();
        patch_all_sites((unsigned long)trace_caller);
        resume_worker_threads();
-       unregister_event(trace_efd);
-       trace_in_patch = false;
        sd_dprintf("tracer enabled");
+
+       return SD_RES_SUCCESS;
 }
 
-static notrace void disable_tracer(int fd, int events, void *data)
+notrace int trace_disable(void)
 {
-       eventfd_t value;
-       int ret;
-
-       ret = eventfd_read(fd, &value);
-       if (ret < 0) {
-               sd_eprintf("%m");
-               return;
-       }
-
-       if (short_thread_running())
-               return;
-
        suspend_worker_threads();
        nop_all_sites();
        resume_worker_threads();
-       unregister_event(trace_efd);
-       trace_in_patch = false;
        sd_dprintf("tracer disabled");
-}
-
-notrace int trace_enable(void)
-{
-       if (trace_func == trace_call) {
-               sd_dprintf("no tracer available");
-               return SD_RES_NO_TAG;
-       }
-
-       register_event(trace_efd, enable_tracer, NULL);
-       trace_in_patch = true;
-
-       return SD_RES_SUCCESS;
-}
-
-notrace int trace_disable(void)
-{
-       register_event(trace_efd, disable_tracer, NULL);
-       trace_in_patch = true;
 
        return SD_RES_SUCCESS;
 }
@@ -321,9 +274,6 @@ notrace int trace_buffer_pop(void *buf, uint32_t len)
        char *buff = (char *)buf;
        int i;
 
-       if (trace_in_patch)
-               return -1;
-
        for (i = 0; i < nr_cpu; i++) {
                readin = strbuf_stripout(&buffer[i], buff, len);
                count += readin;
@@ -344,32 +294,10 @@ notrace void trace_buffer_push(int cpuid, struct 
trace_graph_item *item)
        strbuf_add(&buffer[cpuid], item, sizeof(*item));
 }
 
-void short_thread_begin(void)
-{
-       nr_short_thread++;
-}
-
-void short_thread_end(void)
-{
-       eventfd_t value = 1;
-
-       nr_short_thread--;
-       if (trace_in_patch && nr_short_thread == 0)
-               eventfd_write(trace_efd, value);
-}
-
 notrace int trace_init(void)
 {
-       sigset_t block;
        int i;
 
-       sigemptyset(&block);
-       sigaddset(&block, SIGUSR2);
-       if (pthread_sigmask(SIG_BLOCK, &block, NULL) != 0) {
-               sd_dprintf("%m");
-               return -1;
-       }
-
        if (make_text_writable((unsigned long)mcount_call) < 0) {
                sd_dprintf("%m");
                return -1;
@@ -382,8 +310,34 @@ notrace int trace_init(void)
        for (i = 0; i < nr_cpu; i++)
                strbuf_init(&buffer[i], 0);
 
-       trace_efd = eventfd(0, EFD_NONBLOCK);
-
-       sd_dprintf("trace support enabled. cpu count %d.", nr_cpu);
+       sd_iprintf("trace support enabled. cpu count %d.", nr_cpu);
        return 0;
 }
+
+notrace void trace_register_thread(pthread_t id)
+{
+       struct worker *new = xmalloc(sizeof(*new));
+
+       new->id = id;
+
+       pthread_mutex_lock(&trace_lock);
+       list_add(&new->list, &worker_list);
+       total_nr_workers++;
+       pthread_mutex_unlock(&trace_lock);
+       sd_dprintf("nr %d, add pid %lx", total_nr_workers, id);
+}
+
+notrace void trace_unregister_thread(pthread_t id)
+{
+       struct worker *w, *tmp;
+
+       pthread_mutex_lock(&trace_lock);
+       list_for_each_entry_safe(w, tmp, &worker_list, list) {
+               if (w->id == id) {
+                       list_del(&w->list);
+                       total_nr_workers--;
+               }
+       }
+       pthread_mutex_unlock(&trace_lock);
+       sd_dprintf("nr %d, del pid %lx", total_nr_workers, id);
+}
diff --git a/sheep/trace/trace.h b/sheep/trace/trace.h
index 3fe38f7..5243207 100644
--- a/sheep/trace/trace.h
+++ b/sheep/trace/trace.h
@@ -10,6 +10,7 @@
 #include "sheep.h"
 #include "list.h"
 #include "util.h"
+#include "../work.h"
 
 struct ipinfo {
        const char *file;           /* Source code filename for EIP */
@@ -57,8 +58,9 @@ unsigned long trace_return_call(void);
   struct caller *trace_lookup_ip(unsigned long ip, bool create);
   int trace_buffer_pop(void *buf, uint32_t len);
   void trace_buffer_push(int cpuid, struct trace_graph_item *item);
-  void short_thread_begin(void);
-  void short_thread_end(void);
+  void trace_register_thread(pthread_t id);
+  void trace_unregister_thread(pthread_t id);
+
 #else
   static inline int trace_init_signal(void) { return 0; }
   static inline int trace_init(void) { return 0; }
@@ -67,8 +69,8 @@ unsigned long trace_return_call(void);
   static inline int trace_buffer_pop(void *buf, uint32_t len) { return 0; }
   static inline void trace_buffer_push(
          int cpuid, struct trace_graph_item *item) { return; }
-  static inline void short_thread_begin(void) { return; }
-  static inline void short_thread_end(void) { return; }
+  static inline void trace_register_thread(pthread_t id){ return; }
+  static inline void trace_unregister_thread(pthread_t id) { return; }
 
 #endif /* ENABLE_TRACE */
 
diff --git a/sheep/work.c b/sheep/work.c
index e71afbc..5dc319a 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -43,7 +43,6 @@
 #define WQ_PROTECTION_PERIOD 1000 /* ms */
 
 static int efd;
-int total_ordered_workers;
 LIST_HEAD(worker_info_list);
 
 static void *worker_routine(void *arg);
@@ -115,14 +114,14 @@ static int create_worker_threads(struct worker_info *wi, 
size_t nr_threads)
 
        pthread_mutex_lock(&wi->startup_lock);
        while (wi->nr_threads < nr_threads) {
-               wi->nr_threads++;
                ret = pthread_create(&thread, NULL, worker_routine, wi);
                if (ret != 0) {
                        sd_eprintf("failed to create worker thread: %m");
-                       wi->nr_threads--;
                        pthread_mutex_unlock(&wi->startup_lock);
                        return -1;
                }
+               trace_register_thread(thread);
+               wi->nr_threads++;
                sd_dprintf("create thread %s %zd", wi->name, wi->nr_threads);
        }
        pthread_mutex_unlock(&wi->startup_lock);
@@ -195,6 +194,7 @@ static void *worker_routine(void *arg)
                if (wq_need_shrink(wi)) {
                        wi->nr_running--;
                        wi->nr_threads--;
+                       trace_unregister_thread(pthread_self());
                        pthread_mutex_unlock(&wi->pending_lock);
                        pthread_detach(pthread_self());
                        sd_dprintf("destroy thread %s %d, %zd", wi->name,
diff --git a/sheep/work.h b/sheep/work.h
index 0366cb3..5706a84 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -28,10 +28,11 @@ enum wq_thread_control {
 struct worker_info {
        const char *name;
 
+       struct list_head finished_list;
        struct list_head worker_info_siblings;
 
        pthread_mutex_t finished_lock;
-       struct list_head finished_list;
+       pthread_mutex_t startup_lock;
 
        /* wokers sleep on this and signaled by tgtd */
        pthread_cond_t pending_cond;
@@ -44,14 +45,10 @@ struct worker_info {
        size_t nr_threads;
        /* we cannot shrink work queue till this time */
        uint64_t tm_end_of_protection;
-
-       pthread_mutex_t startup_lock;
-
        enum wq_thread_control tc;
 };
 
 extern struct list_head worker_info_list;
-extern int total_ordered_workers;
 
 struct work_queue *init_work_queue(const char *name, enum wq_thread_control);
 struct work_queue *init_ordered_work_queue(const char *name);
-- 
1.7.9.5

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to