From: MORITA Kazutaka <[email protected]> Currently, if queue_work() is called during the second event_loop() call in work_queue_wait(), work_queue_wait() returns without waiting for that work to complete.
This adds a field 'nr_works' to struct worker_info and makes work_queue_empty() return true only when neither worker_routine() nor worker_thread_request_done() has any outstanding work. I think this completely removes the race condition in work_queue_wait(). Signed-off-by: MORITA Kazutaka <[email protected]> --- collie/common.c | 7 ------- lib/work.c | 25 +++++++++---------------- 2 files changed, 9 insertions(+), 23 deletions(-) diff --git a/collie/common.c b/collie/common.c index 0646587..e4d33e5 100644 --- a/collie/common.c +++ b/collie/common.c @@ -249,13 +249,6 @@ void confirm(const char *message) void work_queue_wait(struct work_queue *q) { - assert(is_main_thread()); - while (!work_queue_empty(q)) event_loop(-1); - /* - * We have to call event_loop() again because some works are remained in - * the finished list. - */ - event_loop(-1); } diff --git a/lib/work.c b/lib/work.c index cb8879d..61c1197 100644 --- a/lib/work.c +++ b/lib/work.c @@ -55,9 +55,11 @@ struct worker_info { pthread_mutex_t pending_lock; /* protected by pending_lock */ struct work_queue q; - size_t nr_pending; - size_t nr_running; size_t nr_threads; + + /* protected by uatomic primitives */ + size_t nr_works; + /* we cannot shrink work queue till this time */ uint64_t tm_end_of_protection; enum wq_thread_control tc; @@ -102,7 +104,7 @@ static inline uint64_t wq_get_roof(struct worker_info *wi) static bool wq_need_grow(struct worker_info *wi) { - if (wi->nr_threads < wi->nr_pending + wi->nr_running && + if (wi->nr_threads < uatomic_read(&wi->nr_works) && wi->nr_threads * 2 <= wq_get_roof(wi)) { wi->tm_end_of_protection = get_msec_time() + WQ_PROTECTION_PERIOD; @@ -118,7 +120,7 @@ static bool wq_need_grow(struct worker_info *wi) */ static bool wq_need_shrink(struct worker_info *wi) { - if (wi->nr_pending + wi->nr_running <= wi->nr_threads / 2) + if (uatomic_read(&wi->nr_works) < wi->nr_threads / 2) /* we cannot shrink work queue during protection period.
*/ return wi->tm_end_of_protection <= get_msec_time(); @@ -154,8 +156,8 @@ void queue_work(struct work_queue *q, struct work *work) { struct worker_info *wi = container_of(q, struct worker_info, q); + uatomic_inc(&wi->nr_works); pthread_mutex_lock(&wi->pending_lock); - wi->nr_pending++; if (wq_need_grow(wi)) /* double the thread pool size */ @@ -192,6 +194,7 @@ static void worker_thread_request_done(int fd, int events, void *data) list_del(&work->w_list); work->done(work); + uatomic_dec(&wi->nr_works); } } } @@ -209,14 +212,12 @@ static void *worker_routine(void *arg) pthread_mutex_unlock(&wi->startup_lock); pthread_mutex_lock(&wi->pending_lock); - wi->nr_running++; pthread_mutex_unlock(&wi->pending_lock); while (true) { pthread_mutex_lock(&wi->pending_lock); if (wq_need_shrink(wi)) { - wi->nr_running--; wi->nr_threads--; if (wq_destroy_cb) wq_destroy_cb(pthread_self()); @@ -228,13 +229,10 @@ static void *worker_routine(void *arg) } retest: if (list_empty(&wi->q.pending_list)) { - wi->nr_running--; pthread_cond_wait(&wi->pending_cond, &wi->pending_lock); - wi->nr_running++; goto retest; } - wi->nr_pending--; work = list_first_entry(&wi->q.pending_list, struct work, w_list); @@ -337,11 +335,6 @@ struct work_queue *create_ordered_work_queue(const char *name) bool work_queue_empty(struct work_queue *q) { struct worker_info *wi = container_of(q, struct worker_info, q); - size_t nr_works; - - pthread_mutex_lock(&wi->pending_lock); - nr_works = wi->nr_running + wi->nr_pending; - pthread_mutex_unlock(&wi->pending_lock); - return nr_works == 0; + return uatomic_read(&wi->nr_works) == 0; } -- 1.7.9.5 -- sheepdog mailing list [email protected] http://lists.wpkg.org/mailman/listinfo/sheepdog
