From: MORITA Kazutaka <[email protected]>

Currently, if queue_work() is called during the extra event_loop() call at
the end of work_queue_wait(), work_queue_wait() does not wait for that work
to complete.

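This patch replaces the nr_pending and nr_running counters in struct
worker_info with a single atomically updated counter, nr_works.

- queue_work() increments it.
- worker_thread_request_done() decrements it after the work's done()
  callback returns.

As a result, work_queue_empty() returns true only when no work is pending,
running in worker_routine(), or still waiting in worker_thread_request_done()
for its done() callback. wq_need_grow() and wq_need_shrink() now use nr_works
as well.

This should remove the race condition in work_queue_wait() completely. The
extra event_loop() call at its end is therefore no longer needed.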

Signed-off-by: MORITA Kazutaka <[email protected]>
---
 collie/common.c |    7 -------
 lib/work.c      |   25 +++++++++----------------
 2 files changed, 9 insertions(+), 23 deletions(-)

diff --git a/collie/common.c b/collie/common.c
index 0646587..e4d33e5 100644
--- a/collie/common.c
+++ b/collie/common.c
@@ -249,13 +249,6 @@ void confirm(const char *message)
 
 void work_queue_wait(struct work_queue *q)
 {
-       assert(is_main_thread());
-
        while (!work_queue_empty(q))
                event_loop(-1);
-       /*
-        * We have to call event_loop() again because some works are remained in
-        * the finished list.
-        */
-       event_loop(-1);
 }
diff --git a/lib/work.c b/lib/work.c
index cb8879d..61c1197 100644
--- a/lib/work.c
+++ b/lib/work.c
@@ -55,9 +55,11 @@ struct worker_info {
        pthread_mutex_t pending_lock;
        /* protected by pending_lock */
        struct work_queue q;
-       size_t nr_pending;
-       size_t nr_running;
        size_t nr_threads;
+
+       /* protected by uatomic primitives */
+       size_t nr_works;
+
        /* we cannot shrink work queue till this time */
        uint64_t tm_end_of_protection;
        enum wq_thread_control tc;
@@ -102,7 +104,7 @@ static inline uint64_t wq_get_roof(struct worker_info *wi)
 
 static bool wq_need_grow(struct worker_info *wi)
 {
-       if (wi->nr_threads < wi->nr_pending + wi->nr_running &&
+       if (wi->nr_threads < uatomic_read(&wi->nr_works) &&
            wi->nr_threads * 2 <= wq_get_roof(wi)) {
                wi->tm_end_of_protection = get_msec_time() +
                        WQ_PROTECTION_PERIOD;
@@ -118,7 +120,7 @@ static bool wq_need_grow(struct worker_info *wi)
  */
 static bool wq_need_shrink(struct worker_info *wi)
 {
-       if (wi->nr_pending + wi->nr_running <= wi->nr_threads / 2)
+       if (uatomic_read(&wi->nr_works) < wi->nr_threads / 2)
                /* we cannot shrink work queue during protection period. */
                return wi->tm_end_of_protection <= get_msec_time();
 
@@ -154,8 +156,8 @@ void queue_work(struct work_queue *q, struct work *work)
 {
        struct worker_info *wi = container_of(q, struct worker_info, q);
 
+       uatomic_inc(&wi->nr_works);
        pthread_mutex_lock(&wi->pending_lock);
-       wi->nr_pending++;
 
        if (wq_need_grow(wi))
                /* double the thread pool size */
@@ -192,6 +194,7 @@ static void worker_thread_request_done(int fd, int events, void *data)
                        list_del(&work->w_list);
 
                        work->done(work);
+                       uatomic_dec(&wi->nr_works);
                }
        }
 }
@@ -209,14 +212,12 @@ static void *worker_routine(void *arg)
        pthread_mutex_unlock(&wi->startup_lock);
 
        pthread_mutex_lock(&wi->pending_lock);
-       wi->nr_running++;
        pthread_mutex_unlock(&wi->pending_lock);
 
        while (true) {
 
                pthread_mutex_lock(&wi->pending_lock);
                if (wq_need_shrink(wi)) {
-                       wi->nr_running--;
                        wi->nr_threads--;
                        if (wq_destroy_cb)
                                wq_destroy_cb(pthread_self());
@@ -228,13 +229,10 @@ static void *worker_routine(void *arg)
                }
 retest:
                if (list_empty(&wi->q.pending_list)) {
-                       wi->nr_running--;
                        pthread_cond_wait(&wi->pending_cond, &wi->pending_lock);
-                       wi->nr_running++;
                        goto retest;
                }
 
-               wi->nr_pending--;
                work = list_first_entry(&wi->q.pending_list,
                                       struct work, w_list);
 
@@ -337,11 +335,6 @@ struct work_queue *create_ordered_work_queue(const char *name)
 bool work_queue_empty(struct work_queue *q)
 {
        struct worker_info *wi = container_of(q, struct worker_info, q);
-       size_t nr_works;
-
-       pthread_mutex_lock(&wi->pending_lock);
-       nr_works = wi->nr_running + wi->nr_pending;
-       pthread_mutex_unlock(&wi->pending_lock);
 
-       return nr_works == 0;
+       return uatomic_read(&wi->nr_works) == 0;
 }
-- 
1.7.9.5

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to