Author: Matti Picus <matti.pi...@gmail.com>
Branch: py3.6
Changeset: r96567:777ad5f524c1
Date: 2019-04-30 09:34 -0400
http://bitbucket.org/pypy/pypy/changeset/777ad5f524c1/

Log:    merge default into py3.6

diff --git a/pypy/doc/whatsnew-head.rst b/pypy/doc/whatsnew-head.rst
--- a/pypy/doc/whatsnew-head.rst
+++ b/pypy/doc/whatsnew-head.rst
@@ -20,3 +20,8 @@
 .. branch: issue2968
 
 Fix segfault in cpyext_tp_new_tupl
+
+.. branch: semlock-deadlock
+
+Test and reduce the probability of a deadlock when acquiring a semaphore by
+moving global state changes closer to the actual aquire.
diff --git a/pypy/module/_multiprocessing/interp_semaphore.py 
b/pypy/module/_multiprocessing/interp_semaphore.py
--- a/pypy/module/_multiprocessing/interp_semaphore.py
+++ b/pypy/module/_multiprocessing/interp_semaphore.py
@@ -49,7 +49,8 @@
     eci = ExternalCompilationInfo(
         includes = ['sys/time.h',
                     'limits.h',
-                    'semaphore.h'],
+                    'semaphore.h',
+                    ],
         libraries = libraries,
         )
 
@@ -269,6 +270,8 @@
         res = rwin32.WaitForSingleObject(self.handle, 0)
 
         if res != rwin32.WAIT_TIMEOUT:
+            self.last_tid = rthread.get_ident()
+            self.count += 1
             return True
 
         msecs = full_msecs
@@ -301,6 +304,8 @@
 
         # handle result
         if res != rwin32.WAIT_TIMEOUT:
+            self.last_tid = rthread.get_ident()
+            self.count += 1
             return True
         return False
 
@@ -379,8 +384,9 @@
                     elif e.errno in (errno.EAGAIN, errno.ETIMEDOUT):
                         return False
                     raise
-                _check_signals(space)
-
+                _check_signals(space)    
+                self.last_tid = rthread.get_ident()
+                self.count += 1
                 return True
         finally:
             if deadline:
@@ -449,6 +455,7 @@
         self.count = 0
         self.maxvalue = maxvalue
         self.register_finalizer(space)
+        self.last_tid = -1
         self.name = name
 
     def name_get(self, space):
@@ -495,15 +502,15 @@
         if self.kind == RECURSIVE_MUTEX and self._ismine():
             self.count += 1
             return space.w_True
-
         try:
+            # sets self.last_tid and increments self.count
+            # those steps need to be as close as possible to
+            # acquiring the semlock for self._ismine() to support
+            # multiple threads 
             got = semlock_acquire(self, space, block, w_timeout)
         except OSError as e:
             raise wrap_oserror(space, e)
-
         if got:
-            self.last_tid = rthread.get_ident()
-            self.count += 1
             return space.w_True
         else:
             return space.w_False
@@ -520,10 +527,10 @@
 
         try:
             semlock_release(self, space)
+            self.count -= 1
         except OSError as e:
             raise wrap_oserror(space, e)
 
-        self.count -= 1
 
     def after_fork(self):
         self.count = 0
diff --git a/pypy/module/_multiprocessing/test/test_semaphore.py 
b/pypy/module/_multiprocessing/test/test_semaphore.py
--- a/pypy/module/_multiprocessing/test/test_semaphore.py
+++ b/pypy/module/_multiprocessing/test/test_semaphore.py
@@ -18,6 +18,7 @@
     def setup_class(cls):
         cls.w_SEMAPHORE = cls.space.wrap(SEMAPHORE)
         cls.w_RECURSIVE = cls.space.wrap(RECURSIVE_MUTEX)
+        cls.w_runappdirect = cls.space.wrap(cls.runappdirect)
 
     @py.test.mark.skipif("sys.platform == 'win32'")
     def test_sem_unlink(self):
@@ -138,3 +139,25 @@
         from _multiprocessing import SemLock
         sem = SemLock(self.SEMAPHORE, 1, 1, '/mp-123', unlink=True)
         assert sem._count() == 0
+
+    def test_in_threads(self):
+        from _multiprocessing import SemLock
+        from threading import Thread
+        from time import sleep
+        l = SemLock(0, 1, 1)
+        if self.runappdirect:
+            def f(id):
+                for i in range(10000):
+                    pass
+        else:
+            def f(id):
+                for i in range(1000):
+                    # reduce the probability of thread switching
+                    # at exactly the wrong time in semlock_acquire
+                    for j in range(10):
+                        pass
+        threads = [Thread(None, f, args=(i,)) for i in range(2)]
+        [t.start() for t in threads]
+        # if the RLock calls to sem_wait and sem_post do not match,
+        # one of the threads will block and the call to join will fail
+        [t.join() for t in threads]
_______________________________________________
pypy-commit mailing list
pypy-commit@python.org
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to