Author: Armin Rigo <[email protected]>
Branch: 
Changeset: r85726:919e00b3e558
Date: 2016-07-15 23:10 +0200
http://bitbucket.org/pypy/pypy/changeset/919e00b3e558/

Log:    Issue #2341: some more logic, attempting to avoid the bad case

diff --git a/rpython/translator/c/src/asm_gcc_x86.h 
b/rpython/translator/c/src/asm_gcc_x86.h
--- a/rpython/translator/c/src/asm_gcc_x86.h
+++ b/rpython/translator/c/src/asm_gcc_x86.h
@@ -106,3 +106,6 @@
 #define PYPY_X86_CHECK_SSE2_DEFINED
 RPY_EXTERN void pypy_x86_check_sse2(void);
 #endif
+
+
+#define RPy_YieldProcessor()  asm("pause")
diff --git a/rpython/translator/c/src/asm_gcc_x86_64.h 
b/rpython/translator/c/src/asm_gcc_x86_64.h
--- a/rpython/translator/c/src/asm_gcc_x86_64.h
+++ b/rpython/translator/c/src/asm_gcc_x86_64.h
@@ -6,3 +6,6 @@
     asm volatile("rdtsc" : "=a"(_rax), "=d"(_rdx)); \
     val = (_rdx << 32) | _rax;                          \
 } while (0)
+
+
+#define RPy_YieldProcessor()  asm("pause")
diff --git a/rpython/translator/c/src/thread_gil.c 
b/rpython/translator/c/src/thread_gil.c
--- a/rpython/translator/c/src/thread_gil.c
+++ b/rpython/translator/c/src/thread_gil.c
@@ -44,6 +44,7 @@
 */
 long rpy_fastgil = 0;
 static long rpy_waiting_threads = -42;    /* GIL not initialized */
+static volatile int rpy_early_poll_n = 0;
 static mutex1_t mutex_gil_stealer;
 static mutex2_t mutex_gil;
 
@@ -66,6 +67,30 @@
     }
 }
 
+static void check_and_save_old_fastgil(long old_fastgil)
+{
+    assert(RPY_FASTGIL_LOCKED(rpy_fastgil));
+
+#ifdef PYPY_USE_ASMGCC
+    if (old_fastgil != 0) {
+        /* this case only occurs from the JIT compiler */
+        struct pypy_ASM_FRAMEDATA_HEAD0 *new =
+            (struct pypy_ASM_FRAMEDATA_HEAD0 *)old_fastgil;
+        struct pypy_ASM_FRAMEDATA_HEAD0 *root = &pypy_g_ASM_FRAMEDATA_HEAD;
+        struct pypy_ASM_FRAMEDATA_HEAD0 *next = root->as_next;
+        new->as_next = next;
+        new->as_prev = root;
+        root->as_next = new;
+        next->as_prev = new;
+    }
+#else
+    assert(old_fastgil == 0);
+#endif
+}
+
+#define RPY_GIL_POKE_MIN   40
+#define RPY_GIL_POKE_MAX  400
+
 void RPyGilAcquireSlowPath(long old_fastgil)
 {
     /* Acquires the GIL.  This assumes that we already did:
@@ -79,6 +104,8 @@
     }
     else {
         /* Otherwise, another thread is busy with the GIL. */
+        int n;
+        long old_waiting_threads;
 
         if (rpy_waiting_threads < 0) {
             /* <arigo> I tried to have RPyGilAllocate() called from
@@ -98,7 +125,56 @@
         /* Register me as one of the threads that is actively waiting
            for the GIL.  The number of such threads is found in
            rpy_waiting_threads. */
-        atomic_increment(&rpy_waiting_threads);
+        old_waiting_threads = atomic_increment(&rpy_waiting_threads);
+
+        /* Early polling: before entering the waiting queue, we check
+           a certain number of times if the GIL becomes free.  The
+           motivation for this is issue #2341.  Note that we do this
+           polling even if there are already other threads in the
+           queue, and one of these threads is the stealer.  This is
+           because the stealer is likely sleeping right now.  There
+           are use cases where the GIL will really be released very
+           soon after RPyGilAcquireSlowPath() is called, so it's worth
+           always doing this check.
+
+           To avoid falling into bad cases, we "randomize" the number
+           of iterations: we loop N times, where N is chosen between
+           RPY_GIL_POKE_MIN and RPY_GIL_POKE_MAX.
+        */
+        n = rpy_early_poll_n * 2 + 1;
+        while (n >= RPY_GIL_POKE_MAX)
+            n -= (RPY_GIL_POKE_MAX - RPY_GIL_POKE_MIN);
+        rpy_early_poll_n = n;
+        while (n >= 0) {
+            n--;
+            if (old_waiting_threads != rpy_waiting_threads) {
+                /* If the number changed, it is because another thread 
+                   entered or left this function.  In that case, stop
+                   this loop: if another thread left it means the GIL
+                   has been acquired by that thread; if another thread 
+                   entered there is no point in running the present
+                   loop twice. */
+                break;
+            }
+            RPy_YieldProcessor();
+            asm volatile("":::"memory");   /* compiler memory barrier */
+
+            if (!RPY_FASTGIL_LOCKED(rpy_fastgil)) {
+                old_fastgil = pypy_lock_test_and_set(&rpy_fastgil, 1);
+                if (!RPY_FASTGIL_LOCKED(old_fastgil)) {
+                    /* We got the gil before entering the waiting
+                       queue.  In case there are other threads waiting
+                       for the GIL, wake up the stealer thread now and
+                       go to the waiting queue anyway, for fairness.
+                       This will fall through if there are no other
+                       threads waiting.
+                    */
+                    check_and_save_old_fastgil(old_fastgil);
+                    mutex2_unlock(&mutex_gil);
+                    break;
+                }
+            }
+        }
 
         /* Enter the waiting queue from the end.  Assuming a roughly
            first-in-first-out order, this will nicely give the threads
@@ -109,6 +185,15 @@
 
         /* We are now the stealer thread.  Steals! */
         while (1) {
+            /* Busy-looping here.  Try to look again if 'rpy_fastgil' is
+               released.
+            */
+            if (!RPY_FASTGIL_LOCKED(rpy_fastgil)) {
+                old_fastgil = pypy_lock_test_and_set(&rpy_fastgil, 1);
+                if (!RPY_FASTGIL_LOCKED(old_fastgil))
+                    /* yes, got a non-held value!  Now we hold it. */
+                    break;
+            }
             /* Sleep for one interval of time.  We may be woken up earlier
                if 'mutex_gil' is released.
             */
@@ -119,39 +204,13 @@
                 old_fastgil = 0;
                 break;
             }
-
-            /* Busy-looping here.  Try to look again if 'rpy_fastgil' is
-               released.
-            */
-            if (!RPY_FASTGIL_LOCKED(rpy_fastgil)) {
-                old_fastgil = pypy_lock_test_and_set(&rpy_fastgil, 1);
-                if (!RPY_FASTGIL_LOCKED(old_fastgil))
-                    /* yes, got a non-held value!  Now we hold it. */
-                    break;
-            }
-            /* Otherwise, loop back. */
+            /* Loop back. */
         }
         atomic_decrement(&rpy_waiting_threads);
         mutex2_loop_stop(&mutex_gil);
         mutex1_unlock(&mutex_gil_stealer);
     }
-    assert(RPY_FASTGIL_LOCKED(rpy_fastgil));
-
-#ifdef PYPY_USE_ASMGCC
-    if (old_fastgil != 0) {
-        /* this case only occurs from the JIT compiler */
-        struct pypy_ASM_FRAMEDATA_HEAD0 *new =
-            (struct pypy_ASM_FRAMEDATA_HEAD0 *)old_fastgil;
-        struct pypy_ASM_FRAMEDATA_HEAD0 *root = &pypy_g_ASM_FRAMEDATA_HEAD;
-        struct pypy_ASM_FRAMEDATA_HEAD0 *next = root->as_next;
-        new->as_next = next;
-        new->as_prev = root;
-        root->as_next = new;
-        next->as_prev = new;
-    }
-#else
-    assert(old_fastgil == 0);
-#endif
+    check_and_save_old_fastgil(old_fastgil);
 }
 
 long RPyGilYieldThread(void)
diff --git a/rpython/translator/c/src/thread_nt.c 
b/rpython/translator/c/src/thread_nt.c
--- a/rpython/translator/c/src/thread_nt.c
+++ b/rpython/translator/c/src/thread_nt.c
@@ -258,5 +258,8 @@
 //#define pypy_lock_test_and_set(ptr, value)  see thread_nt.h
 #define atomic_increment(ptr)          InterlockedIncrement(ptr)
 #define atomic_decrement(ptr)          InterlockedDecrement(ptr)
+#ifndef YieldProcessor
+#  define YieldProcessor()             __asm { rep nop }
+#endif
 
 #include "src/thread_gil.c"
diff --git a/rpython/translator/c/src/thread_pthread.c 
b/rpython/translator/c/src/thread_pthread.c
--- a/rpython/translator/c/src/thread_pthread.c
+++ b/rpython/translator/c/src/thread_pthread.c
@@ -552,8 +552,13 @@
 }
 
 //#define pypy_lock_test_and_set(ptr, value)  see thread_pthread.h
-#define atomic_increment(ptr)          __sync_fetch_and_add(ptr, 1)
-#define atomic_decrement(ptr)          __sync_fetch_and_sub(ptr, 1)
+#define atomic_increment(ptr)          __sync_add_and_fetch(ptr, 1)
+#define atomic_decrement(ptr)          __sync_sub_and_fetch(ptr, 1)
 #define HAVE_PTHREAD_ATFORK            1
 
+#include "src/asm.h"   /* for RPy_YieldProcessor() */
+#ifndef RPy_YieldProcessor
+#  define RPy_YieldProcessor()   /* nothing */
+#endif
+
 #include "src/thread_gil.c"
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to