Author: Armin Rigo <[email protected]>
Branch: 
Changeset: r905:6fe6a5f23d8b
Date: 2014-03-01 10:32 +0100
http://bitbucket.org/pypy/stmgc/changeset/6fe6a5f23d8b/

Log:    Reintroduce multiple condition variables, but this time in a more
        controlled fashion. In theory, let's say the code becomes clearer
        to follow and it's easier to check its correctness. We'll see in
        practice (done refactoring, some bugs left).

diff --git a/c7/stm/contention.c b/c7/stm/contention.c
--- a/c7/stm/contention.c
+++ b/c7/stm/contention.c
@@ -19,40 +19,32 @@
     if (STM_PSEGMENT->transaction_state == TS_INEVITABLE) {
         /* I'm inevitable, so the other is not. */
         assert(other_pseg->transaction_state != TS_INEVITABLE);
-        other_pseg->transaction_state = TS_MUST_ABORT;
+        other_pseg->pub.nursery_end = NSE_SIGABORT;
     }
-    else if (other_pseg->start_time < STM_PSEGMENT->start_time) {
+    else if (other_pseg->start_time <= STM_PSEGMENT->start_time) {
         /* The other thread started before us, so I should abort, as I'm
            the least long-running transaction. */
     }
     else if (other_pseg->transaction_state == TS_REGULAR) {
         /* The other thread started strictly after us.  We tell it
            to abort if we can (e.g. if it's not TS_INEVITABLE). */
-        other_pseg->transaction_state = TS_MUST_ABORT;
+        other_pseg->pub.nursery_end = NSE_SIGABORT;
     }
 
-    if (other_pseg->transaction_state != TS_MUST_ABORT) {
-        /* if the other thread is not in aborting-soon mode, then we must
-           abort. */
+    /* Now check what we just did... almost: the check at the following
+       line can also find a NSE_SIGABORT that was set earlier.
+    */
+    if (other_pseg->pub.nursery_end != NSE_SIGABORT) {
+        /* if the other thread is not in aborting-soon mode, then *we*
+           must abort. */
         abort_with_mutex();
     }
-    else {
-        /* signal the other thread; it must abort.
-
-           Note that we know that the target thread is running now, and
-           so it is or will soon be blocked at a mutex_lock() or a
-           cond_wait(C_SAFE_POINT).  Thus broadcasting C_SAFE_POINT is
-           enough to wake it up in the second case.
-        */
-        cond_broadcast();
-    }
 }
 
 static void write_write_contention_management(uintptr_t lock_idx)
 {
-    mutex_lock();
-
-    if (STM_PSEGMENT->transaction_state == TS_MUST_ABORT)
+    s_mutex_lock();
+    if (must_abort())
         abort_with_mutex();
 
     uint8_t prev_owner = ((volatile uint8_t *)write_locks)[lock_idx];
@@ -61,32 +53,60 @@
         uint8_t other_segment_num = prev_owner - 1;
         contention_management(other_segment_num);
 
-        /* the rest of this code is for the case where we continue to
-           run, and the other thread is asked to abort */
+        /* The rest of this code is for the case where we continue to
+           run.  We have to signal the other thread to abort, and wait
+           until it does. */
+
+        int sp = get_priv_segment(other_segment_num)->safe_point;
+        switch (sp) {
+
+        case SP_RUNNING:
+            /* The other thread is running now, so if we set
+               NSE_SIGABORT in 'nursery_end', it will soon enter a
+               mutex_lock() and thus abort.  Note that this line can
+               overwrite a NSE_SIGPAUSE, which is fine.
+            */
+            get_segment(other_segment_num)->nursery_end = NSE_SIGABORT;
+            break;
+
+        /* The other cases are where the other thread is at a
+           safe-point.  We wake it up by sending the correct signal.
+        */
+        case SP_WAIT_FOR_C_REQUEST_REMOVED:
+            cond_broadcast(C_REQUEST_REMOVED);
+            break;
+
+        case SP_WAIT_FOR_C_AT_SAFE_POINT:
+            cond_broadcast(C_AT_SAFE_POINT);
+            break;
 
 #ifdef STM_TESTS
-        /* abort anyway for tests. We mustn't call cond_wait() */
-        abort_with_mutex();
+        case SP_WAIT_FOR_OTHER_THREAD:
+            /* abort anyway for tests.  We can't wait here */
+            abort_with_mutex();
 #endif
 
-        /* first mark the other thread as "needing a safe-point" */
-        struct stm_priv_segment_info_s* other_pseg;
-        other_pseg = get_priv_segment(other_segment_num);
-        assert(other_pseg->transaction_state == TS_MUST_ABORT);
-        other_pseg->pub.nursery_end = NSE_SIGNAL;
+        default:
+            stm_fatalerror("unexpected other_pseg->safe_point: %d", sp);
+        }
 
         /* wait, hopefully until the other thread broadcasts "I'm
-           done aborting" (spurious wake-ups are ok). */
-        dprintf(("contention: wait C_SAFE_POINT...\n"));
-        cond_wait();
+           done aborting" (spurious wake-ups are ok).  Important:
+           this is not a safe point of any kind!  The shadowstack
+           is not correct here.  It should not end in a deadlock,
+           because the target thread is, in principle, guaranteed
+           to call abort_with_mutex().
+        */
+        dprintf(("contention: wait C_ABORTED...\n"));
+        cond_wait(C_ABORTED);
         dprintf(("contention: done\n"));
 
-        cond_broadcast();
+        if (must_abort())
+            abort_with_mutex();
 
         /* now we return into _stm_write_slowpath() and will try again
            to acquire the write lock on our object. */
-        assert(STM_PSEGMENT->safe_point == SP_RUNNING);
     }
 
-    mutex_unlock();
+    s_mutex_unlock();
 }
diff --git a/c7/stm/core.c b/c7/stm/core.c
--- a/c7/stm/core.c
+++ b/c7/stm/core.c
@@ -149,7 +149,7 @@
 
 void _stm_start_transaction(stm_thread_local_t *tl, stm_jmpbuf_t *jmpbuf)
 {
-    mutex_lock_no_abort();
+    s_mutex_lock();
 
   retry:
     if (jmpbuf == NULL) {
@@ -171,12 +171,18 @@
 #endif
     STM_PSEGMENT->shadowstack_at_start_of_transaction = tl->shadowstack;
     STM_PSEGMENT->threadlocal_at_start_of_transaction = tl->thread_local_obj;
-    assert(STM_SEGMENT->nursery_end == NURSERY_END);
 
     dprintf(("start_transaction\n"));
 
-    mutex_unlock();
+    enter_safe_point_if_requested();
+    s_mutex_unlock();
 
+    /* Now running the SP_RUNNING start.  We can set our
+       'transaction_read_version' after releasing the mutex,
+       because it is only read by a concurrent thread in
+       stm_commit_transaction(), which waits until SP_RUNNING
+       threads are paused.
+    */
     uint8_t old_rv = STM_SEGMENT->transaction_read_version;
     STM_SEGMENT->transaction_read_version = old_rv + 1;
     if (UNLIKELY(old_rv == 0xff)) {
@@ -204,12 +210,8 @@
     char *remote_base = get_segment_base(remote_num);
     uint8_t remote_version = get_segment(remote_num)->transaction_read_version;
 
-    switch (get_priv_segment(remote_num)->transaction_state) {
-    case TS_NONE:
-    case TS_MUST_ABORT:
-        return;    /* no need to do any check */
-    default:;
-    }
+    if (get_priv_segment(remote_num)->transaction_state == TS_NONE)
+        return;    /* no need to check */
 
     LIST_FOREACH_R(
         STM_PSEGMENT->modified_old_objects,
@@ -219,10 +221,9 @@
                 /* A write-read conflict! */
                 contention_management(remote_num);
 
-                /* If we reach this point, it means we aborted the other
-                   thread.  We're done here. */
-                assert(get_priv_segment(remote_num)->transaction_state ==
-                       TS_MUST_ABORT);
+                /* If we reach this point, it means that we would like
+                   the other thread to abort.  We're done here. */
+                assert(get_segment(remote_num)->nursery_end == NSE_SIGABORT);
                 return;
             }
         }));
@@ -288,8 +289,8 @@
     char *local_base = STM_SEGMENT->segment_base;
     char *remote_base = get_segment_base(remote_num);
     bool remote_active =
-        (get_priv_segment(remote_num)->transaction_state == TS_REGULAR ||
-         get_priv_segment(remote_num)->transaction_state == TS_INEVITABLE);
+        (get_priv_segment(remote_num)->transaction_state != TS_NONE &&
+         get_segment(remote_num)->nursery_end != NSE_SIGABORT);
 
     LIST_FOREACH_R(
         STM_PSEGMENT->modified_old_objects,
@@ -333,9 +334,6 @@
     stm_thread_local_t *tl = STM_SEGMENT->running_thread;
     release_thread_segment(tl);
     /* cannot access STM_SEGMENT or STM_PSEGMENT from here ! */
-
-    /* wake up other threads waiting. */
-    cond_broadcast();
 }
 
 void stm_commit_transaction(void)
@@ -349,22 +347,12 @@
 
     minor_collection(/*commit=*/ true);
 
-    mutex_lock();
+    s_mutex_lock();
 
- retry:
-    if (STM_SEGMENT->nursery_end != NURSERY_END)
-        collectable_safe_point();
-
-    STM_PSEGMENT->safe_point = SP_SAFE_POINT;
-
-    /* wait until the other thread is at a safe-point */
-    if (!try_wait_for_other_safe_points()) {
-        STM_PSEGMENT->safe_point = SP_RUNNING;
-        goto retry;
-    }
-
-    /* the rest of this function either runs atomically without
-       releasing the mutex, or aborts the current thread. */
+    /* force all other threads to be paused.  They will unpause
+       automatically when we are done here, i.e. at mutex_unlock().
+       Important: we should not call cond_wait() in the meantime. */
+    synchronize_all_threads();
 
     /* detect conflicts */
     detect_write_read_conflicts();
@@ -373,7 +361,6 @@
     dprintf(("commit_transaction\n"));
 
     assert(STM_SEGMENT->nursery_end == NURSERY_END);
-    assert(STM_PSEGMENT->transaction_state != TS_MUST_ABORT);
     STM_SEGMENT->jmpbuf_ptr = NULL;
 
     /* if a major collection is required, do it here */
@@ -393,17 +380,22 @@
         STM_PSEGMENT->overflow_number = highest_overflow_number;
     }
 
+    /* send what is hopefully the correct signals */
+    if (STM_PSEGMENT->transaction_state == TS_INEVITABLE) {
+        /* wake up one thread in wait_for_end_of_inevitable_transaction() */
+        cond_signal(C_INEVITABLE);
+    }
+
     /* done */
     _finish_transaction();
+    /* cannot access STM_SEGMENT or STM_PSEGMENT from here ! */
 
-    assert(STM_SEGMENT->nursery_end == NURSERY_END);
-
-    mutex_unlock();
+    s_mutex_unlock();
 }
 
 void stm_abort_transaction(void)
 {
-    mutex_lock();
+    s_mutex_lock();
     abort_with_mutex();
 }
 
@@ -457,12 +449,12 @@
 
     switch (STM_PSEGMENT->transaction_state) {
     case TS_REGULAR:
-    case TS_MUST_ABORT:
         break;
     case TS_INEVITABLE:
-        assert(!"abort: transaction_state == TS_INEVITABLE");
+        stm_fatalerror("abort: transaction_state == TS_INEVITABLE");
     default:
-        assert(!"abort: bad transaction_state");
+        stm_fatalerror("abort: bad transaction_state == %d",
+                       (int)STM_PSEGMENT->transaction_state);
     }
     assert(STM_PSEGMENT->running_pthread == pthread_self());
 
@@ -478,10 +470,12 @@
     tl->thread_local_obj = STM_PSEGMENT->threadlocal_at_start_of_transaction;
 
     _finish_transaction();
+    /* cannot access STM_SEGMENT or STM_PSEGMENT from here ! */
 
-    STM_SEGMENT->nursery_end = NURSERY_END;
+    /* Broadcast C_ABORTED to wake up contention.c */
+    cond_broadcast(C_ABORTED);
 
-    mutex_unlock();
+    s_mutex_unlock();
 
     /* It seems to be a good idea, at least in some examples, to sleep
        one microsecond here before retrying.  Otherwise, what was
@@ -502,25 +496,16 @@
 
 void _stm_become_inevitable(const char *msg)
 {
-    mutex_lock();
-    switch (STM_PSEGMENT->transaction_state) {
+    s_mutex_lock();
+    enter_safe_point_if_requested();
 
-    case TS_INEVITABLE:
-        break;   /* already good */
+    if (STM_PSEGMENT->transaction_state == TS_REGULAR) {
+        dprintf(("become_inevitable: %s\n", msg));
 
-    case TS_REGULAR:
-        /* become inevitable */
         wait_for_end_of_inevitable_transaction(true);
         STM_PSEGMENT->transaction_state = TS_INEVITABLE;
         STM_SEGMENT->jmpbuf_ptr = NULL;
-        break;
+    }
 
-    case TS_MUST_ABORT:
-        abort_with_mutex();
-
-    default:
-        assert(!"invalid transaction_state in become_inevitable");
-    }
-    dprintf(("become_inevitable: %s\n", msg));
-    mutex_unlock();
+    s_mutex_unlock();
 }
diff --git a/c7/stm/core.h b/c7/stm/core.h
--- a/c7/stm/core.h
+++ b/c7/stm/core.h
@@ -104,10 +104,8 @@
 
     /* The thread's safe-point state, one of the SP_xxx constants.  The
        thread is in a "safe point" if it is not concurrently doing any
-       change that might cause race conditions in other threads.  A
-       thread may enter but not *leave* the safe point it is in without
-       getting hold of the mutex.  Broadly speaking, any value other
-       than SP_RUNNING means a safe point of some kind. */
+       read or change in this data structure that might cause race
+       conditions in other threads. */
     uint8_t safe_point;
 
     /* The transaction status, one of the TS_xxx constants.  This is
@@ -131,13 +129,16 @@
 enum /* safe_point */ {
     SP_NO_TRANSACTION=0,
     SP_RUNNING,
-    SP_SAFE_POINT,
+    SP_WAIT_FOR_C_REQUEST_REMOVED,
+    SP_WAIT_FOR_C_AT_SAFE_POINT,
+#ifdef STM_TESTS
+    SP_WAIT_FOR_OTHER_THREAD,
+#endif
 };
 enum /* transaction_state */ {
     TS_NONE=0,
     TS_REGULAR,
     TS_INEVITABLE,
-    TS_MUST_ABORT,
 };
 
 static char *stm_object_pages;
@@ -192,18 +193,4 @@
     asm("/* workaround for llvm bug */");
 }
 
-static inline void abort_if_needed(void) {
-    switch (STM_PSEGMENT->transaction_state) {
-    case TS_REGULAR:
-    case TS_INEVITABLE:
-        break;
-
-    case TS_MUST_ABORT:
-        stm_abort_transaction();
-
-    default:
-        assert(!"commit: bad transaction_state");
-    }
-}
-
 static void synchronize_overflow_object_now(object_t *obj);
diff --git a/c7/stm/gcpage.c b/c7/stm/gcpage.c
--- a/c7/stm/gcpage.c
+++ b/c7/stm/gcpage.c
@@ -116,24 +116,18 @@
     if (!is_major_collection_requested())
         return;
 
-    mutex_lock();
+    s_mutex_lock();
 
-    assert(STM_PSEGMENT->safe_point == SP_RUNNING);
-    STM_PSEGMENT->safe_point = SP_SAFE_POINT;
+    if (is_major_collection_requested()) {   /* if still true */
 
-    while (is_major_collection_requested()) {
-        /* wait until the other thread is at a safe-point */
-        if (try_wait_for_other_safe_points()) {
-            /* ok */
+        synchronize_all_threads();
+
+        if (is_major_collection_requested()) {   /* if *still* true */
             major_collection_now_at_safe_point();
-            break;
         }
     }
 
-    assert(STM_PSEGMENT->safe_point == SP_SAFE_POINT);
-    STM_PSEGMENT->safe_point = SP_RUNNING;
-
-    mutex_unlock();
+    s_mutex_unlock();
 }
 
 static void major_collection_now_at_safe_point(void)
diff --git a/c7/stm/nursery.c b/c7/stm/nursery.c
--- a/c7/stm/nursery.c
+++ b/c7/stm/nursery.c
@@ -283,7 +283,6 @@
     assert(!_has_mutex());
 
     stm_safe_point();
-    abort_if_needed();
 
     _do_minor_collection(commit);
 }
@@ -384,7 +383,7 @@
             continue;
 
         assert(pseg->transaction_state != TS_NONE);
-        assert(pseg->safe_point == SP_SAFE_POINT);
+        assert(pseg->safe_point != SP_RUNNING);
 
         set_gs_register(get_segment_base(i));
         _do_minor_collection(/*commit=*/ false);
diff --git a/c7/stm/nursery.h b/c7/stm/nursery.h
--- a/c7/stm/nursery.h
+++ b/c7/stm/nursery.h
@@ -1,6 +1,10 @@
 
-/* '_stm_nursery_section_end' is either NURSERY_END or NSE_SIGNAL */
-#define NSE_SIGNAL     _STM_NSE_SIGNAL
+/* '_stm_nursery_section_end' is either NURSERY_END or NSE_SIGxxx */
+#define NSE_SIGPAUSE   0
+#define NSE_SIGABORT   1
+#if     NSE_SIGABORT > _STM_NSE_SIGNAL_MAX
+#  error "update _STM_NSE_SIGNAL_MAX"
+#endif
 
 
 static uint32_t highest_overflow_number;
@@ -9,3 +13,7 @@
 static void check_nursery_at_transaction_start(void);
 static void throw_away_nursery(void);
 static void major_do_minor_collections(void);
+
+static inline bool must_abort(void) {
+    return STM_SEGMENT->nursery_end == NSE_SIGABORT;
+}
diff --git a/c7/stm/sync.c b/c7/stm/sync.c
--- a/c7/stm/sync.c
+++ b/c7/stm/sync.c
@@ -11,23 +11,24 @@
 
    - SP_RUNNING: a thread is running a transaction using this segment.
 
-   - SP_SAFE_POINT: the thread that owns this segment is currently
+   - SP_WAIT_FOR_xxx: the thread that owns this segment is currently
      suspended in a safe-point.  (A safe-point means that it is not
      changing anything right now, and the current shadowstack is correct.)
 
-   Synchronization is done with a single mutex / condition variable.  A
-   thread needs to have acquired the mutex in order to do things like
-   acquiring or releasing ownership of a segment or updating this
-   segment's state.  No other thread can acquire the mutex concurrently,
-   and so there is no race: the (single) thread owning the mutex can
-   freely inspect or even change the state of other segments too.
+   Synchronization is done with a single mutex and a few condition
+   variables.  A thread needs to have acquired the mutex in order to do
+   things like acquiring or releasing ownership of a segment or updating
+   this segment's state.  No other thread can acquire the mutex
+   concurrently, and so there is no race: the (single) thread owning the
+   mutex can freely inspect or even change the state of other segments
+   too.
 */
 
 
 static union {
     struct {
         pthread_mutex_t global_mutex;
-        pthread_cond_t global_cond;
+        pthread_cond_t cond[_C_TOTAL];
         /* some additional pieces of global state follow */
         uint8_t in_use[NB_SEGMENTS];   /* 1 if running a pthread */
         uint64_t global_time;
@@ -41,8 +42,11 @@
     if (pthread_mutex_init(&sync_ctl.global_mutex, NULL) != 0)
         stm_fatalerror("mutex initialization: %m\n");
 
-    if (pthread_cond_init(&sync_ctl.global_cond, NULL) != 0)
-        stm_fatalerror("cond initialization: %m\n");
+    long i;
+    for (i = 0; i < _C_TOTAL; i++) {
+        if (pthread_cond_init(&sync_ctl.cond[i], NULL) != 0)
+            stm_fatalerror("cond initialization: %m\n");
+    }
 }
 
 static void teardown_sync(void)
@@ -50,8 +54,11 @@
     if (pthread_mutex_destroy(&sync_ctl.global_mutex) != 0)
         stm_fatalerror("mutex destroy: %m\n");
 
-    if (pthread_cond_destroy(&sync_ctl.global_cond) != 0)
-        stm_fatalerror("cond destroy: %m\n");
+    long i;
+    for (i = 0; i < _C_TOTAL; i++) {
+        if (pthread_cond_destroy(&sync_ctl.cond[i]) != 0)
+            stm_fatalerror("cond destroy: %m\n");
+    }
 
     memset(&sync_ctl, 0, sizeof(sync_ctl.in_use));
 }
@@ -70,7 +77,7 @@
         stm_fatalerror("syscall(arch_prctl, ARCH_SET_GS): %m\n");
 }
 
-static inline void mutex_lock_no_abort(void)
+static inline void s_mutex_lock(void)
 {
     assert(!_has_mutex_here);
     if (UNLIKELY(pthread_mutex_lock(&sync_ctl.global_mutex) != 0))
@@ -78,47 +85,59 @@
     assert((_has_mutex_here = true, 1));
 }
 
-static inline void mutex_lock(void)
+static inline void s_mutex_unlock(void)
 {
-    mutex_lock_no_abort();
-    if (STM_PSEGMENT->transaction_state == TS_MUST_ABORT)
-        abort_with_mutex();
-}
-
-static inline void mutex_unlock(void)
-{
-    assert(STM_PSEGMENT->safe_point == SP_NO_TRANSACTION ||
-           STM_PSEGMENT->safe_point == SP_RUNNING);
-
     assert(_has_mutex_here);
     if (UNLIKELY(pthread_mutex_unlock(&sync_ctl.global_mutex) != 0))
         stm_fatalerror("pthread_mutex_unlock: %m\n");
     assert((_has_mutex_here = false, 1));
 }
 
-static inline void cond_wait_no_abort(void)
+static inline void cond_wait(enum cond_type_e ctype)
 {
 #ifdef STM_NO_COND_WAIT
-    stm_fatalerror("*** cond_wait called!\n");
+    stm_fatalerror("*** cond_wait/%d called!\n", (int)ctype);
 #endif
 
     assert(_has_mutex_here);
-    if (UNLIKELY(pthread_cond_wait(&sync_ctl.global_cond,
+    if (UNLIKELY(pthread_cond_wait(&sync_ctl.cond[ctype],
                                    &sync_ctl.global_mutex) != 0))
-        stm_fatalerror("pthread_cond_wait: %m\n");
+        stm_fatalerror("pthread_cond_wait/%d: %m\n", (int)ctype);
 }
 
-static inline void cond_wait(void)
+static inline void cond_signal(enum cond_type_e ctype)
 {
-    cond_wait_no_abort();
-    if (STM_PSEGMENT->transaction_state == TS_MUST_ABORT)
-        abort_with_mutex();
+    if (UNLIKELY(pthread_cond_signal(&sync_ctl.cond[ctype]) != 0))
+        stm_fatalerror("pthread_cond_signal/%d: %m\n", (int)ctype);
 }
 
-static inline void cond_broadcast(void)
+static inline void cond_broadcast(enum cond_type_e ctype)
 {
-    if (UNLIKELY(pthread_cond_broadcast(&sync_ctl.global_cond) != 0))
-        stm_fatalerror("pthread_cond_broadcast: %m\n");
+    if (UNLIKELY(pthread_cond_broadcast(&sync_ctl.cond[ctype]) != 0))
+        stm_fatalerror("pthread_cond_broadcast/%d: %m\n", (int)ctype);
+}
+
+/************************************************************/
+
+
+static void wait_for_end_of_inevitable_transaction(bool can_abort)
+{
+    long i;
+ restart:
+    for (i = 0; i < NB_SEGMENTS; i++) {
+        if (get_priv_segment(i)->transaction_state == TS_INEVITABLE) {
+            if (can_abort) {
+                /* for now, always abort if we can.  We could also try
+                   sometimes to wait for the other thread (needs to
+                   take care about setting safe_point then) */
+                abort_with_mutex();
+            }
+            /* wait for stm_commit_transaction() to finish this
+               inevitable transaction */
+            cond_wait(C_INEVITABLE);
+            goto restart;
+        }
+    }
 }
 
 static bool acquire_thread_segment(stm_thread_local_t *tl)
@@ -151,10 +170,9 @@
             goto got_num;
         }
     }
-    /* Wait and retry.  It is guaranteed that any thread releasing its
-       segment will do so by acquiring the mutex and calling
-       cond_broadcast(). */
-    cond_wait_no_abort();
+    /* No segment available.  Wait until release_thread_segment()
+       signals that one segment has been freed. */
+    cond_wait(C_SEGMENT_FREE);
 
     /* Return false to the caller, which will call us again */
     return false;
@@ -177,28 +195,9 @@
 
     assert(sync_ctl.in_use[tl->associated_segment_num] == 1);
     sync_ctl.in_use[tl->associated_segment_num] = 0;
-}
 
-static void wait_for_end_of_inevitable_transaction(bool can_abort)
-{
-    assert(_has_mutex());
-
-    long i;
-  restart:
-    for (i = 0; i < NB_SEGMENTS; i++) {
-        if (get_priv_segment(i)->transaction_state == TS_INEVITABLE) {
-            if (can_abort) {
-                /* XXX should we wait here?  or abort?  or a mix?
-                   for now, always abort */
-                abort_with_mutex();
-                //cond_wait();
-            }
-            else {
-                cond_wait_no_abort();
-            }
-            goto restart;
-        }
-    }
+    /* wake up one of the threads waiting in acquire_thread_segment() */
+    cond_signal(C_SEGMENT_FREE);
 }
 
 static bool _running_transaction(void) __attribute__((unused));
@@ -225,115 +224,130 @@
 void _stm_start_safe_point(void)
 {
     assert(STM_PSEGMENT->safe_point == SP_RUNNING);
-    STM_PSEGMENT->safe_point = SP_SAFE_POINT;
+    STM_PSEGMENT->safe_point = SP_WAIT_FOR_OTHER_THREAD;
 }
 
 void _stm_stop_safe_point(void)
 {
-    assert(STM_PSEGMENT->safe_point == SP_SAFE_POINT);
+    assert(STM_PSEGMENT->safe_point == SP_WAIT_FOR_OTHER_THREAD);
     STM_PSEGMENT->safe_point = SP_RUNNING;
 
-    if (STM_PSEGMENT->transaction_state == TS_MUST_ABORT)
-        stm_abort_transaction();
+    stm_safe_point();
 }
 #endif
 
 
-static bool try_wait_for_other_safe_points(void)
+/************************************************************/
+
+
+#ifndef NDEBUG
+static bool _safe_points_requested = false;
+#endif
+
+static void signal_everybody_to_pause_running(void)
 {
-    /* Must be called with the mutex.  When all other threads are in a
-       safe point of at least the requested kind, returns.  Otherwise,
-       asks them to enter a safe point, issues a cond_wait(), and wait.
-
-       When this function returns, the other threads are all blocked at
-       safe points as requested.  They may be either in their own
-       cond_wait(), or running at SP_NO_TRANSACTION, in which case they
-       should not do anything related to stm until the next time they
-       call mutex_lock().
-
-       The next time we unlock the mutex (with mutex_unlock() or
-       cond_wait()), they will proceed.
-
-       This function requires that the calling thread is in a safe-point
-       right now, so there is no deadlock if one thread calls
-       try_wait_for_other_safe_points() while another is currently blocked
-       in the cond_wait() in this same function.
-    */
-
-    assert(_has_mutex());
-    assert(STM_PSEGMENT->safe_point == SP_SAFE_POINT);
-
-    if (STM_PSEGMENT->transaction_state == TS_MUST_ABORT)
-        abort_with_mutex();
+    assert(_safe_points_requested == false);
+    assert((_safe_points_requested = true, 1));
 
     long i;
-    bool wait = false;
     for (i = 0; i < NB_SEGMENTS; i++) {
-        /* If the other thread is SP_NO_TRANSACTION, then it can be
-           ignored here: as long as we have the mutex, it will remain
-           SP_NO_TRANSACTION.  If it is already at a suitable safe point,
-           it must be in a cond_wait(), so it will not resume as long
-           as we hold the mutex.  Thus the only cases is if it is
-           SP_RUNNING, or at the wrong kind of safe point.
-        */
-        struct stm_priv_segment_info_s *other_pseg = get_priv_segment(i);
-        if (other_pseg->safe_point == SP_RUNNING) {
-            /* we need to wait for this thread.  Use NSE_SIGNAL to ask
-               it to enter a safe-point soon. */
-            other_pseg->pub.nursery_end = NSE_SIGNAL;
-            wait = true;
+        if (get_segment(i)->nursery_end == NURSERY_END)
+            get_segment(i)->nursery_end = NSE_SIGPAUSE;
+    }
+}
+
+static inline long count_other_threads_sp_running(void)
+{
+    /* Return the number of other threads in SP_RUNNING.
+       Asserts that SP_RUNNING threads still have the NSE_SIGxxx. */
+    long i;
+    long result = 0;
+    int my_num = STM_SEGMENT->segment_num;
+
+    for (i = 0; i < NB_SEGMENTS; i++) {
+        if (i != my_num && get_priv_segment(i)->safe_point == SP_RUNNING) {
+            assert(get_segment(i)->nursery_end <= _STM_NSE_SIGNAL_MAX);
+            result++;
+        }
+    }
+    return result;
+}
+
+static void remove_requests_for_safe_point(void)
+{
+    assert(_safe_points_requested == true);
+    assert((_safe_points_requested = false, 1));
+
+    long i;
+    for (i = 0; i < NB_SEGMENTS; i++) {
+        assert(get_segment(i)->nursery_end != NURSERY_END);
+        if (get_segment(i)->nursery_end == NSE_SIGPAUSE)
+            get_segment(i)->nursery_end = NURSERY_END;
+    }
+    cond_broadcast(C_REQUEST_REMOVED);
+}
+
+static void enter_safe_point_if_requested(void)
+{
+    assert(_has_mutex());
+    while (1) {
+        if (must_abort())
+            abort_with_mutex();
+
+        if (STM_SEGMENT->nursery_end == NURSERY_END)
+            break;    /* no safe point requested */
+
+        assert(STM_SEGMENT->nursery_end == NSE_SIGPAUSE);
+
+        /* If we are requested to enter a safe-point, we cannot proceed now.
+           Wait until the safe-point request is removed for us. */
+
+        cond_signal(C_AT_SAFE_POINT);
+        STM_PSEGMENT->safe_point = SP_WAIT_FOR_C_REQUEST_REMOVED;
+        cond_wait(C_REQUEST_REMOVED);
+        STM_PSEGMENT->safe_point = SP_RUNNING;
+    }
+}
+
+static void synchronize_all_threads(void)
+{
+    enter_safe_point_if_requested();
+
+    /* Only one thread should reach this point concurrently.  This is
+       why: if several threads call this function, the first one that
+       goes past this point will set the "request safe point" on all
+       other threads; then none of the other threads will go past the
+       enter_safe_point_if_requested() above. */
+    signal_everybody_to_pause_running();
+
+    /* If some other threads are SP_RUNNING, we cannot proceed now.
+       Wait until all other threads are suspended. */
+    while (count_other_threads_sp_running() > 0) {
+
+        STM_PSEGMENT->safe_point = SP_WAIT_FOR_C_AT_SAFE_POINT;
+        cond_wait(C_AT_SAFE_POINT);
+        STM_PSEGMENT->safe_point = SP_RUNNING;
+
+        if (must_abort()) {
+            remove_requests_for_safe_point();    /* => C_REQUEST_REMOVED */
+            abort_with_mutex();
         }
     }
 
-    if (wait) {
-        cond_wait();
-        /* XXX think: I believe this can end in a busy-loop, with this thread
-           setting NSE_SIGNAL on the other thread; then the other thread
-           commits, sends C_SAFE_POINT, finish the transaction, start
-           the next one, and only then this thread resumes; then we're back
-           in the same situation as before with no progress here.
-        */
-        return false;
-    }
-
-    /* all threads are at a safe-point now.  Broadcast C_RESUME, which
-       will allow them to resume --- but only when we release the mutex. */
-    cond_broadcast();
-    return true;
+    /* Remove the requests for safe-points now.  In principle we should
+       remove it later, when the caller is done, but this is equivalent
+       as long as we hold the mutex.
+    */
+    remove_requests_for_safe_point();    /* => C_REQUEST_REMOVED */
 }
 
 void _stm_collectable_safe_point(void)
 {
-    /* If _stm_nursery_end was set to NSE_SIGNAL by another thread,
+    /* If 'nursery_end' was set to NSE_SIGxxx by another thread,
        we end up here as soon as we try to call stm_allocate() or do
        a call to stm_safe_point().
-
-       This works together with wait_for_other_safe_points() to
-       signal the C_SAFE_POINT condition.
     */
-    mutex_lock();
-    collectable_safe_point();
-    mutex_unlock();
+    s_mutex_lock();
+    enter_safe_point_if_requested();
+    s_mutex_unlock();
 }
-
-static void collectable_safe_point(void)
-{
-    assert(_has_mutex());
-    assert(STM_PSEGMENT->safe_point == SP_RUNNING);
-
-    while (STM_SEGMENT->nursery_end == NSE_SIGNAL) {
-        dprintf(("collectable_safe_point...\n"));
-        STM_PSEGMENT->safe_point = SP_SAFE_POINT;
-        STM_SEGMENT->nursery_end = NURSERY_END;
-
-        /* signal all the threads blocked in
-           wait_for_other_safe_points() */
-        cond_broadcast();
-
-        cond_wait();
-
-        STM_PSEGMENT->safe_point = SP_RUNNING;
-    }
-    assert(STM_SEGMENT->nursery_end == NURSERY_END);
-    dprintf(("collectable_safe_point done\n"));
-}
diff --git a/c7/stm/sync.h b/c7/stm/sync.h
--- a/c7/stm/sync.h
+++ b/c7/stm/sync.h
@@ -3,11 +3,20 @@
 static void setup_sync(void);
 static void teardown_sync(void);
 
-/* all synchronization is done via a mutex and a condition variable */
-static void mutex_lock(void);
-static void mutex_unlock(void);
-static void cond_wait(void);
-static void cond_broadcast(void);
+/* all synchronization is done via a mutex and a few condition variables */
+enum cond_type_e {
+    C_SEGMENT_FREE,
+    C_AT_SAFE_POINT,
+    C_REQUEST_REMOVED,
+    C_INEVITABLE,
+    C_ABORTED,
+    _C_TOTAL
+};
+static void s_mutex_lock(void);
+static void s_mutex_unlock(void);
+static void cond_wait(enum cond_type_e);
+static void cond_signal(enum cond_type_e);
+static void cond_broadcast(enum cond_type_e);
 #ifndef NDEBUG
 static bool _has_mutex(void);
 #endif
@@ -17,8 +26,6 @@
    (must have the mutex acquired!) */
 static bool acquire_thread_segment(stm_thread_local_t *tl);
 static void release_thread_segment(stm_thread_local_t *tl);
+
 static void wait_for_end_of_inevitable_transaction(bool can_abort);
-
-/* see the source for an exact description */
-static bool try_wait_for_other_safe_points(void);
-static void collectable_safe_point(void);
+static void synchronize_all_threads(void);
diff --git a/c7/stmgc.h b/c7/stmgc.h
--- a/c7/stmgc.h
+++ b/c7/stmgc.h
@@ -96,7 +96,7 @@
 #endif
 
 #define _STM_GCFLAG_WRITE_BARRIER      0x01
-#define _STM_NSE_SIGNAL                   0
+#define _STM_NSE_SIGNAL_MAX               1
 #define _STM_FAST_ALLOC           (66*1024)
 #define STM_FLAGS_PREBUILT   _STM_GCFLAG_WRITE_BARRIER
 
@@ -244,7 +244,7 @@
 /* Forces a safe-point if needed.  Normally not needed: this is
    automatic if you call stm_allocate(). */
 static inline void stm_safe_point(void) {
-    if (STM_SEGMENT->nursery_end == _STM_NSE_SIGNAL)
+    if (STM_SEGMENT->nursery_end <= _STM_NSE_SIGNAL_MAX)
         _stm_collectable_safe_point();
 }
 
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to