Author: Matti Picus <[email protected]>
Branch: semlock-deadlock
Changeset: r96546:3f24275dc38b
Date: 2019-04-28 08:50 -0700
http://bitbucket.org/pypy/pypy/changeset/3f24275dc38b/

Log:    revert f3621100820d in favor of moving state changes closer to
        sem_wait calls

diff --git a/pypy/module/_multiprocessing/interp_semaphore.py b/pypy/module/_multiprocessing/interp_semaphore.py
--- a/pypy/module/_multiprocessing/interp_semaphore.py
+++ b/pypy/module/_multiprocessing/interp_semaphore.py
@@ -260,6 +260,8 @@
         res = rwin32.WaitForSingleObject(self.handle, 0)
 
         if res != rwin32.WAIT_TIMEOUT:
+            self.last_tid = rthread.get_ident()
+            self.count += 1
             return True
 
         msecs = full_msecs
@@ -292,6 +294,8 @@
 
         # handle result
         if res != rwin32.WAIT_TIMEOUT:
+            self.last_tid = rthread.get_ident()
+            self.count += 1
             return True
         return False
 
@@ -371,6 +375,8 @@
                         return False
                     raise
                 _check_signals(space)    
+                self.last_tid = rthread.get_ident()
+                self.count += 1
                 return True
         finally:
             if deadline:
@@ -477,40 +483,33 @@
         if self.kind == RECURSIVE_MUTEX and self._ismine():
             self.count += 1
             return space.w_True
-        owner_id = self.last_tid
         try:
-            # Ideally these calls would be atomic. Since we cannot
-            # promise that, make sure we do not enable the fast-path
-            # through release anymore
-            self.last_tid = rthread.get_ident()
+            # sets self.last_tid and increments self.count
+            # those steps need to be as close as possible to
+            # acquiring the semlock for self._ismine() to support
+            # multiple threads 
             got = semlock_acquire(self, space, block, w_timeout)
         except OSError as e:
-            self.last_tid = owner_id
             raise wrap_oserror(space, e)
-
         if got:
-            self.count += 1
             return space.w_True
         else:
-            self.last_tid = owner_id
             return space.w_False
 
     def release(self, space):
         if self.kind == RECURSIVE_MUTEX:
-            # another thread may be in the middle of an acquire and
-            # have set the last_tid already
-            if self._ismine() and self.count > 1:
+            if not self._ismine():
+                raise oefmt(space.w_AssertionError,
+                            "attempt to release recursive lock not owned by "
+                            "thread")
+            if self.count > 1:
                 self.count -= 1
                 return
 
         try:
-            # Ideally these two calls would be atomic. Since we cannot
-            # promise that, make sure once we release the lock the count
-            # will be correct
+            semlock_release(self, space)
             self.count -= 1
-            semlock_release(self, space)
         except OSError as e:
-            self.count += 1
             raise wrap_oserror(space, e)
 
 
diff --git a/pypy/module/_multiprocessing/test/test_semaphore.py b/pypy/module/_multiprocessing/test/test_semaphore.py
--- a/pypy/module/_multiprocessing/test/test_semaphore.py
+++ b/pypy/module/_multiprocessing/test/test_semaphore.py
@@ -17,6 +17,7 @@
     def setup_class(cls):
         cls.w_SEMAPHORE = cls.space.wrap(SEMAPHORE)
         cls.w_RECURSIVE = cls.space.wrap(RECURSIVE_MUTEX)
+        cls.w_runappdirect = cls.space.wrap(cls.runappdirect)
 
     def test_semaphore(self):
         from _multiprocessing import SemLock
@@ -114,10 +115,17 @@
         from threading import Thread
         from time import sleep
         l = SemLock(0, 1, 1)
-        def f(id):
-            for i in range(1000):
-                with l:
+        if self.runappdirect:
+            def f(id):
+                for i in range(10000):
                     pass
+        else:
+            def f(id):
+                for i in range(1000):
+                    # reduce the probability of thread switching
+                    # at exactly the wrong time in semlock_acquire
+                    for j in range(10):
+                        pass
         threads = [Thread(None, f, args=(i,)) for i in range(2)]
         [t.start() for t in threads]
         # if the RLock calls to sem_wait and sem_post do not match,
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to