Author: Matti Picus <[email protected]>
Branch: semlock-deadlock
Changeset: r96545:f3621100820d
Date: 2019-04-27 09:18 -0700
http://bitbucket.org/pypy/pypy/changeset/f3621100820d/

Log:    fix RECURSIVE semaphore by setting state before, not after, glibc
        call

diff --git a/pypy/module/_multiprocessing/interp_semaphore.py 
b/pypy/module/_multiprocessing/interp_semaphore.py
--- a/pypy/module/_multiprocessing/interp_semaphore.py
+++ b/pypy/module/_multiprocessing/interp_semaphore.py
@@ -46,7 +46,8 @@
     eci = ExternalCompilationInfo(
         includes = ['sys/time.h',
                     'limits.h',
-                    'semaphore.h'],
+                    'semaphore.h',
+                    ],
         libraries = libraries,
         )
 
@@ -333,15 +334,11 @@
         _sem_close_no_errno(handle)
 
     def semlock_acquire(self, space, block, w_timeout):
-        import threading
-        myid = threading.current_thread().ident
         if not block:
-            print 'not expected'
             deadline = lltype.nullptr(TIMESPECP.TO)
         elif space.is_none(w_timeout):
             deadline = lltype.nullptr(TIMESPECP.TO)
         else:
-            print 'not expected'
             timeout = space.float_w(w_timeout)
             sec = int(timeout)
             nsec = int(1e9 * (timeout - sec) + 0.5)
@@ -360,16 +357,12 @@
             while True:
                 try:
                     if not block:
-                        print 'not expected'
                         sem_trywait(self.handle)
                     elif not deadline:
-                        print 'sem_wait', myid
                         sem_wait(self.handle)
                     else:
-                        print 'not expected'
                         sem_timedwait(self.handle, deadline)
                 except OSError as e:
-                    print str(e)
                     if e.errno == errno.EINTR:
                         # again
                         _check_signals(space)
@@ -377,9 +370,7 @@
                     elif e.errno in (errno.EAGAIN, errno.ETIMEDOUT):
                         return False
                     raise
-                print 4, myid
-                _check_signals(space)
-                print 5, myid
+                _check_signals(space)    
                 return True
         finally:
             if deadline:
@@ -387,9 +378,6 @@
 
 
     def semlock_release(self, space):
-        import threading
-        myid = threading.current_thread().ident
-        print 'semlock_release', myid
         if self.kind == RECURSIVE_MUTEX:
             sem_post(self.handle)
             return
@@ -451,6 +439,7 @@
         self.count = 0
         self.maxvalue = maxvalue
         self.register_finalizer(space)
+        self.last_tid = -1
 
     def kind_get(self, space):
         return space.newint(self.kind)
@@ -488,35 +477,42 @@
         if self.kind == RECURSIVE_MUTEX and self._ismine():
             self.count += 1
             return space.w_True
-
+        owner_id = self.last_tid
         try:
+            # Ideally these calls would be atomic. Since we cannot
+            # promise that, make sure we do not enable the fast-path
+            # through release anymore
+            self.last_tid = rthread.get_ident()
             got = semlock_acquire(self, space, block, w_timeout)
         except OSError as e:
+            self.last_tid = owner_id
             raise wrap_oserror(space, e)
 
         if got:
-            self.last_tid = rthread.get_ident()
             self.count += 1
             return space.w_True
         else:
+            self.last_tid = owner_id
             return space.w_False
 
     def release(self, space):
         if self.kind == RECURSIVE_MUTEX:
-            if not self._ismine():
-                raise oefmt(space.w_AssertionError,
-                            "attempt to release recursive lock not owned by "
-                            "thread")
-            if self.count > 1:
+            # another thread may be in the middle of an acquire and
+            # have set the last_tid already
+            if self._ismine() and self.count > 1:
                 self.count -= 1
                 return
 
         try:
+            # Ideally these two calls would be atomic. Since we cannot
+            # promise that, make sure once we release the lock the count
+            # will be correct
+            self.count -= 1
             semlock_release(self, space)
         except OSError as e:
+            self.count += 1
             raise wrap_oserror(space, e)
 
-        self.count -= 1
 
     def after_fork(self):
         self.count = 0
diff --git a/pypy/module/_multiprocessing/test/test_semaphore.py 
b/pypy/module/_multiprocessing/test/test_semaphore.py
--- a/pypy/module/_multiprocessing/test/test_semaphore.py
+++ b/pypy/module/_multiprocessing/test/test_semaphore.py
@@ -112,11 +112,14 @@
     def test_in_threads(self):
         from _multiprocessing import SemLock
         from threading import Thread
+        from time import sleep
         l = SemLock(0, 1, 1)
-        def f():
-            for i in range(10000):
+        def f(id):
+            for i in range(1000):
                 with l:
                     pass
-        threads = [Thread(None, f) for i in range(2)]
+        threads = [Thread(None, f, args=(i,)) for i in range(2)]
         [t.start() for t in threads]
+        # if the RLock calls to sem_wait and sem_post do not match,
+        # one of the threads will block and the call to join will fail
         [t.join() for t in threads]
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to