From: Nadav Har'El <[email protected]>
Committer: Nadav Har'El <[email protected]>
Branch: master
reimplement pthread_barrier_*()
Our pthread barrier implementation had a race, described in issue #857,
where one of the threads may start its next round of waiting before the
counter is reset, which can lead to a hang because too few threads then
decrement the counter.
This patch provides a completely new implementation, inspired by
the first algorithm in https://locklessinc.com/articles/barriers/.
Basically, all threads need to agree to reset the counter by bringing
it up from a negative value back to 0 (see more explanations in the
code).
Fixes #857.
The new implementation doesn't use "latch". Rather, we use only a single
mutex, and a single condition variable (actually, a waitqueue which is
like a condition variable but does not require an additional internal
mutex in its implementation).
Signed-off-by: Nadav Har'El <[email protected]>
Message-Id: <[email protected]>
---
diff --git a/libc/pthread_barrier.cc b/libc/pthread_barrier.cc
--- a/libc/pthread_barrier.cc
+++ b/libc/pthread_barrier.cc
@@ -1,24 +1,38 @@
/*
- * Copyright (C) 2013 Cloudius Systems, Ltd.
+ * Copyright (C) 2017 ScyllaDB, Ltd.
*
* This work is open source software, licensed under the terms of the
* BSD license as described in the LICENSE file in the top-level directory.
*/
#include <pthread.h>
-#include <osv/debug.hh>
-#include <osv/rwlock.h>
-#include <osv/latch.hh>
-#include "pthread.hh"
+#include <osv/mutex.h>
+#include <osv/waitqueue.hh>
// Private definitions of the internal structs backing pthread_barrier_t and
// pthread_barrierattr_t
typedef struct
{
- unsigned int out;
+ // "count" is barrier's count set by pthread_barrier_init()
unsigned int count;
- latch *ltch;
- pthread_mutex_t *mtx;
+ // "state" uses positive integers to count the number of threads which
+ // are waiting on the barrier, and then negative integers to count the
+ // number of released threads still to return from the wait. The
+ // second part is necessary for correct reusability of the barrier
+ // (so the threads can wait on the same barrier again).
+ // "state" starts at 0 and goes from 1 through count-1 as more threads
+ // wait on the barrier. When the count'th thread reaches the barrier,
+ // "state" is set to -count + 1, all waiting threads are woken, and
+ // as each is woken it increases the negative "state" toward 0 until
+ // finally all waiting threads have woken, state reaches 0 again, and
+ // the whole thing can start again.
+ int state;
+ // "mtx" is the mutex used to protect "state" as well as the condition
+ // variable "cv". We actually use a waitqueue rather than a condvar -
+ // the main difference is that a waitqueue is protected by our mutex
+ // mtx, rather than having a second internal mutex.
+ mutex* mtx;
+ waitqueue* cv;
} pthread_barrier_t_int;
typedef struct
@@ -47,10 +61,9 @@ int pthread_barrier_init(pthread_barrier_t *barrier_opq,
// threads can manipulate the memory area associated with the
// pthread_barrier_t so it doesn't matter what the value of pshared is
set to
barrier->count = count;
- barrier->out = 0;
- barrier->ltch = new latch(count);
- barrier->mtx = new pthread_mutex_t;
- pthread_mutex_init(barrier->mtx, NULL);
+ barrier->state = 0;
+ barrier->mtx = new mutex;
+ barrier->cv = new waitqueue;
return 0;
}
@@ -60,48 +73,36 @@ int pthread_barrier_wait(pthread_barrier_t *barrier_opq)
static_assert(sizeof(pthread_barrier_t_int) <=
sizeof(pthread_barrier_t),
"pthread_barrier_t_int is larger than
pthread_barrier_t");
- if (!barrier || !barrier->ltch || !barrier->mtx) {
+ if (!barrier || !barrier->count || !barrier->mtx) {
return EINVAL;
}
- int retval = 0;
- pthread_mutex_t *mtx = barrier->mtx;
-
- pthread_mutex_lock(mtx);
- pthread_mutex_unlock(mtx);
-
- latch *l = barrier->ltch;
- l->count_down();
- // All threads stuck here until we get at least 'count' waiters
- l->await();
-
- // If the last thread (thread x) to wait on the barrier is descheduled
here
- // (immediately after being the count'th thread crossing the barrier)
- // the barrier remains open (a new waiting thread will cross) until
- // the barrier is reset below (when thread x is rescheduled), which
doesn't
- // seem technically incorrect. Only one of the crossing threads will
get a
- // retval of PTHREAD_BARRIER_SERIAL_THREAD, when
- // barrier->out == barrier->count.
- // All other crossing threads will get a retval of 0.
-
- pthread_mutex_lock(mtx);
- barrier->out++;
- // Make the last thread out responsible for resetting the barrier's
latch.
- // The last thread also gets the special return value
- // PTHREAD_BARRIER_SERIAL_THREAD. Every other thread gets a retval of 0
- if (barrier->out == barrier->count) {
- retval = PTHREAD_BARRIER_SERIAL_THREAD;
- // Reset the latch for the next round of waiters. We're using an
- // external lock (mtx) to ensure that no other thread is calling
- // count_down or in await when we're resetting it. Without the
external
- // lock, resetting the latch isn't safe.
- l->unsafe_reset(barrier->count);
- // Reset the 'out' counter so that the equality check above works
across
- // multiple rounds of threads waiting on the barrier
- barrier->out = 0;
+ SCOPE_LOCK(*barrier->mtx);
+ while (barrier->state < 0) {
+ // Can't start another round until all threads have exited the previous
+ // round.
+ barrier->cv->wait(*barrier->mtx);
+ }
+ if (++barrier->state == (int)barrier->count) {
+ // We're the last of the count threads to enter the barrier.
+ barrier->state = -barrier->count + 1;
+ barrier->cv->wake_all(*barrier->mtx);
+ return PTHREAD_BARRIER_SERIAL_THREAD;
+ } else {
+ // Not enough threads have reached the barrier yet. Wait until
+ // the count'th one enters, and changes barrier->state to negative.
+ while (barrier->state > 0) {
+ barrier->cv->wait(*barrier->mtx);
+ }
+ // We're in negative state, and increasing it for every thread
+ // which is done waiting in this round. If the state reaches
+ // 0 again, everybody who's already waiting for the next round
+ // can be woken.
+ if (++barrier->state == 0) {
+ barrier->cv->wake_all(*barrier->mtx);
+ }
+ return 0;
}
- pthread_mutex_unlock(mtx);
- return retval;
}
int pthread_barrier_destroy(pthread_barrier_t *barrier_opq)
@@ -111,16 +112,22 @@ int pthread_barrier_destroy(pthread_barrier_t
*barrier_opq)
static_assert(sizeof(pthread_barrier_t_int) <=
sizeof(pthread_barrier_t),
"pthread_barrier_t_int is larger than
pthread_barrier_t");
- if (!barrier || !barrier->ltch || !barrier->mtx) {
+ if (!barrier || !barrier->count || !barrier->mtx) {
return EINVAL;
}
- delete barrier->ltch;
- barrier->ltch = nullptr;
+ // One of the threads, probably the one that got PTHREAD_BARRIER_SERIAL_THREAD,
+ // might decide to delete the barrier while the other threads have not
+ // yet returned from their barrier_wait. So we may need to wait for
+ // those threads to complete before destroying the barrier:
+ WITH_LOCK(*barrier->mtx) {
+ while (barrier->state < 0) {
+ barrier->cv->wait(*barrier->mtx);
+ }
+ }
- pthread_mutex_destroy(barrier->mtx);
delete barrier->mtx;
+ delete barrier->cv;
barrier->mtx = nullptr;
-
return 0;
}
--
You received this message because you are subscribed to the Google Groups "OSv
Development" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
For more options, visit https://groups.google.com/d/optout.