Author: Matti Picus <matti.pi...@gmail.com> Branch: py3.6 Changeset: r96567:777ad5f524c1 Date: 2019-04-30 09:34 -0400 http://bitbucket.org/pypy/pypy/changeset/777ad5f524c1/
Log: merge default into py3.6 diff --git a/pypy/doc/whatsnew-head.rst b/pypy/doc/whatsnew-head.rst --- a/pypy/doc/whatsnew-head.rst +++ b/pypy/doc/whatsnew-head.rst @@ -20,3 +20,8 @@ .. branch: issue2968 Fix segfault in cpyext_tp_new_tupl + +.. branch: semlock-deadlock + +Test and reduce the probability of a deadlock when acquiring a semaphore by +moving global state changes closer to the actual aquire. diff --git a/pypy/module/_multiprocessing/interp_semaphore.py b/pypy/module/_multiprocessing/interp_semaphore.py --- a/pypy/module/_multiprocessing/interp_semaphore.py +++ b/pypy/module/_multiprocessing/interp_semaphore.py @@ -49,7 +49,8 @@ eci = ExternalCompilationInfo( includes = ['sys/time.h', 'limits.h', - 'semaphore.h'], + 'semaphore.h', + ], libraries = libraries, ) @@ -269,6 +270,8 @@ res = rwin32.WaitForSingleObject(self.handle, 0) if res != rwin32.WAIT_TIMEOUT: + self.last_tid = rthread.get_ident() + self.count += 1 return True msecs = full_msecs @@ -301,6 +304,8 @@ # handle result if res != rwin32.WAIT_TIMEOUT: + self.last_tid = rthread.get_ident() + self.count += 1 return True return False @@ -379,8 +384,9 @@ elif e.errno in (errno.EAGAIN, errno.ETIMEDOUT): return False raise - _check_signals(space) - + _check_signals(space) + self.last_tid = rthread.get_ident() + self.count += 1 return True finally: if deadline: @@ -449,6 +455,7 @@ self.count = 0 self.maxvalue = maxvalue self.register_finalizer(space) + self.last_tid = -1 self.name = name def name_get(self, space): @@ -495,15 +502,15 @@ if self.kind == RECURSIVE_MUTEX and self._ismine(): self.count += 1 return space.w_True - try: + # sets self.last_tid and increments self.count + # those steps need to be as close as possible to + # acquiring the semlock for self._ismine() to support + # multiple threads got = semlock_acquire(self, space, block, w_timeout) except OSError as e: raise wrap_oserror(space, e) - if got: - self.last_tid = rthread.get_ident() - self.count += 1 return space.w_True else: return space.w_False @@ -520,10 +527,10 @@ try: semlock_release(self, space) + self.count -= 1 except OSError as e: raise wrap_oserror(space, e) - self.count -= 1 def after_fork(self): self.count = 0 diff --git a/pypy/module/_multiprocessing/test/test_semaphore.py b/pypy/module/_multiprocessing/test/test_semaphore.py --- a/pypy/module/_multiprocessing/test/test_semaphore.py +++ b/pypy/module/_multiprocessing/test/test_semaphore.py @@ -18,6 +18,7 @@ def setup_class(cls): cls.w_SEMAPHORE = cls.space.wrap(SEMAPHORE) cls.w_RECURSIVE = cls.space.wrap(RECURSIVE_MUTEX) + cls.w_runappdirect = cls.space.wrap(cls.runappdirect) @py.test.mark.skipif("sys.platform == 'win32'") def test_sem_unlink(self): @@ -138,3 +139,25 @@ from _multiprocessing import SemLock sem = SemLock(self.SEMAPHORE, 1, 1, '/mp-123', unlink=True) assert sem._count() == 0 + + def test_in_threads(self): + from _multiprocessing import SemLock + from threading import Thread + from time import sleep + l = SemLock(0, 1, 1) + if self.runappdirect: + def f(id): + for i in range(10000): + pass + else: + def f(id): + for i in range(1000): + # reduce the probability of thread switching + # at exactly the wrong time in semlock_acquire + for j in range(10): + pass + threads = [Thread(None, f, args=(i,)) for i in range(2)] + [t.start() for t in threads] + # if the RLock calls to sem_wait and sem_post do not match, + # one of the threads will block and the call to join will fail + [t.join() for t in threads] _______________________________________________ pypy-commit mailing list pypy-commit@python.org https://mail.python.org/mailman/listinfo/pypy-commit