Author: Armin Rigo <ar...@tunes.org>
Branch: gil-improvement
Changeset: r47732:c9ecb31b1680
Date: 2011-09-30 22:41 +0200
http://bitbucket.org/pypy/pypy/changeset/c9ecb31b1680/
Log: Port the code. Trying with a random but larger value for sys.checkinterval; will experiment more. diff --git a/pypy/interpreter/executioncontext.py b/pypy/interpreter/executioncontext.py --- a/pypy/interpreter/executioncontext.py +++ b/pypy/interpreter/executioncontext.py @@ -307,7 +307,11 @@ self._nonperiodic_actions = [] self.has_bytecode_counter = False self.fired_actions = None - self.checkinterval_scaled = 100 * TICK_COUNTER_STEP + # the default value is not 100, unlike CPython 2.7, but a much + # larger value, because we use a technique that not only allows + # but actually *forces* another thread to run whenever the counter + # reaches zero. + self.checkinterval_scaled = 10000 * TICK_COUNTER_STEP self._rebuild_action_dispatcher() def fire(self, action): diff --git a/pypy/module/thread/gil.py b/pypy/module/thread/gil.py --- a/pypy/module/thread/gil.py +++ b/pypy/module/thread/gil.py @@ -16,7 +16,7 @@ class GILThreadLocals(OSThreadLocals): """A version of OSThreadLocals that enforces a GIL.""" - ll_GIL = thread.null_ll_lock + gil_ready = False def initialize(self, space): # add the GIL-releasing callback as an action on the space @@ -25,12 +25,10 @@ def setup_threads(self, space): """Enable threads in the object space, if they haven't already been.""" - if not self.ll_GIL: - try: - self.ll_GIL = thread.allocate_ll_lock() - except thread.error: + if not self.gil_ready: + if not thread.gil_allocate(): raise wrap_thread_error(space, "can't allocate GIL") - thread.acquire_NOAUTO(self.ll_GIL, True) + self.gil_ready = True self.enter_thread(space) # setup the main thread result = True else: @@ -44,19 +42,16 @@ # test_lock_again after the global state was cleared by # test_compile_lock. As a workaround, we repatch these global # fields systematically. 
- spacestate.ll_GIL = self.ll_GIL invoke_around_extcall(before_external_call, after_external_call) return result def reinit_threads(self, space): - if self.ll_GIL: - self.ll_GIL = thread.allocate_ll_lock() - thread.acquire_NOAUTO(self.ll_GIL, True) - self.enter_thread(space) + if self.gil_ready: + self.gil_ready = False + self.setup_threads(space) def yield_thread(self): - thread.yield_thread() # explicitly release the gil (used by test_gil) - + do_yield_thread() class GILReleaseAction(PeriodicAsyncAction): """An action called every sys.checkinterval bytecodes. It releases @@ -64,16 +59,12 @@ """ def perform(self, executioncontext, frame): - # Other threads can run between the release() and the acquire() - # implicit in the following external function call (which has - # otherwise no effect). - thread.yield_thread() + do_yield_thread() class SpaceState: def _freeze_(self): - self.ll_GIL = thread.null_ll_lock self.action_after_thread_switch = None # ^^^ set by AsyncAction.fire_after_thread_switch() return False @@ -95,14 +86,14 @@ # this function must not raise, in such a way that the exception # transformer knows that it cannot raise! e = get_errno() - thread.release_NOAUTO(spacestate.ll_GIL) + thread.gil_release() set_errno(e) before_external_call._gctransformer_hint_cannot_collect_ = True before_external_call._dont_reach_me_in_del_ = True def after_external_call(): e = get_errno() - thread.acquire_NOAUTO(spacestate.ll_GIL, True) + thread.gil_acquire() thread.gc_thread_run() spacestate.after_thread_switch() set_errno(e) @@ -115,3 +106,17 @@ # pointers in the shadow stack. This is necessary because the GIL is # not held after the call to before_external_call() or before the call # to after_external_call(). + +def do_yield_thread(): + # explicitly release the gil, in a way that tries to give more + # priority to other threads (as opposed to continuing to run in + # the same thread). 
+ if thread.gil_yield_thread(): + thread.gc_thread_run() + spacestate.after_thread_switch() +do_yield_thread._gctransformer_hint_close_stack_ = True +do_yield_thread._dont_reach_me_in_del_ = True + +# do_yield_thread() needs a different hint: _gctransformer_hint_close_stack_. +# The *_external_call() functions are themselves called only from the rffi +# module from a helper function that also has this hint. diff --git a/pypy/module/thread/ll_thread.py b/pypy/module/thread/ll_thread.py --- a/pypy/module/thread/ll_thread.py +++ b/pypy/module/thread/ll_thread.py @@ -17,7 +17,8 @@ include_dirs = [str(py.path.local(autopath.pypydir).join('translator', 'c'))], export_symbols = ['RPyThreadGetIdent', 'RPyThreadLockInit', 'RPyThreadAcquireLock', 'RPyThreadReleaseLock', - 'RPyThreadYield', + 'RPyGilAllocate', 'RPyGilYieldThread', + 'RPyGilRelease', 'RPyGilAcquire', 'RPyThreadGetStackSize', 'RPyThreadSetStackSize', 'RPyOpaqueDealloc_ThreadLock', 'RPyThreadAfterFork'] @@ -69,8 +70,16 @@ [TLOCKP], lltype.Void, _nowrapper=True) -# this function does nothing apart from releasing the GIL temporarily. 
-yield_thread = llexternal('RPyThreadYield', [], lltype.Void, threadsafe=True) +# these functions manipulate directly the GIL, whose definition does not +# escape the C code itself +gil_allocate = llexternal('RPyGilAllocate', [], lltype.Signed, + _nowrapper=True) +gil_yield_thread = llexternal('RPyGilYieldThread', [], lltype.Signed, + _nowrapper=True) +gil_release = llexternal('RPyGilRelease', [], lltype.Void, + _nowrapper=True) +gil_acquire = llexternal('RPyGilAcquire', [], lltype.Void, + _nowrapper=True) def allocate_lock(): return Lock(allocate_ll_lock()) diff --git a/pypy/module/thread/test/test_gil.py b/pypy/module/thread/test/test_gil.py --- a/pypy/module/thread/test/test_gil.py +++ b/pypy/module/thread/test/test_gil.py @@ -30,19 +30,34 @@ use_threads = True bigtest = False - def test_one_thread(self): + def test_one_thread(self, skew=+1): + from pypy.rlib.debug import debug_print if self.bigtest: - N = 1000000 + N = 100000 + skew *= 25000 else: N = 100 + skew *= 25 space = FakeSpace() class State: pass state = State() - def runme(): - for i in range(N): + def runme(main=False): + j = 0 + for i in range(N + [-skew, skew][main]): + state.datalen1 += 1 # try to crash if the GIL is not + state.datalen2 += 1 # correctly acquired state.data.append((thread.get_ident(), i)) + state.datalen3 += 1 + state.datalen4 += 1 + assert state.datalen1 == len(state.data) + assert state.datalen2 == len(state.data) + assert state.datalen3 == len(state.data) + assert state.datalen4 == len(state.data) + debug_print(main, i, state.datalen4) state.threadlocals.yield_thread() + assert i == j + j += 1 def bootstrap(): try: runme() @@ -50,20 +65,26 @@ thread.gc_thread_die() def f(): state.data = [] + state.datalen1 = 0 + state.datalen2 = 0 + state.datalen3 = 0 + state.datalen4 = 0 state.threadlocals = gil.GILThreadLocals() state.threadlocals.setup_threads(space) thread.gc_thread_prepare() subident = thread.start_new_thread(bootstrap, ()) mainident = thread.get_ident() - runme() + 
runme(True) still_waiting = 3000 while len(state.data) < 2*N: + debug_print(len(state.data)) if not still_waiting: raise ValueError("time out") still_waiting -= 1 if not we_are_translated(): gil.before_external_call() time.sleep(0.01) if not we_are_translated(): gil.after_external_call() + debug_print("leaving!") i1 = i2 = 0 for tid, i in state.data: if tid == mainident: @@ -72,14 +93,17 @@ assert i == i2; i2 += 1 else: assert 0 - assert i1 == N - assert i2 == N + assert i1 == N + skew + assert i2 == N - skew return len(state.data) fn = self.getcompiled(f, []) res = fn() assert res == 2*N + def test_one_thread_rev(self): + self.test_one_thread(skew=-1) + class TestRunDirectly(GILTests): def getcompiled(self, f, argtypes): diff --git a/pypy/translator/c/src/thread.h b/pypy/translator/c/src/thread.h --- a/pypy/translator/c/src/thread.h +++ b/pypy/translator/c/src/thread.h @@ -37,14 +37,9 @@ #endif -/* common helper: this does nothing, but is called with the GIL released. - This gives other threads a chance to grab the GIL and run. 
*/ -void RPyThreadYield(void); - -#ifndef PYPY_NOT_MAIN_FILE -void RPyThreadYield(void) -{ -} -#endif +long RPyGilAllocate(void); +long RPyGilYieldThread(void); +void RPyGilRelease(void); +void RPyGilAcquire(void); #endif diff --git a/pypy/translator/c/src/thread_pthread.h b/pypy/translator/c/src/thread_pthread.h --- a/pypy/translator/c/src/thread_pthread.h +++ b/pypy/translator/c/src/thread_pthread.h @@ -12,6 +12,7 @@ #include <signal.h> #include <stdio.h> #include <errno.h> +#include <assert.h> /* The following is hopefully equivalent to what CPython does (which is trying to compile a snippet of code using it) */ @@ -459,4 +460,111 @@ #define RPyThreadTLS_Set(key, value) pthread_setspecific(key, value) +/************************************************************/ +/* GIL code */ +/************************************************************/ + +#ifdef __llvm__ +# define HAS_ATOMIC_ADD +#endif + +#ifdef __GNUC__ +# if __GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 1) +# define HAS_ATOMIC_ADD +# endif +#endif + +#ifdef HAS_ATOMIC_ADD +# define atomic_add __sync_fetch_and_add +#else +# if defined(__amd64__) +# define atomic_add(ptr, value) asm volatile ("lock addq %0, %1" \ + : : "ri"(value), "m"(*(ptr)) : "memory") +# elif defined(__i386__) +# define atomic_add(ptr, value) asm volatile ("lock addl %0, %1" \ + : : "ri"(value), "m"(*(ptr)) : "memory") +# else +# error "Please use gcc >= 4.1 or write a custom 'asm' for your CPU." 
+# endif +#endif + +#define ASSERT_STATUS(call) \ + if (call != 0) { \ + fprintf(stderr, "Fatal error: " #call "\n"); \ + abort(); \ + } + +static void _debug_print(const char *msg) +{ + int col = (int)pthread_self(); + col = 31 + ((col / 8) % 8); + fprintf(stderr, "\033[%dm%s\033[0m", col, msg); +} + +static volatile long pending_acquires = -1; +static pthread_mutex_t mutex_gil = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t cond_gil = PTHREAD_COND_INITIALIZER; + +static void assert_has_the_gil(void) +{ +#ifdef RPY_ASSERT + assert(pthread_mutex_trylock(&mutex_gil) != 0); + assert(pending_acquires >= 0); +#endif +} + +long RPyGilAllocate(void) +{ + _debug_print("RPyGilAllocate\n"); + pending_acquires = 0; + pthread_mutex_trylock(&mutex_gil); + assert_has_the_gil(); + return 1; +} + +long RPyGilYieldThread(void) +{ + /* can be called even before RPyGilAllocate(), but in this case, + pending_acquires will be -1 */ +#ifdef RPY_ASSERT + if (pending_acquires >= 0) + assert_has_the_gil(); +#endif + if (pending_acquires <= 0) + return 0; + atomic_add(&pending_acquires, 1L); + _debug_print("{"); + ASSERT_STATUS(pthread_cond_wait(&cond_gil, &mutex_gil)); + _debug_print("}"); + atomic_add(&pending_acquires, -1L); + ASSERT_STATUS(pthread_cond_signal(&cond_gil)); + assert_has_the_gil(); + return 1; +} + +void RPyGilRelease(void) +{ + _debug_print("RPyGilRelease\n"); +#ifdef RPY_ASSERT + assert(pending_acquires >= 0); +#endif + assert_has_the_gil(); + ASSERT_STATUS(pthread_mutex_unlock(&mutex_gil)); +} + +void RPyGilAcquire(void) +{ + _debug_print("about to RPyGilAcquire...\n"); +#ifdef RPY_ASSERT + assert(pending_acquires >= 0); +#endif + atomic_add(&pending_acquires, 1L); + ASSERT_STATUS(pthread_mutex_lock(&mutex_gil)); + atomic_add(&pending_acquires, -1L); + assert_has_the_gil(); + ASSERT_STATUS(pthread_cond_signal(&cond_gil)); + _debug_print("RPyGilAcquire\n"); +} + + #endif /* PYPY_NOT_MAIN_FILE */ _______________________________________________ pypy-commit mailing 
list pypy-commit@python.org http://mail.python.org/mailman/listinfo/pypy-commit