Author: Ronan Lamy <[email protected]>
Branch: 
Changeset: r97616:263ac72641a2
Date: 2019-09-25 16:37 +0100
http://bitbucket.org/pypy/pypy/changeset/263ac72641a2/

Log:    backport 22433dc6e71cd01669c45a6ef3b2ca7bdd721686: Fix race
        condition with SemLock that would sometimes cause a deadlock when
        using multiprocessing and threads together.

diff --git a/pypy/module/_multiprocessing/interp_semaphore.py 
b/pypy/module/_multiprocessing/interp_semaphore.py
--- a/pypy/module/_multiprocessing/interp_semaphore.py
+++ b/pypy/module/_multiprocessing/interp_semaphore.py
@@ -33,7 +33,7 @@
     _ReleaseSemaphore = rwin32.winexternal(
         'ReleaseSemaphore', [rwin32.HANDLE, rffi.LONG, rffi.LONGP],
         rwin32.BOOL,
-        save_err=rffi.RFFI_SAVE_LASTERROR)
+        save_err=rffi.RFFI_SAVE_LASTERROR, releasegil=True)
 
 else:
     from rpython.rlib import rposix
@@ -98,7 +98,7 @@
                          save_err=rffi.RFFI_SAVE_ERRNO)
     _sem_trywait = external('sem_trywait', [SEM_T], rffi.INT,
                             save_err=rffi.RFFI_SAVE_ERRNO)
-    _sem_post = external('sem_post', [SEM_T], rffi.INT,
+    _sem_post = external('sem_post', [SEM_T], rffi.INT, releasegil=False,
                          save_err=rffi.RFFI_SAVE_ERRNO)
     _sem_getvalue = external('sem_getvalue', [SEM_T, rffi.INTP], rffi.INT,
                              save_err=rffi.RFFI_SAVE_ERRNO)
@@ -374,7 +374,7 @@
                     elif e.errno in (errno.EAGAIN, errno.ETIMEDOUT):
                         return False
                     raise
-                _check_signals(space)    
+                _check_signals(space)
                 self.last_tid = rthread.get_ident()
                 self.count += 1
                 return True
@@ -487,7 +487,7 @@
             # sets self.last_tid and increments self.count
             # those steps need to be as close as possible to
             # acquiring the semlock for self._ismine() to support
-            # multiple threads 
+            # multiple threads
             got = semlock_acquire(self, space, block, w_timeout)
         except OSError as e:
             raise wrap_oserror(space, e)
@@ -507,6 +507,8 @@
                 return
 
         try:
+            # Note: a succesful semlock_release() must not release the GIL,
+            # otherwise there is a race condition on self.count
             semlock_release(self, space)
             self.count -= 1
         except OSError as e:
diff --git a/pypy/module/_multiprocessing/test/test_interp_semaphore.py 
b/pypy/module/_multiprocessing/test/test_interp_semaphore.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/_multiprocessing/test/test_interp_semaphore.py
@@ -0,0 +1,46 @@
+import time
+from rpython.rlib.rgil import yield_thread
+from pypy.tool.pytest.objspace import gettestobjspace
+from pypy.interpreter.gateway import interp2app
+from pypy.module.thread.os_lock import allocate_lock
+from pypy.module.thread.os_thread import start_new_thread
+from pypy.module._multiprocessing.interp_semaphore import (
+    create_semaphore, sem_unlink, W_SemLock)
+
+def test_stuff(space):
+    space = gettestobjspace(usemodules=['_multiprocessing', 'thread'])
+    sem_name = '/test7'
+    _handle = create_semaphore(space, sem_name, 1, 1)
+    w_lock = W_SemLock(space, _handle, 0, 1)
+    created = []
+    successful = []
+    N_THREADS = 16
+
+    def run(space):
+        w_sentinel = allocate_lock(space)
+        yield_thread()
+        w_sentinel.descr_lock_acquire(space)  # releases GIL
+        yield_thread()
+        created.append(w_sentinel)
+        w_got = w_lock.acquire(space, w_timeout=space.newfloat(5.))  # 
releases GIL
+        if space.is_true(w_got):
+            yield_thread()
+            w_lock.release(space)
+            successful.append(w_sentinel)
+        w_sentinel.descr_lock_release(space)
+    w_run = space.wrap(interp2app(run))
+
+    w_lock.acquire(space)
+    for _ in range(N_THREADS):
+        start_new_thread(space, w_run, space.newtuple([]))  # releases GIL
+    deadline = time.time() + 5.
+    while len(created) < N_THREADS:
+        assert time.time() < deadline
+        yield_thread()
+    w_lock.release(space)
+
+    for w_sentinel in created:
+        # Join thread
+        w_sentinel.descr_lock_acquire(space)  # releases GIL
+        w_sentinel.descr_lock_release(space)
+    assert len(successful) == N_THREADS
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to