From: Nadav Har'El <[email protected]>
Committer: Nadav Har'El <[email protected]>
Branch: master

reimplement pthread_barrier_*()

Our pthread barrier implementation had a race, described in issue #857,
where one of the threads may start its next round of waiting before the
counter has been reset, which can lead to a hang because too few threads
decrement the counter.

This patch provides a completely new implementation, inspired by
the first algorithm in https://locklessinc.com/articles/barriers/.
Basically, all threads need to agree to reset the counter by bringing
it up from a negative value back to 0 (see more explanations in the
code).

Fixes #857.

The new implementation doesn't use "latch". Rather, we use only a single
mutex, and a single condition variable (actually, a waitqueue which is
like a condition variable but does not require an additional internal
mutex in its implementation).

Signed-off-by: Nadav Har'El <[email protected]>
Message-Id: <[email protected]>

---
diff --git a/libc/pthread_barrier.cc b/libc/pthread_barrier.cc
--- a/libc/pthread_barrier.cc
+++ b/libc/pthread_barrier.cc
@@ -1,24 +1,38 @@
 /*
- * Copyright (C) 2013 Cloudius Systems, Ltd.
+ * Copyright (C) 2017 ScyllaDB, Ltd.
  *
  * This work is open source software, licensed under the terms of the
  * BSD license as described in the LICENSE file in the top-level directory.
  */

 #include <pthread.h>
-#include <osv/debug.hh>
-#include <osv/rwlock.h>
-#include <osv/latch.hh>
-#include "pthread.hh"
+#include <osv/mutex.h>
+#include <osv/waitqueue.hh>

// Private definitions of the internal structs backing pthread_barrier_t and
 // pthread_barrierattr_t
 typedef struct
 {
-    unsigned int out;
+    // "count" is the barrier's count, set by pthread_barrier_init()
     unsigned int count;
-    latch *ltch;
-    pthread_mutex_t *mtx;
+    // "state" uses positive integers to count the number of threads which
+    // are waiting on the barrier, and then negative integers to count (as
+    // -state) the released threads that have not yet left the wait; the
+    // second part is necessary for correct reusability of the barrier
+    // (so the threads can wait on the same barrier again).
+    // "state" starts at 0 and goes from 1 through count-1 as more threads
+    // wait on the barrier. When the count'th thread (which never waits) arrives,
+    // "state" is set to -count + 1, all waiting threads are woken, and
+    // as each is woken it increases the negative "state" further until
+    // finally all waiting threads have woken, state reaches 0 again, and
+    // the whole thing can start again.
+    int state;
+    // "mtx" is the mutex used to protect "state" as well as the condition
+    // variable "cv". We actually use a waitqueue rather than a condvar -
+    // the main difference is that a waitqueue is protected by our mutex
+    // mtx, rather than having a second internal mutex.
+    mutex* mtx;
+    waitqueue* cv;
 } pthread_barrier_t_int;

 typedef struct
@@ -47,10 +61,9 @@ int pthread_barrier_init(pthread_barrier_t *barrier_opq,
     // threads can manipulate the memory area associated with the
// pthread_barrier_t so it doesn't matter what the value of pshared is set to
     barrier->count = count;
-    barrier->out = 0;
-    barrier->ltch = new latch(count);
-    barrier->mtx = new pthread_mutex_t;
-    pthread_mutex_init(barrier->mtx, NULL);
+    barrier->state = 0;
+    barrier->mtx = new mutex;
+    barrier->cv = new waitqueue;
     return 0;
 }

@@ -60,48 +73,36 @@ int pthread_barrier_wait(pthread_barrier_t *barrier_opq)
static_assert(sizeof(pthread_barrier_t_int) <= sizeof(pthread_barrier_t), "pthread_barrier_t_int is larger than pthread_barrier_t");

-    if (!barrier || !barrier->ltch || !barrier->mtx) {
+    if (!barrier || !barrier->count || !barrier->mtx) {
         return EINVAL;
     }

-    int retval = 0;
-    pthread_mutex_t *mtx = barrier->mtx;
-
-    pthread_mutex_lock(mtx);
-    pthread_mutex_unlock(mtx);
-
-    latch *l  = barrier->ltch;
-    l->count_down();
-    // All threads stuck here until we get at least 'count' waiters
-    l->await();
-
- // If the last thread (thread x) to wait on the barrier is descheduled here
-    // (immediately after being the count'th thread crossing the barrier)
-    // the barrier remains open (a new waiting thread will cross) until
- // the barrier is reset below (when thread x is rescheduled), which doesn't - // seem technically incorrect. Only one of the crossing threads will get a
-    // retval of PTHREAD_BARRIER_SERIAL_THREAD, when
-    // barrier->out == barrier->count.
-    // All other crossing threads will get a retval of 0.
-
-    pthread_mutex_lock(mtx);
-    barrier->out++;
- // Make the last thread out responsible for resetting the barrier's latch.
-    // The last thread also gets the special return value
-    // PTHREAD_BARRIER_SERIAL_THREAD. Every other thread gets a retval of 0
-    if (barrier->out == barrier->count) {
-        retval = PTHREAD_BARRIER_SERIAL_THREAD;
-        // Reset the latch for the next round of waiters. We're using an
-        // external lock (mtx) to ensure that no other thread is calling
- // count_down or in await when we're resetting it. Without the external
-        // lock, resetting the latch isn't safe.
-        l->unsafe_reset(barrier->count);
- // Reset the 'out' counter so that the equality check above works across
-        // multiple rounds of threads waiting on the barrier
-        barrier->out = 0;
+    SCOPE_LOCK(*barrier->mtx);
+    while (barrier->state < 0) {
+        // Can't start another round until all threads have exited the
+        // previous round.
+        barrier->cv->wait(*barrier->mtx);
+    }
+    if (++barrier->state == (int)barrier->count) {
+        // We're the last of the count threads to enter; we never wait, so only the other count-1 threads must leave.
+        barrier->state = 1 - (int)barrier->count; // signed math: no unsigned wrap-around stored into an int
+        barrier->cv->wake_all(*barrier->mtx);
+        return PTHREAD_BARRIER_SERIAL_THREAD;
+    } else {
+        // Not enough threads have reached the barrier yet. Wait until
+        // the count'th one enters, and changes barrier->state to negative.
+        while (barrier->state > 0) {
+            barrier->cv->wait(*barrier->mtx);
+        }
+        // We're in negative state, and increasing it for every thread
+        // which is done waiting in this round. If the state reaches
+        // 0 again, everybody who's already waiting for the next round
+        // (or in pthread_barrier_destroy()) can be woken.
+        if (++barrier->state == 0) {
+            barrier->cv->wake_all(*barrier->mtx);
+        }
+        return 0;
     }
-    pthread_mutex_unlock(mtx);
-    return retval;
 }

 int pthread_barrier_destroy(pthread_barrier_t *barrier_opq)
@@ -111,16 +112,22 @@ int pthread_barrier_destroy(pthread_barrier_t *barrier_opq) static_assert(sizeof(pthread_barrier_t_int) <= sizeof(pthread_barrier_t), "pthread_barrier_t_int is larger than pthread_barrier_t");

-    if (!barrier || !barrier->ltch || !barrier->mtx) {
+    if (!barrier || !barrier->count || !barrier->mtx) {
         return EINVAL;
     }

-    delete barrier->ltch;
-    barrier->ltch = nullptr;
+    // One of the threads, probably the one which got PTHREAD_BARRIER_SERIAL_THREAD,
+    // might decide to delete the barrier while the other threads have not
+    // yet returned from their pthread_barrier_wait(). So we wait (state < 0)
+    // for those threads to complete before destroying the barrier:
+    WITH_LOCK(*barrier->mtx) {
+        while (barrier->state < 0) {
+            barrier->cv->wait(*barrier->mtx);
+        }
+    }

-    pthread_mutex_destroy(barrier->mtx);
     delete barrier->mtx;
+    delete barrier->cv;
     barrier->mtx = nullptr;
-
     return 0;
 }

--
You received this message because you are subscribed to the Google Groups "OSv 
Development" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
For more options, visit https://groups.google.com/d/optout.

Reply via email to