Author: Armin Rigo <[email protected]>
Branch: c7-refactor
Changeset: r845:bf3c8f9d6f21
Date: 2014-02-24 23:46 +0100
http://bitbucket.org/pypy/stmgc/changeset/bf3c8f9d6f21/

Log:    Phew. I *think* that by now this model with several condition
        variables should, ideally, work.

diff --git a/c7/demo/Makefile b/c7/demo/Makefile
--- a/c7/demo/Makefile
+++ b/c7/demo/Makefile
@@ -24,7 +24,7 @@
         -Wall -Werror ../stmgc.c
 
 build-%: %.c ${H_FILES} ${C_FILES}
-       clang -I.. -pthread  -g -O1  $< -o build-$* \
+       clang -I.. -pthread  -g -O0  $< -o build-$* \
         -Wall -Werror ../stmgc.c
 
 release-%: %.c ${H_FILES} ${C_FILES}
diff --git a/c7/demo/demo2.c b/c7/demo/demo2.c
--- a/c7/demo/demo2.c
+++ b/c7/demo/demo2.c
@@ -6,8 +6,8 @@
 
 #include "stmgc.h"
 
-#define LIST_LENGTH 6000
-#define BUNCH       400
+#define LIST_LENGTH 2000
+#define BUNCH       100
 
 typedef TLPREFIX struct node_s node_t;
 typedef node_t* nodeptr_t;
@@ -34,7 +34,7 @@
 
 void done_shadow_stack(void)
 {
-    free(stm_thread_local.shadowstack);
+    free(stm_thread_local.shadowstack_base);
     stm_thread_local.shadowstack = NULL;
     stm_thread_local.shadowstack_base = NULL;
 }
diff --git a/c7/stm/contention.c b/c7/stm/contention.c
--- a/c7/stm/contention.c
+++ b/c7/stm/contention.c
@@ -3,7 +3,7 @@
 #endif
 
 
-static void contention_management(uint8_t other_segment_num, bool wait)
+static void contention_management(uint8_t other_segment_num)
 {
     /* A simple contention manager.  Called when some other thread
        holds the write lock on an object.  The current thread tries
@@ -36,23 +36,58 @@
            abort. */
         abort_with_mutex();
     }
-    else if (wait) {
+    else {
+        /* signal the other thread; it must abort.
+
+           Note that we know that the target thread is running now, and
+           so it is or will soon be blocked at a mutex_lock() or a
+           cond_wait(C_SAFE_POINT).  Thus broadcasting C_SAFE_POINT is
+           enough to wake it up in the second case.
+        */
+        cond_broadcast(C_SAFE_POINT);
+    }
+}
+
+static void write_write_contention_management(uintptr_t lock_idx)
+{
+    mutex_lock();
+
+    if (STM_PSEGMENT->transaction_state == TS_MUST_ABORT)
+        abort_with_mutex();
+
+    uint8_t prev_owner = ((volatile uint8_t *)write_locks)[lock_idx];
+    if (prev_owner != 0 && prev_owner != STM_PSEGMENT->write_lock_num) {
+
+        uint8_t other_segment_num = prev_owner - 1;
+        contention_management(other_segment_num);
+
+        /* the rest of this code is for the case where we continue to
+           run, and the other thread is asked to abort */
+
 #ifdef STM_TESTS
         /* abort anyway for tests. We mustn't call cond_wait() */
         abort_with_mutex();
 #endif
-        /* otherwise, we will issue a safe point and wait: */
+
+        /* first mark the other thread as "needing a safe-point" */
+        struct stm_priv_segment_info_s* other_pseg;
+        other_pseg = get_priv_segment(other_segment_num);
+        assert(other_pseg->transaction_state == TS_MUST_ABORT);
+        other_pseg->pub.nursery_end = NSE_SIGNAL;
+
+        /* we will issue a safe point and wait: */
         STM_PSEGMENT->safe_point = SP_SAFE_POINT_CANNOT_COLLECT;
 
-        /* signal the other thread; it must abort */
-        cond_broadcast();
+        /* wait, hopefully until the other thread broadcasts "I'm
+           done aborting" (spurious wake-ups are ok). */
+        cond_wait(C_SAFE_POINT);
 
-        /* then wait, hopefully until the other thread broadcasts "I'm
-           done aborting" (spurious wake-ups are ok) */
-        cond_wait();
+        cond_broadcast(C_RESUME);
 
         /* now we return into _stm_write_slowpath() and will try again
            to acquire the write lock on our object. */
         STM_PSEGMENT->safe_point = SP_RUNNING;
     }
+
+    mutex_unlock();
 }
diff --git a/c7/stm/contention.h b/c7/stm/contention.h
--- a/c7/stm/contention.h
+++ b/c7/stm/contention.h
@@ -1,2 +1,3 @@
 
-static void contention_management(uint8_t other_segment_num, bool wait);
+static void contention_management(uint8_t other_segment_num);
+static void write_write_contention_management(uintptr_t lock_idx);
diff --git a/c7/stm/core.c b/c7/stm/core.c
--- a/c7/stm/core.c
+++ b/c7/stm/core.c
@@ -88,11 +88,7 @@
     else {
         /* call the contention manager, and then retry (unless we were
            aborted). */
-        mutex_lock();
-        uint8_t prev_owner = ((volatile uint8_t *)write_locks)[lock_idx];
-        if (prev_owner != 0 && prev_owner != lock_num)
-            contention_management(prev_owner - 1, true);
-        mutex_unlock();
+        write_write_contention_management(lock_idx);
         goto retry;
     }
 
@@ -156,6 +152,7 @@
                                                       : TS_INEVITABLE);
     STM_SEGMENT->jmpbuf_ptr = jmpbuf;
     STM_PSEGMENT->shadowstack_at_start_of_transaction = tl->shadowstack;
+    STM_SEGMENT->nursery_end = NURSERY_END;
 
     dprintf(("start_transaction\n"));
 
@@ -205,7 +202,7 @@
         ({
             if (was_read_remote(remote_base, item, remote_version)) {
                 /* A write-read conflict! */
-                contention_management(remote_num, false);
+                contention_management(remote_num);
 
                 /* If we reach this point, it means we aborted the other
                    thread.  We're done here. */
@@ -311,6 +308,12 @@
 
 static void _finish_transaction(void)
 {
+    /* signal all the threads blocked in wait_for_other_safe_points() */
+    if (STM_SEGMENT->nursery_end == NSE_SIGNAL) {
+        STM_SEGMENT->nursery_end = NURSERY_END;
+        cond_broadcast(C_SAFE_POINT);
+    }
+
     STM_PSEGMENT->safe_point = SP_NO_TRANSACTION;
     STM_PSEGMENT->transaction_state = TS_NONE;
 
@@ -339,8 +342,8 @@
     /* wait until the other thread is at a safe-point */
     wait_for_other_safe_points(SP_SAFE_POINT_CANNOT_COLLECT);
 
-    /* the rest of this function runs either atomically without releasing
-       the mutex, or it needs to restart. */
+    /* the rest of this function either runs atomically without
+       releasing the mutex, or aborts the current thread. */
 
     /* detect conflicts */
     detect_write_read_conflicts();
@@ -366,9 +369,9 @@
     /* done */
     _finish_transaction();
 
-    /* we did cond_broadcast() above already, in
-       try_wait_for_other_safe_points().  It may wake up
-       other threads in cond_wait() for a free segment. */
+    /* wake up one other thread waiting for a segment. */
+    cond_signal(C_RELEASE_THREAD_SEGMENT);
+
     mutex_unlock();
 }
 
@@ -446,7 +449,14 @@
 
     _finish_transaction();
 
-    cond_broadcast();
+    /* wake up one other thread waiting for a segment.  In order to support
+       contention.c, we use a broadcast, to make sure that all threads are
+       signalled, including the one that requested an abort, if any.
+       Moreover, we wake up any thread waiting for this one to do a safe
+       point, if any.
+    */
+    cond_broadcast(C_RELEASE_THREAD_SEGMENT);
+
     mutex_unlock();
 
     assert(jmpbuf_ptr != NULL);
diff --git a/c7/stm/nursery.c b/c7/stm/nursery.c
--- a/c7/stm/nursery.c
+++ b/c7/stm/nursery.c
@@ -16,7 +16,6 @@
 #define NURSERY_END           (NURSERY_START + NURSERY_SIZE)
 
 static uintptr_t _stm_nursery_start;
-uintptr_t _stm_nursery_end;
 
 
 /************************************************************/
@@ -25,11 +24,12 @@
 {
     assert(_STM_FAST_ALLOC <= NURSERY_SIZE);
     _stm_nursery_start = NURSERY_START;
-    _stm_nursery_end   = NURSERY_END;
 
     long i;
-    for (i = 0; i < NB_SEGMENTS; i++)
+    for (i = 0; i < NB_SEGMENTS; i++) {
         get_segment(i)->nursery_current = (stm_char *)NURSERY_START;
+        get_segment(i)->nursery_end = NURSERY_END;
+    }
 }
 
 static void teardown_nursery(void)
@@ -163,6 +163,8 @@
 static void minor_collection(bool commit)
 {
     assert(!_has_mutex());
+
+    stm_safe_point();
     abort_if_needed();
 
     /* We must move out of the nursery any object found within the
@@ -257,7 +259,7 @@
     assert((uintptr_t)STM_SEGMENT->nursery_current == _stm_nursery_start);
     uintptr_t i, limit;
 # ifdef STM_TESTS
-    limit = _stm_nursery_end - _stm_nursery_start;
+    limit = NURSERY_END - _stm_nursery_start;
 # else
     limit = 64;
 # endif
diff --git a/c7/stm/sync.c b/c7/stm/sync.c
--- a/c7/stm/sync.c
+++ b/c7/stm/sync.c
@@ -11,36 +11,43 @@
    additionally it wants to wait until the global state is changed by
    someone else, it waits on the condition variable.  This should be
    all we need for synchronization.
-
-   Maybe look at https://github.com/neosmart/pevents for how they do
-   WaitForMultipleObjects().
 */
 
 
 static union {
     struct {
         pthread_mutex_t global_mutex;
-        pthread_cond_t global_cond;
+        pthread_cond_t cond[_C_TOTAL];
         /* some additional pieces of global state follow */
         uint8_t in_use[NB_SEGMENTS];   /* 1 if running a pthread */
         uint64_t global_time;
     };
-    char reserved[128];
+    char reserved[192];
 } sync_ctl __attribute__((aligned(64)));
 
 
 static void setup_sync(void)
 {
-    if (pthread_mutex_init(&sync_ctl.global_mutex, NULL) != 0 ||
-         pthread_cond_init(&sync_ctl.global_cond, NULL) != 0)
-        stm_fatalerror("mutex/cond initialization: %m\n");
+    if (pthread_mutex_init(&sync_ctl.global_mutex, NULL) != 0)
+        stm_fatalerror("mutex initialization: %m\n");
+
+    long i;
+    for (i = 0; i < _C_TOTAL; i++) {
+        if (pthread_cond_init(&sync_ctl.cond[i], NULL) != 0)
+            stm_fatalerror("cond initialization: %m\n");
+    }
 }
 
 static void teardown_sync(void)
 {
-    if (pthread_mutex_destroy(&sync_ctl.global_mutex) != 0 ||
-         pthread_cond_destroy(&sync_ctl.global_cond) != 0)
-        stm_fatalerror("mutex/cond destroy: %m\n");
+    if (pthread_mutex_destroy(&sync_ctl.global_mutex) != 0)
+        stm_fatalerror("mutex destroy: %m\n");
+
+    long i;
+    for (i = 0; i < _C_TOTAL; i++) {
+        if (pthread_cond_destroy(&sync_ctl.cond[i]) != 0)
+            stm_fatalerror("cond destroy: %m\n");
+    }
 
     memset(&sync_ctl, 0, sizeof(sync_ctl.in_use));
 }
@@ -81,26 +88,31 @@
     assert((_has_mutex_here = false, 1));
 }
 
-static inline void cond_wait(void)
+static inline void cond_wait(enum cond_type_e ctype)
 {
 #ifdef STM_NO_COND_WAIT
-    fprintf(stderr, "*** cond_wait called!");
-    abort();
+    stm_fatalerror("*** cond_wait/%d called!\n", (int)ctype);
 #endif
 
     assert(_has_mutex_here);
-    if (UNLIKELY(pthread_cond_wait(&sync_ctl.global_cond,
+    if (UNLIKELY(pthread_cond_wait(&sync_ctl.cond[ctype],
                                    &sync_ctl.global_mutex) != 0))
-        stm_fatalerror("pthread_cond_wait: %m\n");
+        stm_fatalerror("pthread_cond_wait/%d: %m\n", (int)ctype);
 
     if (STM_PSEGMENT->transaction_state == TS_MUST_ABORT)
         abort_with_mutex();
 }
 
-static inline void cond_broadcast(void)
+static inline void cond_broadcast(enum cond_type_e ctype)
 {
-    if (UNLIKELY(pthread_cond_broadcast(&sync_ctl.global_cond) != 0))
-        stm_fatalerror("pthread_cond_broadcast: %m\n");
+    if (UNLIKELY(pthread_cond_broadcast(&sync_ctl.cond[ctype]) != 0))
+        stm_fatalerror("pthread_cond_broadcast/%d: %m\n", (int)ctype);
+}
+
+static inline void cond_signal(enum cond_type_e ctype)
+{
+    if (UNLIKELY(pthread_cond_signal(&sync_ctl.cond[ctype]) != 0))
+        stm_fatalerror("pthread_cond_signal/%d: %m\n", (int)ctype);
 }
 
 static void acquire_thread_segment(stm_thread_local_t *tl)
@@ -134,8 +146,10 @@
             goto got_num;
         }
     }
-    /* Wait and retry */
-    cond_wait();
+    /* Wait and retry.  It is guaranteed that any thread releasing its
+       segment will do so by acquiring the mutex and then signalling or
+       broadcasting C_RELEASE_THREAD_SEGMENT. */
+    cond_wait(C_RELEASE_THREAD_SEGMENT);
     goto retry;
 
  got_num:
@@ -216,11 +230,18 @@
        in the cond_wait() in this same function.
     */
 
+    /* XXX review what occurs for the other kind! */
+           assert(requested_safe_point_kind == SP_SAFE_POINT_CANNOT_COLLECT);
+
  restart:
     assert(_has_mutex());
     assert(STM_PSEGMENT->safe_point == SP_SAFE_POINT_CAN_COLLECT);
 
+    if (STM_PSEGMENT->transaction_state == TS_MUST_ABORT)
+        abort_with_mutex();
+
     long i;
+    bool wait = false;
     for (i = 0; i < NB_SEGMENTS; i++) {
         /* If the other thread is SP_NO_TRANSACTION, then it can be
            ignored here: as long as we have the mutex, it will remain
@@ -237,36 +258,49 @@
             /* we need to wait for this thread.  Use NSE_SIGNAL to ask
                it (and possibly all other threads in the same case) to
                enter a safe-point soon. */
-            _stm_nursery_end = NSE_SIGNAL;
-            cond_wait();
-            goto restart;
+            other_pseg->pub.nursery_end = NSE_SIGNAL;
+            wait = true;
         }
     }
 
-    /* all threads are at a safe-point now. */
-    cond_broadcast();   /* to wake up the other threads, but later,
-                           when they get the mutex again */
+    if (wait) {
+        cond_wait(C_SAFE_POINT);
+        goto restart;
+    }
+
+    /* all threads are at a safe-point now.  Broadcast C_RESUME, which
+       will allow them to resume --- but only when we release the mutex. */
+    cond_broadcast(C_RESUME);
 }
 
 void _stm_collectable_safe_point(void)
 {
-    /* If nursery_section_end was set to NSE_SIGNAL by another thread,
+    /* If STM_SEGMENT->nursery_end was set to NSE_SIGNAL by another thread,
        we end up here as soon as we try to call stm_allocate() or do
        a call to stm_safe_point().
-       See wait_for_other_safe_points() for details.
+
+       This works together with wait_for_other_safe_points() to
+       signal the C_SAFE_POINT condition.
     */
     mutex_lock();
+    collectable_safe_point();
+    mutex_unlock();
+}
+
+static void collectable_safe_point(void)
+{
     assert(STM_PSEGMENT->safe_point == SP_RUNNING);
 
-    if (_stm_nursery_end == NSE_SIGNAL) {
+    while (STM_SEGMENT->nursery_end == NSE_SIGNAL) {
         STM_PSEGMENT->safe_point = SP_SAFE_POINT_CAN_COLLECT;
+        STM_SEGMENT->nursery_end = NURSERY_END;
 
-        cond_broadcast();
+        /* signal all the threads blocked in
+           wait_for_other_safe_points() */
+        cond_broadcast(C_SAFE_POINT);
 
-        do { cond_wait(); } while (_stm_nursery_end == NSE_SIGNAL);
+        cond_wait(C_RESUME);
 
         STM_PSEGMENT->safe_point = SP_RUNNING;
     }
-
-    mutex_unlock();
 }
diff --git a/c7/stm/sync.h b/c7/stm/sync.h
--- a/c7/stm/sync.h
+++ b/c7/stm/sync.h
@@ -3,11 +3,18 @@
 static void setup_sync(void);
 static void teardown_sync(void);
 
-/* all synchronization is done via a mutex and condition variable */
+/* all synchronization is done via a mutex and a few condition variables */
+enum cond_type_e {
+    C_RELEASE_THREAD_SEGMENT,
+    C_SAFE_POINT,
+    C_RESUME,
+    _C_TOTAL
+};
 static void mutex_lock(void);
 static void mutex_unlock(void);
-static void cond_wait(void);
-static void cond_broadcast(void);
+static void cond_wait(enum cond_type_e);
+static void cond_broadcast(enum cond_type_e);
+static void cond_signal(enum cond_type_e);
 #ifndef NDEBUG
 static bool _has_mutex(void);
 #endif
@@ -19,3 +26,4 @@
 
 /* see the source for an exact description */
 static void wait_for_other_safe_points(int requested_safe_point_kind);
+static void collectable_safe_point(void);
diff --git a/c7/stmgc.h b/c7/stmgc.h
--- a/c7/stmgc.h
+++ b/c7/stmgc.h
@@ -43,6 +43,7 @@
     int segment_num;
     char *segment_base;
     stm_char *nursery_current;
+    uintptr_t nursery_end;
     struct stm_thread_local_s *running_thread;
     stm_jmpbuf_t *jmpbuf_ptr;
 };
@@ -65,8 +66,6 @@
 void _stm_start_transaction(stm_thread_local_t *, stm_jmpbuf_t *);
 void _stm_collectable_safe_point(void);
 
-extern uintptr_t _stm_nursery_end;
-
 #ifdef STM_TESTS
 bool _stm_was_read(object_t *obj);
 bool _stm_was_written(object_t *obj);
@@ -173,7 +172,7 @@
     stm_char *p = STM_SEGMENT->nursery_current;
     stm_char *end = p + size_rounded_up;
     STM_SEGMENT->nursery_current = end;
-    if (UNLIKELY((uintptr_t)end > _stm_nursery_end))
+    if (UNLIKELY((uintptr_t)end > STM_SEGMENT->nursery_end))
         return _stm_allocate_slowpath(size_rounded_up);
 
     return (object_t *)p;
@@ -231,7 +230,7 @@
 /* Forces a safe-point if needed.  Normally not needed: this is
    automatic if you call stm_allocate(). */
 static inline void stm_safe_point(void) {
-    if (_stm_nursery_end == _STM_NSE_SIGNAL)
+    if (STM_SEGMENT->nursery_end == _STM_NSE_SIGNAL)
         _stm_collectable_safe_point();
 }
 
_______________________________________________
pypy-commit mailing list
pypy-commit@python.org
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to