Author: Ronan Lamy <[email protected]>
Branch:
Changeset: r97616:263ac72641a2
Date: 2019-09-25 16:37 +0100
http://bitbucket.org/pypy/pypy/changeset/263ac72641a2/
Log: backport 22433dc6e71cd01669c45a6ef3b2ca7bdd721686: Fix race
condition with SemLock that would sometimes cause a deadlock when
using multiprocessing and threads together.
diff --git a/pypy/module/_multiprocessing/interp_semaphore.py b/pypy/module/_multiprocessing/interp_semaphore.py
--- a/pypy/module/_multiprocessing/interp_semaphore.py
+++ b/pypy/module/_multiprocessing/interp_semaphore.py
@@ -33,7 +33,7 @@
_ReleaseSemaphore = rwin32.winexternal(
'ReleaseSemaphore', [rwin32.HANDLE, rffi.LONG, rffi.LONGP],
rwin32.BOOL,
- save_err=rffi.RFFI_SAVE_LASTERROR)
+ save_err=rffi.RFFI_SAVE_LASTERROR, releasegil=False)
else:
from rpython.rlib import rposix
@@ -98,7 +98,7 @@
save_err=rffi.RFFI_SAVE_ERRNO)
_sem_trywait = external('sem_trywait', [SEM_T], rffi.INT,
save_err=rffi.RFFI_SAVE_ERRNO)
- _sem_post = external('sem_post', [SEM_T], rffi.INT,
+ _sem_post = external('sem_post', [SEM_T], rffi.INT, releasegil=False,
save_err=rffi.RFFI_SAVE_ERRNO)
_sem_getvalue = external('sem_getvalue', [SEM_T, rffi.INTP], rffi.INT,
save_err=rffi.RFFI_SAVE_ERRNO)
@@ -374,7 +374,7 @@
elif e.errno in (errno.EAGAIN, errno.ETIMEDOUT):
return False
raise
- _check_signals(space)
+ _check_signals(space)
self.last_tid = rthread.get_ident()
self.count += 1
return True
@@ -487,7 +487,7 @@
# sets self.last_tid and increments self.count
# those steps need to be as close as possible to
# acquiring the semlock for self._ismine() to support
- # multiple threads
+ # multiple threads
got = semlock_acquire(self, space, block, w_timeout)
except OSError as e:
raise wrap_oserror(space, e)
@@ -507,6 +507,8 @@
return
try:
+ # Note: a successful semlock_release() must not release the GIL,
+ # otherwise there is a race condition on self.count
semlock_release(self, space)
self.count -= 1
except OSError as e:
diff --git a/pypy/module/_multiprocessing/test/test_interp_semaphore.py b/pypy/module/_multiprocessing/test/test_interp_semaphore.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/_multiprocessing/test/test_interp_semaphore.py
@@ -0,0 +1,46 @@
+import time
+from rpython.rlib.rgil import yield_thread
+from pypy.tool.pytest.objspace import gettestobjspace
+from pypy.interpreter.gateway import interp2app
+from pypy.module.thread.os_lock import allocate_lock
+from pypy.module.thread.os_thread import start_new_thread
+from pypy.module._multiprocessing.interp_semaphore import (
+ create_semaphore, sem_unlink, W_SemLock)
+
+def test_stuff(space):
+ space = gettestobjspace(usemodules=['_multiprocessing', 'thread'])
+ sem_name = '/test7'
+ _handle = create_semaphore(space, sem_name, 1, 1)
+ w_lock = W_SemLock(space, _handle, 0, 1)
+ created = []
+ successful = []
+ N_THREADS = 16
+
+ def run(space):
+ w_sentinel = allocate_lock(space)
+ yield_thread()
+ w_sentinel.descr_lock_acquire(space) # releases GIL
+ yield_thread()
+ created.append(w_sentinel)
+ w_got = w_lock.acquire(space, w_timeout=space.newfloat(5.)) # releases GIL
+ if space.is_true(w_got):
+ yield_thread()
+ w_lock.release(space)
+ successful.append(w_sentinel)
+ w_sentinel.descr_lock_release(space)
+ w_run = space.wrap(interp2app(run))
+
+ w_lock.acquire(space)
+ for _ in range(N_THREADS):
+ start_new_thread(space, w_run, space.newtuple([])) # releases GIL
+ deadline = time.time() + 5.
+ while len(created) < N_THREADS:
+ assert time.time() < deadline
+ yield_thread()
+ w_lock.release(space)
+
+ for w_sentinel in created:
+ # Join thread
+ w_sentinel.descr_lock_acquire(space) # releases GIL
+ w_sentinel.descr_lock_release(space)
+ assert len(successful) == N_THREADS
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit