Author: Philip Jenvey <[email protected]>
Branch: py3k
Changeset: r62324:a66a94873880
Date: 2013-03-12 16:03 -0700
http://bitbucket.org/pypy/pypy/changeset/a66a94873880/

Log:    have lock acquire retry on interrupts

diff --git a/pypy/module/thread/os_lock.py b/pypy/module/thread/os_lock.py
--- a/pypy/module/thread/os_lock.py
+++ b/pypy/module/thread/os_lock.py
@@ -2,6 +2,7 @@
 Python locks, based on true threading locks provided by the OS.
 """
 
+import time
 from rpython.rlib import rthread
 from pypy.module.thread.error import wrap_thread_error
 from pypy.interpreter.baseobjspace import Wrappable
@@ -16,6 +17,7 @@
 LONGLONG_MAX = r_longlong(2 ** (r_longlong.BITS-1) - 1)
 TIMEOUT_MAX = LONGLONG_MAX
 
+RPY_LOCK_FAILURE, RPY_LOCK_ACQUIRED, RPY_LOCK_INTR = range(3)
 
 ##import sys
 ##def debug(msg, n):
@@ -51,6 +53,23 @@
     return microseconds
 
 
+def acquire_timed(lock, microseconds):
+    """Helper to acquire an interruptible lock with a timeout."""
+    endtime = (time.time() * 1000000) + microseconds
+    while True:
+        result = lock.acquire_timed(microseconds)
+        if result == RPY_LOCK_INTR:
+            if microseconds >= 0:
+                microseconds = r_longlong(endtime - (time.time() * 1000000))
+                # Check for negative values, since those mean block
+                # forever
+                if microseconds <= 0:
+                    result = RPY_LOCK_FAILURE
+        if result != RPY_LOCK_INTR:
+            break
+    return result
+
+
 class Lock(Wrappable):
     "A wrappable box around an interp-level lock object."
 
@@ -70,10 +89,8 @@
 and the return value reflects whether the lock is acquired.
 The blocking operation is interruptible."""
         microseconds = parse_acquire_args(space, blocking, timeout)
-        mylock = self.lock
-        result = mylock.acquire_timed(microseconds)
-        result = (result == 1)    # XXX handle RPY_LOCK_INTR (see e80549fefb75)
-        return space.newbool(result)
+        result = acquire_timed(self.lock, microseconds)
+        return space.newbool(result == RPY_LOCK_ACQUIRED)
 
     def descr_lock_release(self, space):
         """Release the lock, allowing another thread that is blocked waiting for
@@ -184,8 +201,8 @@
         if self.rlock_count > 0 or not self.lock.acquire(False):
             if not blocking:
                 return space.w_False
-            r = self.lock.acquire_timed(microseconds)
-            r = (r == 1)    # XXX handle RPY_LOCK_INTR (see e80549fefb75)
+            r = acquire_timed(self.lock, microseconds)
+            r = (r == RPY_LOCK_ACQUIRED)
         if r:
             assert self.rlock_count == 0
             self.rlock_owner = tid
diff --git a/pypy/module/thread/test/test_lock.py b/pypy/module/thread/test/test_lock.py
--- a/pypy/module/thread/test/test_lock.py
+++ b/pypy/module/thread/test/test_lock.py
@@ -145,3 +145,44 @@
         assert lock.acquire() is True
         assert lock.acquire(False) is True
         assert lock.acquire(True, timeout=.1) is True
+
+
+class AppTestLockSignals(GenericTestThread):
+
+    def w_acquire_retries_on_intr(self, lock):
+        import _thread, os, signal, time
+        self.sig_recvd = False
+        def my_handler(signal, frame):
+            self.sig_recvd = True
+        old_handler = signal.signal(signal.SIGUSR1, my_handler)
+        try:
+            def other_thread():
+                # Acquire the lock in a non-main thread, so this test works for
+                # RLocks.
+                lock.acquire()
+                # Wait until the main thread is blocked in the lock acquire, and
+                # then wake it up with this.
+                time.sleep(0.5)
+                os.kill(os.getpid(), signal.SIGUSR1)
+                # Let the main thread take the interrupt, handle it, and retry
+                # the lock acquisition.  Then we'll let it run.
+                time.sleep(0.5)
+                lock.release()
+            _thread.start_new_thread(other_thread, ())
+            # Wait until we can't acquire it without blocking...
+            while lock.acquire(blocking=False):
+                lock.release()
+                time.sleep(0.01)
+            result = lock.acquire()  # Block while we receive a signal.
+            assert self.sig_recvd
+            assert result
+        finally:
+            signal.signal(signal.SIGUSR1, old_handler)
+
+    def test_lock_acquire_retries_on_intr(self):
+        import _thread
+        self.acquire_retries_on_intr(_thread.allocate_lock())
+
+    def test_rlock_acquire_retries_on_intr(self):
+        import _thread
+        self.acquire_retries_on_intr(_thread.RLock())
_______________________________________________
pypy-commit mailing list
[email protected]
http://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to