Author: Matti Picus <[email protected]>
Branch: py3.6
Changeset: r96567:777ad5f524c1
Date: 2019-04-30 09:34 -0400
http://bitbucket.org/pypy/pypy/changeset/777ad5f524c1/
Log: merge default into py3.6
diff --git a/pypy/doc/whatsnew-head.rst b/pypy/doc/whatsnew-head.rst
--- a/pypy/doc/whatsnew-head.rst
+++ b/pypy/doc/whatsnew-head.rst
@@ -20,3 +20,8 @@
.. branch: issue2968
Fix segfault in cpyext_tp_new_tupl
+
+.. branch: semlock-deadlock
+
+Test and reduce the probability of a deadlock when acquiring a semaphore by
+moving global state changes closer to the actual acquire.
diff --git a/pypy/module/_multiprocessing/interp_semaphore.py
b/pypy/module/_multiprocessing/interp_semaphore.py
--- a/pypy/module/_multiprocessing/interp_semaphore.py
+++ b/pypy/module/_multiprocessing/interp_semaphore.py
@@ -49,7 +49,8 @@
eci = ExternalCompilationInfo(
includes = ['sys/time.h',
'limits.h',
- 'semaphore.h'],
+ 'semaphore.h',
+ ],
libraries = libraries,
)
@@ -269,6 +270,8 @@
res = rwin32.WaitForSingleObject(self.handle, 0)
if res != rwin32.WAIT_TIMEOUT:
+ self.last_tid = rthread.get_ident()
+ self.count += 1
return True
msecs = full_msecs
@@ -301,6 +304,8 @@
# handle result
if res != rwin32.WAIT_TIMEOUT:
+ self.last_tid = rthread.get_ident()
+ self.count += 1
return True
return False
@@ -379,8 +384,9 @@
elif e.errno in (errno.EAGAIN, errno.ETIMEDOUT):
return False
raise
- _check_signals(space)
-
+ _check_signals(space)
+ self.last_tid = rthread.get_ident()
+ self.count += 1
return True
finally:
if deadline:
@@ -449,6 +455,7 @@
self.count = 0
self.maxvalue = maxvalue
self.register_finalizer(space)
+ self.last_tid = -1
self.name = name
def name_get(self, space):
@@ -495,15 +502,15 @@
if self.kind == RECURSIVE_MUTEX and self._ismine():
self.count += 1
return space.w_True
-
try:
+ # sets self.last_tid and increments self.count
+ # those steps need to be as close as possible to
+ # acquiring the semlock for self._ismine() to support
+ # multiple threads
got = semlock_acquire(self, space, block, w_timeout)
except OSError as e:
raise wrap_oserror(space, e)
-
if got:
- self.last_tid = rthread.get_ident()
- self.count += 1
return space.w_True
else:
return space.w_False
@@ -520,10 +527,10 @@
try:
semlock_release(self, space)
+ self.count -= 1
except OSError as e:
raise wrap_oserror(space, e)
- self.count -= 1
def after_fork(self):
self.count = 0
diff --git a/pypy/module/_multiprocessing/test/test_semaphore.py
b/pypy/module/_multiprocessing/test/test_semaphore.py
--- a/pypy/module/_multiprocessing/test/test_semaphore.py
+++ b/pypy/module/_multiprocessing/test/test_semaphore.py
@@ -18,6 +18,7 @@
def setup_class(cls):
cls.w_SEMAPHORE = cls.space.wrap(SEMAPHORE)
cls.w_RECURSIVE = cls.space.wrap(RECURSIVE_MUTEX)
+ cls.w_runappdirect = cls.space.wrap(cls.runappdirect)
@py.test.mark.skipif("sys.platform == 'win32'")
def test_sem_unlink(self):
@@ -138,3 +139,25 @@
from _multiprocessing import SemLock
sem = SemLock(self.SEMAPHORE, 1, 1, '/mp-123', unlink=True)
assert sem._count() == 0
+
+ def test_in_threads(self):
+ from _multiprocessing import SemLock
+ from threading import Thread
+ from time import sleep
+ l = SemLock(0, 1, 1)
+ if self.runappdirect:
+ def f(id):
+ for i in range(10000):
+ pass
+ else:
+ def f(id):
+ for i in range(1000):
+ # reduce the probability of thread switching
+ # at exactly the wrong time in semlock_acquire
+ for j in range(10):
+ pass
+ threads = [Thread(None, f, args=(i,)) for i in range(2)]
+ [t.start() for t in threads]
+ # if the RLock calls to sem_wait and sem_post do not match,
+ # one of the threads will block and the call to join will fail
+ [t.join() for t in threads]
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit