Our pthread barrier implementation had a race, described in issue #857: a thread could start its next round of waiting before the counter was reset, which could lead to a hang because not enough threads would then decrement the counter.
This patch provides a completely new implementation, inspired by the first algorithm in https://locklessinc.com/articles/barriers/. Basically, all threads need to agree to reset the counter by bringing it up from a negative value back to 0 (see more explanations in the code). Fixes #857. The new implementation doesn't use "latch". Rather, we use only a single mutex, and a single condition variable (actually, a waitqueue which is like a condition variable but does not require an additional internal mutex in its implementation). Signed-off-by: Nadav Har'El <[email protected]> --- libc/pthread_barrier.cc | 117 +++++++++++++++++++++++++----------------------- 1 file changed, 62 insertions(+), 55 deletions(-) diff --git a/libc/pthread_barrier.cc b/libc/pthread_barrier.cc index 2437d73..2bfa692 100644 --- a/libc/pthread_barrier.cc +++ b/libc/pthread_barrier.cc @@ -1,24 +1,38 @@ /* - * Copyright (C) 2013 Cloudius Systems, Ltd. + * Copyright (C) 2017 ScyllaDB, Ltd. * * This work is open source software, licensed under the terms of the * BSD license as described in the LICENSE file in the top-level directory. */ #include <pthread.h> -#include <osv/debug.hh> -#include <osv/rwlock.h> -#include <osv/latch.hh> -#include "pthread.hh" +#include <osv/mutex.h> +#include <osv/waitqueue.hh> // Private definitions of the internal structs backing pthread_barrier_t and // pthread_barrierattr_t typedef struct { - unsigned int out; + // "count" is barrier's count set by pthread_barrier_init() unsigned int count; - latch *ltch; - pthread_mutex_t *mtx; + // "state" uses positive integers to count the number of threads which + // are waiting on the barrier, and then negative integers to count the + // number of threads that have been released from the wait; The + // second part is necessary for correct reusability of the barrier + // (so the threads can wait on the same barrier again). + // "state" starts at 0 and goes from 1 through count-1 as more threads + // wait on the barrier. 
When the count'th thread reaches the barrier, + // "state" is negated to -count + 1, all waiting threads are woken, and + // as each is woken it increases the negative "state" further until + // finally all waiting threads have woken, state reaches 0 again, and + // the whole thing can start again. + int state; + // "mtx" is the mutex used to protect "state" as well as the condition + // variable "cv". We actually used a waitqueue rather than a condvar - + // the main difference is that a waitqueue is protected by our mutex + // mtx, rather than having a second internal mutex. + mutex* mtx; + waitqueue* cv; } pthread_barrier_t_int; typedef struct @@ -47,10 +61,9 @@ int pthread_barrier_init(pthread_barrier_t *barrier_opq, // threads can manipulate the memory area associated with the // pthread_barrier_t so it doesn't matter what the value of pshared is set to barrier->count = count; - barrier->out = 0; - barrier->ltch = new latch(count); - barrier->mtx = new pthread_mutex_t; - pthread_mutex_init(barrier->mtx, NULL); + barrier->state = 0; + barrier->mtx = new mutex; + barrier->cv = new waitqueue; return 0; } @@ -60,48 +73,36 @@ int pthread_barrier_wait(pthread_barrier_t *barrier_opq) static_assert(sizeof(pthread_barrier_t_int) <= sizeof(pthread_barrier_t), "pthread_barrier_t_int is larger than pthread_barrier_t"); - if (!barrier || !barrier->ltch || !barrier->mtx) { + if (!barrier || !barrier->count || !barrier->mtx) { return EINVAL; } - int retval = 0; - pthread_mutex_t *mtx = barrier->mtx; - - pthread_mutex_lock(mtx); - pthread_mutex_unlock(mtx); - - latch *l = barrier->ltch; - l->count_down(); - // All threads stuck here until we get at least 'count' waiters - l->await(); - - // If the last thread (thread x) to wait on the barrier is descheduled here - // (immediately after being the count'th thread crossing the barrier) - // the barrier remains open (a new waiting thread will cross) until - // the barrier is reset below (when thread x is rescheduled), which 
doesn't - // seem technically incorrect. Only one of the crossing threads will get a - // retval of PTHREAD_BARRIER_SERIAL_THREAD, when - // barrier->out == barrier->count. - // All other crossing threads will get a retval of 0. - - pthread_mutex_lock(mtx); - barrier->out++; - // Make the last thread out responsible for resetting the barrier's latch. - // The last thread also gets the special return value - // PTHREAD_BARRIER_SERIAL_THREAD. Every other thread gets a retval of 0 - if (barrier->out == barrier->count) { - retval = PTHREAD_BARRIER_SERIAL_THREAD; - // Reset the latch for the next round of waiters. We're using an - // external lock (mtx) to ensure that no other thread is calling - // count_down or in await when we're resetting it. Without the external - // lock, resetting the latch isn't safe. - l->unsafe_reset(barrier->count); - // Reset the 'out' counter so that the equality check above works across - // multiple rounds of threads waiting on the barrier - barrier->out = 0; + SCOPE_LOCK(*barrier->mtx); + while (barrier->state < 0) { + // Can't start another round until all threads exited the previous + // round. + barrier->cv->wait(*barrier->mtx); + } + if (++barrier->state == (int)barrier->count) { + // We're the last of the count threads to enter the barrier. + barrier->state = -barrier->count + 1; + barrier->cv->wake_all(*barrier->mtx); + return PTHREAD_BARRIER_SERIAL_THREAD; + } else { + // Not enough threads have reached the barrier yet. Wait until + // the count'th one enters, and changes barrier->state to negative. + while (barrier->state > 0) { + barrier->cv->wait(*barrier->mtx); + } + // We're in negative state, and increasing it for every thread + // which is done waiting in this round. If the state reaches + // 0 again, everybody who's already waiting for the next round + // can be woken. 
+ if (++barrier->state == 0) { + barrier->cv->wake_all(*barrier->mtx); + } + return 0; } - pthread_mutex_unlock(mtx); - return retval; } int pthread_barrier_destroy(pthread_barrier_t *barrier_opq) @@ -111,16 +112,22 @@ int pthread_barrier_destroy(pthread_barrier_t *barrier_opq) static_assert(sizeof(pthread_barrier_t_int) <= sizeof(pthread_barrier_t), "pthread_barrier_t_int is larger than pthread_barrier_t"); - if (!barrier || !barrier->ltch || !barrier->mtx) { + if (!barrier || !barrier->count || !barrier->mtx) { return EINVAL; } - delete barrier->ltch; - barrier->ltch = nullptr; + // One of threads, probably that which got PTHREAD_BARRIER_SERIAL_THREAD, + // might decide to delete the barrier while the other threads have not + // yet returned from their barrier_wait. So we may need to wait for + // those threads to complete before destroying the barrier: + WITH_LOCK(*barrier->mtx) { + while (barrier->state < 0) { + barrier->cv->wait(*barrier->mtx); + } + } - pthread_mutex_destroy(barrier->mtx); delete barrier->mtx; + delete barrier->cv; barrier->mtx = nullptr; - return 0; } -- 2.9.3 -- You received this message because you are subscribed to the Google Groups "OSv Development" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. For more options, visit https://groups.google.com/d/optout.
