Author: Amaury Forgeot d'Arc <[email protected]>
Branch: 
Changeset: r60580:053753bebf0c
Date: 2013-01-28 09:14 +0100
http://bitbucket.org/pypy/pypy/changeset/053753bebf0c/

Log:    Install the py3k version of C thread helpers. Also implements
        Rlock.acquire_timed()

diff --git a/rpython/rlib/rthread.py b/rpython/rlib/rthread.py
--- a/rpython/rlib/rthread.py
+++ b/rpython/rlib/rthread.py
@@ -19,9 +19,9 @@
     separate_module_files = [translator_c_dir / 'src' / 'thread.c'],
     include_dirs = [translator_c_dir],
     export_symbols = ['RPyThreadGetIdent', 'RPyThreadLockInit',
-                      'RPyThreadAcquireLock', 'RPyThreadReleaseLock',
-                      'RPyGilAllocate', 'RPyGilYieldThread',
-                      'RPyGilRelease', 'RPyGilAcquire',
+                      'RPyThreadAcquireLock', 'RPyThreadAcquireLockTimed',
+                      'RPyThreadReleaseLock', 'RPyGilAllocate',
+                      'RPyGilYieldThread', 'RPyGilRelease', 'RPyGilAcquire',
                       'RPyThreadGetStackSize', 'RPyThreadSetStackSize',
                       'RPyOpaqueDealloc_ThreadLock',
                       'RPyThreadAfterFork']
@@ -61,6 +61,10 @@
 c_thread_acquirelock = llexternal('RPyThreadAcquireLock', [TLOCKP, rffi.INT],
                                   rffi.INT,
                                   threadsafe=True)    # release the GIL
+c_thread_acquirelock_timed = llexternal('RPyThreadAcquireLockTimed', 
+                                        [TLOCKP, rffi.LONGLONG, rffi.INT],
+                                        rffi.INT,
+                                        threadsafe=True)    # release the GIL
 c_thread_releaselock = llexternal('RPyThreadReleaseLock', [TLOCKP], 
lltype.Void,
                                   threadsafe=True)    # release the GIL
 
@@ -121,6 +125,12 @@
         res = rffi.cast(lltype.Signed, res)
         return bool(res)
 
+    def acquire_timed(self, timeout):
+        "timeout is in microseconds."
+        res = c_thread_acquirelock_timed(self._lock, timeout, 1)
+        res = rffi.cast(lltype.Signed, res)
+        return bool(res)
+
     def release(self):
         # Sanity check: the lock must be locked
         if self.acquire(False):
diff --git a/rpython/rlib/test/test_rthread.py 
b/rpython/rlib/test/test_rthread.py
--- a/rpython/rlib/test/test_rthread.py
+++ b/rpython/rlib/test/test_rthread.py
@@ -153,6 +153,23 @@
         answers = fn()
         assert answers == expected
 
+    def test_acquire_timed(self):
+        import time
+        def f():
+            l = allocate_lock()
+            l.acquire(True)
+            t1 = time.time()
+            ok = l.acquire_timed(1000000)
+            t2 = time.time()
+            delay = t2 - t1
+            if ok:
+                return delay
+            else:
+                return -delay
+        fn = self.getcompiled(f, [])
+        res = fn()
+        assert res < -1.0
+
 #class TestRunDirectly(AbstractThreadTests):
 #    def getcompiled(self, f, argtypes):
 #        return f
diff --git a/rpython/translator/c/src/thread.h 
b/rpython/translator/c/src/thread.h
--- a/rpython/translator/c/src/thread.h
+++ b/rpython/translator/c/src/thread.h
@@ -2,6 +2,14 @@
 #define __PYPY_THREAD_H
 #include <assert.h>
 
+#define RPY_TIMEOUT_T long long
+
+typedef enum RPyLockStatus {
+    RPY_LOCK_FAILURE = 0,
+    RPY_LOCK_ACQUIRED = 1,
+    RPY_LOCK_INTR
+} RPyLockStatus;
+
 #ifdef _WIN32
 #include "thread_nt.h"
 #else
diff --git a/rpython/translator/c/src/thread_nt.c 
b/rpython/translator/c/src/thread_nt.c
--- a/rpython/translator/c/src/thread_nt.c
+++ b/rpython/translator/c/src/thread_nt.c
@@ -102,47 +102,24 @@
 
 BOOL InitializeNonRecursiveMutex(PNRMUTEX mutex)
 {
-       mutex->owned = -1 ;  /* No threads have entered NonRecursiveMutex */
-       mutex->thread_id = 0 ;
-       mutex->hevent = CreateEvent(NULL, FALSE, FALSE, NULL) ;
-       return mutex->hevent != NULL ;  /* TRUE if the mutex is created */
+    mutex->sem = CreateSemaphore(NULL, 1, 1, NULL);
 }
 
 VOID DeleteNonRecursiveMutex(PNRMUTEX mutex)
 {
-       /* No in-use check */
-       CloseHandle(mutex->hevent) ;
-       mutex->hevent = NULL ; /* Just in case */
+    /* No in-use check */
+    CloseHandle(mutex->sem);
+    mutex->sem = NULL ; /* Just in case */
 }
 
 DWORD EnterNonRecursiveMutex(PNRMUTEX mutex, BOOL wait)
 {
-       /* Assume that the thread waits successfully */
-       DWORD ret ;
-
-       /* InterlockedIncrement(&mutex->owned) == 0 means that no thread 
currently owns the mutex */
-       if (!wait)
-       {
-               if (InterlockedCompareExchange(&mutex->owned, 0, -1) != -1)
-                       return WAIT_TIMEOUT ;
-               ret = WAIT_OBJECT_0 ;
-       }
-       else
-               ret = InterlockedIncrement(&mutex->owned) ?
-                       /* Some thread owns the mutex, let's wait... */
-                       WaitForSingleObject(mutex->hevent, INFINITE) : 
WAIT_OBJECT_0 ;
-
-       mutex->thread_id = GetCurrentThreadId() ; /* We own it */
-       return ret ;
+    return WaitForSingleObject(mutex->sem, milliseconds);
 }
 
 BOOL LeaveNonRecursiveMutex(PNRMUTEX mutex)
 {
-       /* We don't own the mutex */
-       mutex->thread_id = 0 ;
-       return
-               InterlockedDecrement(&mutex->owned) < 0 ||
-               SetEvent(mutex->hevent) ; /* Other threads are waiting, wake 
one on them up */
+    return ReleaseSemaphore(mutex->sem, 1, NULL);
 }
 
 /************************************************************/
@@ -158,8 +135,8 @@
 
 void RPyOpaqueDealloc_ThreadLock(struct RPyOpaque_ThreadLock *lock)
 {
-       if (lock->hevent != NULL)
-               DeleteNonRecursiveMutex(lock);
+    if (lock->sem != NULL)
+       DeleteNonRecursiveMutex(lock);
 }
 
 /*
@@ -168,9 +145,40 @@
  * and 0 if the lock was not acquired. This means a 0 is returned
  * if the lock has already been acquired by this thread!
  */
+RPyLockStatus
+RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock,
+                         RPY_TIMEOUT_T microseconds, int intr_flag)
+{
+    /* Fow now, intr_flag does nothing on Windows, and lock acquires are
+     * uninterruptible.  */
+    PyLockStatus success;
+    PY_TIMEOUT_T milliseconds;
+
+    if (microseconds >= 0) {
+        milliseconds = microseconds / 1000;
+        if (microseconds % 1000 > 0)
+            ++milliseconds;
+        if ((DWORD) milliseconds != milliseconds)
+            Py_FatalError("Timeout too large for a DWORD, "
+                           "please check PY_TIMEOUT_MAX");
+    }
+    else
+        milliseconds = INFINITE;
+
+    if (lock && EnterNonRecursiveMutex(
+           lock, (DWORD)milliseconds) == WAIT_OBJECT_0) {
+        success = PY_LOCK_ACQUIRED;
+    }
+    else {
+        success = PY_LOCK_FAILURE;
+    }
+
+    return success;
+}
+
 int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag)
 {
-       return EnterNonRecursiveMutex(lock, (waitflag != 0 ? INFINITE : 0)) == 
WAIT_OBJECT_0;
+    return RPyThreadAcquireLockTimed(lock, waitflag ? -1 : 0, /*intr_flag=*/0);
 }
 
 void RPyThreadReleaseLock(struct RPyOpaque_ThreadLock *lock)
diff --git a/rpython/translator/c/src/thread_nt.h 
b/rpython/translator/c/src/thread_nt.h
--- a/rpython/translator/c/src/thread_nt.h
+++ b/rpython/translator/c/src/thread_nt.h
@@ -5,16 +5,16 @@
  */
 
 typedef struct RPyOpaque_ThreadLock {
-       LONG   owned ;
-       DWORD  thread_id ;
-       HANDLE hevent ;
-};
+    HANDLE sem;
+} NRMUTEX, *PNRMUTEX;
 
 /* prototypes */
 long RPyThreadStart(void (*func)(void));
 int RPyThreadLockInit(struct RPyOpaque_ThreadLock *lock);
 void RPyOpaqueDealloc_ThreadLock(struct RPyOpaque_ThreadLock *lock);
 int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag);
+RPyLockStatus RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock,
+                                       RPY_TIMEOUT_T timeout, int intr_flag);
 void RPyThreadReleaseLock(struct RPyOpaque_ThreadLock *lock);
 long RPyThreadGetStackSize(void);
 long RPyThreadSetStackSize(long);
diff --git a/rpython/translator/c/src/thread_pthread.c 
b/rpython/translator/c/src/thread_pthread.c
--- a/rpython/translator/c/src/thread_pthread.c
+++ b/rpython/translator/c/src/thread_pthread.c
@@ -8,6 +8,7 @@
 #include <stdio.h>
 #include <errno.h>
 #include <assert.h>
+#include <sys/time.h>
 
 /* The following is hopefully equivalent to what CPython does
    (which is trying to compile a snippet of code using it) */
@@ -170,6 +171,29 @@
 #endif
 }
 
+#ifdef GETTIMEOFDAY_NO_TZ
+#define RPY_GETTIMEOFDAY(ptv) gettimeofday(ptv)
+#else
+#define RPY_GETTIMEOFDAY(ptv) gettimeofday(ptv, (struct timezone *)NULL)
+#endif
+
+#define RPY_MICROSECONDS_TO_TIMESPEC(microseconds, ts) \
+do { \
+    struct timeval tv; \
+    RPY_GETTIMEOFDAY(&tv); \
+    tv.tv_usec += microseconds % 1000000; \
+    tv.tv_sec += microseconds / 1000000; \
+    tv.tv_sec += tv.tv_usec / 1000000; \
+    tv.tv_usec %= 1000000; \
+    ts.tv_sec = tv.tv_sec; \
+    ts.tv_nsec = tv.tv_usec * 1000; \
+} while(0)
+
+int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag)
+{
+    return RPyThreadAcquireLockTimed(lock, waitflag ? -1 : 0, /*intr_flag=*/0);
+}
+
 /************************************************************/
 #ifdef USE_SEMAPHORES
 /************************************************************/
@@ -215,26 +239,50 @@
        return (status == -1) ? errno : status;
 }
 
-int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag)
+RPyLockStatus
+RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock,
+                         RPY_TIMEOUT_T microseconds, int intr_flag)
 {
-       int success;
+       RPyLockStatus success;
        sem_t *thelock = &lock->sem;
        int status, error = 0;
+       struct timespec ts;
 
+       if (microseconds > 0)
+               RPY_MICROSECONDS_TO_TIMESPEC(microseconds, ts);
        do {
-               if (waitflag)
-                       status = rpythread_fix_status(sem_wait(thelock));
-               else
-                       status = rpythread_fix_status(sem_trywait(thelock));
-       } while (status == EINTR); /* Retry if interrupted by a signal */
+           if (microseconds > 0)
+               status = rpythread_fix_status(sem_timedwait(thelock, &ts));
+           else if (microseconds == 0)
+               status = rpythread_fix_status(sem_trywait(thelock));
+           else
+               status = rpythread_fix_status(sem_wait(thelock));
+           /* Retry if interrupted by a signal, unless the caller wants to be
+              notified.  */
+       } while (!intr_flag && status == EINTR);
 
-       if (waitflag) {
+       /* Don't check the status if we're stopping because of an interrupt.  */
+       if (!(intr_flag && status == EINTR)) {
+           if (microseconds > 0) {
+               if (status != ETIMEDOUT)
+                   CHECK_STATUS("sem_timedwait");
+           }
+           else if (microseconds == 0) {
+               if (status != EAGAIN)
+                   CHECK_STATUS("sem_trywait");
+           }
+           else {
                CHECK_STATUS("sem_wait");
-       } else if (status != EAGAIN) {
-               CHECK_STATUS("sem_trywait");
+           }
        }
-       
-       success = (status == 0) ? 1 : 0;
+
+       if (status == 0) {
+           success = RPY_LOCK_ACQUIRED;
+       } else if (intr_flag && status == EINTR) {
+           success = RPY_LOCK_INTR;
+       } else {
+           success = RPY_LOCK_FAILURE;
+       }
        return success;
 }
 
@@ -326,32 +374,63 @@
        }
 }
 
-int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag)
+RPyLockStatus
+RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock,
+                         RPY_TIMEOUT_T microseconds, int intr_flag)
 {
-       int success;
+       RPyLockStatus success;
        int status, error = 0;
 
        status = pthread_mutex_lock( &lock->mut );
        CHECK_STATUS("pthread_mutex_lock[1]");
-       success = lock->locked == 0;
 
-       if ( !success && waitflag ) {
+       if (lock->locked == 0) {
+           success = RPY_LOCK_ACQUIRED;
+       } else if (microseconds == 0) {
+           success = RPY_LOCK_FAILURE;
+       } else {
+               struct timespec ts;
+               if (microseconds > 0)
+                   RPY_MICROSECONDS_TO_TIMESPEC(microseconds, ts);
                /* continue trying until we get the lock */
 
                /* mut must be locked by me -- part of the condition
                 * protocol */
-               while ( lock->locked ) {
-                       status = pthread_cond_wait(&lock->lock_released,
-                                                  &lock->mut);
+               success = RPY_LOCK_FAILURE;
+               while (success == RPY_LOCK_FAILURE) {
+                   if (microseconds > 0) {
+                       status = pthread_cond_timedwait(
+                           &lock->lock_released,
+                           &lock->mut, &ts);
+                       if (status == ETIMEDOUT)
+                           break;
+                       CHECK_STATUS("pthread_cond_timed_wait");
+                   }
+                   else {
+                       status = pthread_cond_wait(
+                           &lock->lock_released,
+                           &lock->mut);
                        CHECK_STATUS("pthread_cond_wait");
+                   }
+
+                   if (intr_flag && status == 0 && lock->locked) {
+                       /* We were woken up, but didn't get the lock.  We 
probably received
+                        * a signal.  Return RPY_LOCK_INTR to allow the caller 
to handle
+                        * it and retry.  */
+                       success = RPY_LOCK_INTR;
+                       break;
+                   } else if (status == 0 && !lock->locked) {
+                       success = RPY_LOCK_ACQUIRED;
+                   } else {
+                       success = RPY_LOCK_FAILURE;
+                   }
                }
-               success = 1;
        }
-       if (success) lock->locked = 1;
+       if (success == RPY_LOCK_ACQUIRED) lock->locked = 1;
        status = pthread_mutex_unlock( &lock->mut );
        CHECK_STATUS("pthread_mutex_unlock[1]");
 
-       if (error) success = 0;
+       if (error) success = RPY_LOCK_FAILURE;
        return success;
 }
 
diff --git a/rpython/translator/c/src/thread_pthread.h 
b/rpython/translator/c/src/thread_pthread.h
--- a/rpython/translator/c/src/thread_pthread.h
+++ b/rpython/translator/c/src/thread_pthread.h
@@ -64,6 +64,8 @@
 int RPyThreadLockInit(struct RPyOpaque_ThreadLock *lock);
 void RPyOpaqueDealloc_ThreadLock(struct RPyOpaque_ThreadLock *lock);
 int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag);
+RPyLockStatus RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock,
+                                       RPY_TIMEOUT_T timeout, int intr_flag);
 void RPyThreadReleaseLock(struct RPyOpaque_ThreadLock *lock);
 long RPyThreadGetStackSize(void);
 long RPyThreadSetStackSize(long);
_______________________________________________
pypy-commit mailing list
[email protected]
http://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to