Author: Armin Rigo <[email protected]>
Branch: 
Changeset: r943:e04a0e585284
Date: 2014-03-04 10:32 +0100
http://bitbucket.org/pypy/stmgc/changeset/e04a0e585284/

Log:    Refactor the contention management. Now become_inevitable is also a
        case of contention (in this case, for the right to be inevitable).
        Contention managers can choose to pause the running thread too.

diff --git a/c7/stm/contention.c b/c7/stm/contention.c
--- a/c7/stm/contention.c
+++ b/c7/stm/contention.c
@@ -3,80 +3,189 @@
 #endif
 
 
-static void contention_management(uint8_t other_segment_num)
+enum contention_kind_e {
+
+    /* A write-write contention occurs when we are running our transaction
+       and detect that we are about to write to an object that another
+       thread is also writing to.  This kind of contention must be
+       resolved before continuing.  This *must* abort one of the two
+       threads: the caller's thread is not at a safe-point, so cannot
+       wait! */
+    WRITE_WRITE_CONTENTION,
+
+    /* A write-read contention occurs when we are trying to commit: it
+       means that an object we wrote to was also read by another
+       transaction.  Even though it would seem obvious that we should
+       just abort the other thread and proceed in our commit, a more
+       subtle answer would be in some cases to wait for the other thread
+       to commit first.  It would commit having read the old value, and
+       then we can commit our change to it. */
+    WRITE_READ_CONTENTION,
+
+    /* An inevitable contention occurs when we're trying to become
+       inevitable but another thread already is.  We can never abort the
+       other thread in this case, but we still have the choice to abort
+       ourselves or pause until the other thread commits. */
+    INEVITABLE_CONTENTION,
+};
+
+struct contmgr_s {
+    enum contention_kind_e kind;
+    struct stm_priv_segment_info_s *other_pseg;
+    bool abort_other;
+    bool try_sleep;  // XXX add a way to timeout, but should handle repeated
+                     // calls to contention_management() to avoid re-sleeping
+                     // for the whole duration
+};
+
+
+/************************************************************/
+
+
+__attribute__((unused))
+static void cm_always_abort_myself(struct contmgr_s *cm)
 {
-    /* A simple contention manager.  Called when some other thread
-       holds the write lock on an object.  The current thread tries
-       to do either a write or a read on it. */
+    cm->abort_other = false;
+}
 
+__attribute__((unused))
+static void cm_always_abort_other(struct contmgr_s *cm)
+{
+    cm->abort_other = true;
+}
+
+__attribute__((unused))
+static void cm_abort_the_younger(struct contmgr_s *cm)
+{
+    if (STM_PSEGMENT->start_time >= cm->other_pseg->start_time) {
+        /* We started after the other thread.  Abort */
+        cm->abort_other = false;
+    }
+    else {
+        cm->abort_other = true;
+    }
+}
+
+__attribute__((unused))
+static void cm_always_wait_for_other_thread(struct contmgr_s *cm)
+{
+    cm_abort_the_younger(cm);
+    cm->try_sleep = true;
+}
+
+__attribute__((unused))
+static void cm_pause_if_younger(struct contmgr_s *cm)
+{
+    if (STM_PSEGMENT->start_time >= cm->other_pseg->start_time) {
+        /* We started after the other thread.  Pause */
+        cm->try_sleep = true;
+        cm->abort_other = false;
+    }
+    else {
+        cm->abort_other = true;
+    }
+}
+
+
+/************************************************************/
+
+
+static void contention_management(uint8_t other_segment_num,
+                                  enum contention_kind_e kind)
+{
     assert(_has_mutex());
     assert(other_segment_num != STM_SEGMENT->segment_num);
 
-    /* Who should abort here: this thread, or the other thread? */
-    struct stm_priv_segment_info_s* other_pseg;
-    other_pseg = get_priv_segment(other_segment_num);
-
-    if (STM_PSEGMENT->transaction_state == TS_INEVITABLE) {
-        /* I'm inevitable, so the other is not. */
-        assert(other_pseg->transaction_state != TS_INEVITABLE);
-        other_pseg->pub.nursery_end = NSE_SIGABORT;
-    }
-    else if (other_pseg->start_time <= STM_PSEGMENT->start_time) {
-        /* The other thread started before us, so I should abort, as I'm
-           the least long-running transaction. */
-    }
-    else if (other_pseg->transaction_state == TS_REGULAR) {
-        /* The other thread started strictly after us.  We tell it
-           to abort if we can (e.g. if it's not TS_INEVITABLE). */
-        other_pseg->pub.nursery_end = NSE_SIGABORT;
-    }
-
-    /* Now check what we just did... almost: the check at the following
-       line can also find a NSE_SIGABORT that was set earlier.
-    */
-    if (other_pseg->pub.nursery_end != NSE_SIGABORT) {
-        /* if the other thread is not in aborting-soon mode, then *we*
-           must abort. */
-        abort_with_mutex();
-    }
-}
-
-static void write_write_contention_management(uintptr_t lock_idx)
-{
-    s_mutex_lock();
     if (must_abort())
         abort_with_mutex();
 
-    uint8_t prev_owner = ((volatile uint8_t *)write_locks)[lock_idx];
-    if (prev_owner != 0 && prev_owner != STM_PSEGMENT->write_lock_num) {
+    /* Who should abort here: this thread, or the other thread? */
+    struct contmgr_s contmgr;
+    contmgr.kind = kind;
+    contmgr.other_pseg = get_priv_segment(other_segment_num);
+    contmgr.abort_other = false;
+    contmgr.try_sleep = false;
 
-        uint8_t other_segment_num = prev_owner - 1;
-        assert(get_priv_segment(other_segment_num)->write_lock_num ==
-               prev_owner);
+    /* Pick one contention management... could be made dynamically choosable */
+#ifdef STM_TESTS
+    cm_abort_the_younger(&contmgr);
+#else
+    cm_always_wait_for_other_thread(&contmgr);
+#endif
 
-        /* Do generic contention management.  If found that we must abort,
-           calls abort_with_mutex() and never returns.  If found that the
-           other thread must abort, signal it with NSE_SIGABORT.  Note that
-           this can overwrite a NSE_SIGPAUSE, which is fine. */
-        contention_management(other_segment_num);
-        assert(get_segment(other_segment_num)->nursery_end == NSE_SIGABORT);
+    /* Fix the choices that are found incorrect due to TS_INEVITABLE
+       or NSE_SIGABORT */
+    if (contmgr.other_pseg->pub.nursery_end == NSE_SIGABORT) {
+        contmgr.abort_other = true;
+        contmgr.try_sleep = false;
+    }
+    else if (STM_PSEGMENT->transaction_state == TS_INEVITABLE) {
+        assert(contmgr.other_pseg->transaction_state != TS_INEVITABLE);
+        contmgr.abort_other = true;
+    }
+    else if (contmgr.other_pseg->transaction_state == TS_INEVITABLE) {
+        contmgr.abort_other = false;
+    }
 
-        /* The rest of this code is for the case where we continue to
-           run.  We have to signal the other thread to abort, and wait
-           until it does. */
+    if (contmgr.try_sleep && kind != WRITE_WRITE_CONTENTION &&
+        contmgr.other_pseg->safe_point != SP_WAIT_FOR_C_TRANSACTION_DONE) {
+        /* Sleep.
 
-        int sp = get_priv_segment(other_segment_num)->safe_point;
+           - Not for write-write contentions, because we're not at a
+             safe-point.
+
+           - To prevent loops of threads waiting for each other, use
+             a crude heuristic of never pausing for a thread that is
+             itself already paused here.
+        */
+        contmgr.other_pseg->signal_when_done = true;
+
+        dprintf(("pausing...\n"));
+        cond_signal(C_AT_SAFE_POINT);
+        STM_PSEGMENT->safe_point = SP_WAIT_FOR_C_TRANSACTION_DONE;
+        cond_wait(C_TRANSACTION_DONE);
+        STM_PSEGMENT->safe_point = SP_RUNNING;
+        dprintf(("pausing done\n"));
+
+        if (must_abort())
+            abort_with_mutex();
+    }
+    else if (!contmgr.abort_other) {
+        dprintf(("abort in contention\n"));
+        abort_with_mutex();
+    }
+    else {
+        /* We have to signal the other thread to abort, and wait until
+           it does. */
+        contmgr.other_pseg->pub.nursery_end = NSE_SIGABORT;
+
+        int sp = contmgr.other_pseg->safe_point;
         switch (sp) {
 
         case SP_RUNNING:
             /* The other thread is running now, so as NSE_SIGABORT was
                set in its 'nursery_end', it will soon enter a
                mutex_lock() and thus abort.
+
+               In this case, we will wait until it broadcasts "I'm done
+               aborting".  Important: this is not a safe point of any
+               kind!  The shadowstack may not be correct here.  It
+               should not end in a deadlock, because the target thread
+               is, in principle, guaranteed to call abort_with_mutex()
+               very soon.
             */
+            dprintf(("contention: wait C_ABORTED...\n"));
+            cond_wait(C_ABORTED);
+            dprintf(("contention: done\n"));
+
+            if (must_abort())
+                abort_with_mutex();
             break;
 
         /* The other cases are where the other thread is at a
            safe-point.  We wake it up by sending the correct signal.
+           We don't have to wait here: the other thread will not do
+           anything more than abort when it really wakes up later.
         */
         case SP_WAIT_FOR_C_REQUEST_REMOVED:
             cond_broadcast(C_REQUEST_REMOVED);
@@ -86,29 +195,45 @@
             cond_broadcast(C_AT_SAFE_POINT);
             break;
 
+        case SP_WAIT_FOR_C_TRANSACTION_DONE:
+            cond_broadcast(C_TRANSACTION_DONE);
+            break;
+
 #ifdef STM_TESTS
         case SP_WAIT_FOR_OTHER_THREAD:
-            /* abort anyway for tests.  We can't wait here */
-            abort_with_mutex();
+            /* for tests: the other thread will abort as soon as
+               stm_stop_safe_point() is called */
+            break;
 #endif
 
         default:
             stm_fatalerror("unexpected other_pseg->safe_point: %d", sp);
         }
 
-        /* wait, hopefully until the other thread broadcasts "I'm
-           done aborting" (spurious wake-ups are ok).  Important:
-           this is not a safe point of any kind!  The shadowstack
-           is not correct here.  It should not end in a deadlock,
-           because the target thread is, in principle, guaranteed
-           to call abort_with_mutex().
-        */
-        dprintf(("contention: wait C_ABORTED...\n"));
-        cond_wait(C_ABORTED);
-        dprintf(("contention: done\n"));
+        if (is_aborting_now(other_segment_num)) {
+            /* The other thread is blocked in a safe-point with NSE_SIGABORT.
+               We don't have to wake it up right now, but we know it will
+               abort as soon as it wakes up.  We can safely force it to
+               reset its state now. */
+            dprintf(("reset other modified\n"));
+            reset_modified_from_other_segments(other_segment_num);
+        }
+        dprintf(("killed other thread\n"));
+    }
+}
 
-        if (must_abort())
-            abort_with_mutex();
+static void write_write_contention_management(uintptr_t lock_idx)
+{
+    s_mutex_lock();
+
+    uint8_t prev_owner = ((volatile uint8_t *)write_locks)[lock_idx];
+    if (prev_owner != 0 && prev_owner != STM_PSEGMENT->write_lock_num) {
+
+        uint8_t other_segment_num = prev_owner - 1;
+        assert(get_priv_segment(other_segment_num)->write_lock_num ==
+               prev_owner);
+
+        contention_management(other_segment_num, WRITE_WRITE_CONTENTION);
 
         /* now we return into _stm_write_slowpath() and will try again
            to acquire the write lock on our object. */
@@ -116,3 +241,13 @@
 
     s_mutex_unlock();
 }
+
+static void write_read_contention_management(uint8_t other_segment_num)
+{
+    contention_management(other_segment_num, WRITE_READ_CONTENTION);
+}
+
+static void inevitable_contention_management(uint8_t other_segment_num)
+{
+    contention_management(other_segment_num, INEVITABLE_CONTENTION);
+}
diff --git a/c7/stm/contention.h b/c7/stm/contention.h
--- a/c7/stm/contention.h
+++ b/c7/stm/contention.h
@@ -1,3 +1,9 @@
 
-static void contention_management(uint8_t other_segment_num);
 static void write_write_contention_management(uintptr_t lock_idx);
+static void write_read_contention_management(uint8_t other_segment_num);
+static void inevitable_contention_management(uint8_t other_segment_num);
+
+static inline bool is_aborting_now(uint8_t other_segment_num) {
+    return (get_segment(other_segment_num)->nursery_end == NSE_SIGABORT &&
+            get_priv_segment(other_segment_num)->safe_point != SP_RUNNING);
+}
diff --git a/c7/stm/core.c b/c7/stm/core.c
--- a/c7/stm/core.c
+++ b/c7/stm/core.c
@@ -26,7 +26,7 @@
     }
 
     /* do a read-barrier now.  Note that this must occur before the
-       safepoints that may be issued in contention_management(). */
+       safepoints that may be issued in write_write_contention_management(). */
     stm_read(obj);
 
     /* claim the write-lock for this object.  In case we're running the
@@ -192,14 +192,17 @@
 # error "The logic in the functions below only works with two segments"
 #endif
 
-static void detect_write_read_conflicts(void)
+static bool detect_write_read_conflicts(void)
 {
     long remote_num = 1 - STM_SEGMENT->segment_num;
     char *remote_base = get_segment_base(remote_num);
     uint8_t remote_version = get_segment(remote_num)->transaction_read_version;
 
     if (get_priv_segment(remote_num)->transaction_state == TS_NONE)
-        return;    /* no need to check */
+        return false;    /* no need to check */
+
+    if (is_aborting_now(remote_num))
+        return false;    /* no need to check: is pending immediate abort */
 
     LIST_FOREACH_R(
         STM_PSEGMENT->modified_old_objects,
@@ -207,14 +210,17 @@
         ({
             if (was_read_remote(remote_base, item, remote_version)) {
                 /* A write-read conflict! */
-                contention_management(remote_num);
+                write_read_contention_management(remote_num);
 
-                /* If we reach this point, it means that we would like
-                   the other thread to abort.  We're done here. */
-                assert(get_segment(remote_num)->nursery_end == NSE_SIGABORT);
-                return;
+                /* If we reach this point, we didn't abort, but maybe we
+                   had to wait for the other thread to commit.  If we
+                   did, then we have to restart committing from our call
+                   to synchronize_all_threads(). */
+                return true;
             }
         }));
+
+    return false;
 }
 
 static void synchronize_overflow_object_now(object_t *obj)
@@ -335,13 +341,15 @@
 
     s_mutex_lock();
 
+ restart:
     /* force all other threads to be paused.  They will unpause
        automatically when we are done here, i.e. at mutex_unlock().
        Important: we should not call cond_wait() in the meantime. */
     synchronize_all_threads();
 
     /* detect conflicts */
-    detect_write_read_conflicts();
+    if (detect_write_read_conflicts())
+        goto restart;
 
     /* cannot abort any more from here */
     dprintf(("commit_transaction\n"));
@@ -386,16 +394,25 @@
     abort_with_mutex();
 }
 
-static void reset_modified_from_other_segments(void)
+static void
+reset_modified_from_other_segments(int segment_num)
 {
     /* pull the right versions from other threads in order
-       to reset our pages as part of an abort */
-    long remote_num = 1 - STM_SEGMENT->segment_num;
-    char *local_base = STM_SEGMENT->segment_base;
+       to reset our pages as part of an abort.
+
+       Note that this function is also sometimes called from
+       contention.c to clean up the state of a different thread,
+       when we would really like it to be aborted now and it is
+       suspended at a safe-point.
+
+    */
+    struct stm_priv_segment_info_s *pseg = get_priv_segment(segment_num);
+    long remote_num = !segment_num;
+    char *local_base = get_segment_base(segment_num);
     char *remote_base = get_segment_base(remote_num);
 
     LIST_FOREACH_R(
-        STM_PSEGMENT->modified_old_objects,
+        pseg->modified_old_objects,
         object_t * /*item*/,
         ({
             /* memcpy in the opposite direction than
@@ -423,11 +440,11 @@
             /* clear the write-lock */
             uintptr_t lock_idx = (((uintptr_t)item) >> 4) - WRITELOCK_START;
             assert((intptr_t)lock_idx >= 0);
-            assert(write_locks[lock_idx] == STM_PSEGMENT->write_lock_num);
+            assert(write_locks[lock_idx] == pseg->write_lock_num);
             write_locks[lock_idx] = 0;
         }));
 
-    list_clear(STM_PSEGMENT->modified_old_objects);
+    list_clear(pseg->modified_old_objects);
 }
 
 static void abort_with_mutex(void)
@@ -449,7 +466,7 @@
     throw_away_nursery();
 
     /* reset all the modified objects (incl. re-adding GCFLAG_WRITE_BARRIER) */
-    reset_modified_from_other_segments();
+    reset_modified_from_other_segments(STM_SEGMENT->segment_num);
 
     stm_jmpbuf_t *jmpbuf_ptr = STM_SEGMENT->jmpbuf_ptr;
     stm_thread_local_t *tl = STM_SEGMENT->running_thread;
diff --git a/c7/stm/core.h b/c7/stm/core.h
--- a/c7/stm/core.h
+++ b/c7/stm/core.h
@@ -116,6 +116,9 @@
     /* Temp for minor collection */
     bool minor_collect_will_commit_now;
 
+    /* For sleeping contention management */
+    bool signal_when_done;
+
     /* In case of abort, we restore the 'shadowstack' field and the
        'thread_local_obj' field. */
     object_t **shadowstack_at_start_of_transaction;
@@ -132,6 +135,7 @@
     SP_RUNNING,
     SP_WAIT_FOR_C_REQUEST_REMOVED,
     SP_WAIT_FOR_C_AT_SAFE_POINT,
+    SP_WAIT_FOR_C_TRANSACTION_DONE,
 #ifdef STM_TESTS
     SP_WAIT_FOR_OTHER_THREAD,
 #endif
diff --git a/c7/stm/pages.c b/c7/stm/pages.c
--- a/c7/stm/pages.c
+++ b/c7/stm/pages.c
@@ -39,7 +39,7 @@
     __sync_lock_release(&pages_ctl.mutex_pages);
 }
 
-static bool _has_mutex_pages(void) __attribute__((unused));
+__attribute__((unused))
 static bool _has_mutex_pages(void)
 {
     return pages_ctl.mutex_pages != 0;
diff --git a/c7/stm/setup.c b/c7/stm/setup.c
--- a/c7/stm/setup.c
+++ b/c7/stm/setup.c
@@ -170,7 +170,7 @@
     s_mutex_unlock();
 }
 
-static bool _is_tl_registered(stm_thread_local_t *tl) __attribute__((unused));
+__attribute__((unused))
 static bool _is_tl_registered(stm_thread_local_t *tl)
 {
     return tl->next != NULL;
diff --git a/c7/stm/sync.c b/c7/stm/sync.c
--- a/c7/stm/sync.c
+++ b/c7/stm/sync.c
@@ -127,14 +127,18 @@
     for (i = 0; i < NB_SEGMENTS; i++) {
         if (get_priv_segment(i)->transaction_state == TS_INEVITABLE) {
             if (can_abort) {
-                /* for now, always abort if we can.  We could also try
-                   sometimes to wait for the other thread (needs to
-                   take care about setting safe_point then) */
-                abort_with_mutex();
+                /* handle this case like a contention: it will either
+                   abort us (not the other thread, which is inevitable),
+                   or pause us for a while.  If we go past this call, then we
+                   waited; in this case we have to re-check if no other
+                   thread is inevitable. */
+                inevitable_contention_management(i);
             }
-            /* wait for stm_commit_transaction() to finish this
-               inevitable transaction */
-            cond_wait(C_INEVITABLE);
+            else {
+                /* wait for stm_commit_transaction() to finish this
+                   inevitable transaction */
+                cond_wait(C_INEVITABLE);
+            }
             goto restart;
         }
     }
@@ -192,17 +196,23 @@
 {
     assert(_has_mutex());
 
+    /* wake up one of the threads waiting in acquire_thread_segment() */
+    cond_signal(C_SEGMENT_FREE);
+
+    /* if contention management asked for it, broadcast this thread's end */
+    if (STM_PSEGMENT->signal_when_done) {
+        cond_broadcast(C_TRANSACTION_DONE);
+        STM_PSEGMENT->signal_when_done = false;
+    }
+
     assert(STM_SEGMENT->running_thread == tl);
     STM_SEGMENT->running_thread = NULL;
 
     assert(sync_ctl.in_use[tl->associated_segment_num] == 1);
     sync_ctl.in_use[tl->associated_segment_num] = 0;
-
-    /* wake up one of the threads waiting in acquire_thread_segment() */
-    cond_signal(C_SEGMENT_FREE);
 }
 
-static bool _running_transaction(void) __attribute__((unused));
+__attribute__((unused))
 static bool _running_transaction(void)
 {
     return (STM_SEGMENT->running_thread != NULL);
diff --git a/c7/stm/sync.h b/c7/stm/sync.h
--- a/c7/stm/sync.h
+++ b/c7/stm/sync.h
@@ -10,6 +10,7 @@
     C_REQUEST_REMOVED,
     C_INEVITABLE,
     C_ABORTED,
+    C_TRANSACTION_DONE,
     _C_TOTAL
 };
 static void s_mutex_lock(void);
diff --git a/c7/test/test_random.py b/c7/test/test_random.py
--- a/c7/test/test_random.py
+++ b/c7/test/test_random.py
@@ -47,8 +47,6 @@
         our_trs.set_must_abort(objs_in_conflict)
     elif wait:
         assert not our_trs.inevitable
-        # abort anyway:
-        our_trs.set_must_abort(objs_in_conflict)
 
 
 class TransactionState(object):
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to