Author: Amaury Forgeot d'Arc <amaur...@gmail.com> Branch: py3k Changeset: r48348:76b23a4c1483 Date: 2011-10-22 22:02 +0200 http://bitbucket.org/pypy/pypy/changeset/76b23a4c1483/
Log: Implement timeout in thread.Lock.acquire() diff --git a/pypy/module/thread/ll_thread.py b/pypy/module/thread/ll_thread.py --- a/pypy/module/thread/ll_thread.py +++ b/pypy/module/thread/ll_thread.py @@ -58,6 +58,10 @@ c_thread_acquirelock = llexternal('RPyThreadAcquireLock', [TLOCKP, rffi.INT], rffi.INT, threadsafe=True) # release the GIL +c_thread_acquirelock_timed = llexternal('RPyThreadAcquireLockTimed', + [TLOCKP, rffi.LONGLONG, rffi.INT], + rffi.INT, + threadsafe=True) # release the GIL c_thread_releaselock = llexternal('RPyThreadReleaseLock', [TLOCKP], lltype.Void, threadsafe=True) # release the GIL @@ -116,6 +120,11 @@ res = rffi.cast(lltype.Signed, res) return bool(res) + def acquire_timed(self, timeout): + res = c_thread_acquirelock_timed(self._lock, timeout, 1) + res = rffi.cast(lltype.Signed, res) + return bool(res) + def release(self): # Sanity check: the lock must be locked if self.acquire(False): diff --git a/pypy/module/thread/os_lock.py b/pypy/module/thread/os_lock.py --- a/pypy/module/thread/os_lock.py +++ b/pypy/module/thread/os_lock.py @@ -7,6 +7,7 @@ from pypy.interpreter.baseobjspace import Wrappable from pypy.interpreter.gateway import interp2app, unwrap_spec from pypy.interpreter.typedef import TypeDef +from pypy.interpreter.error import OperationError from pypy.rlib.rarithmetic import r_longlong # Force the declaration of the type 'thread.LockType' for RPython @@ -40,16 +41,32 @@ except thread.error: raise wrap_thread_error(space, "out of resources") - @unwrap_spec(waitflag=int) - def descr_lock_acquire(self, space, waitflag=1): + @unwrap_spec(blocking=int, timeout=float) + def descr_lock_acquire(self, space, blocking=1, timeout=-1.0): """Lock the lock. Without argument, this blocks if the lock is already locked (even by the same thread), waiting for another thread to release the lock, and return None once the lock is acquired. With an argument, this will only block if the argument is true, and the return value reflects whether the lock is acquired. -The blocking operation is not interruptible.""" +The blocking operation is interruptible.""" + if not blocking and timeout != -1.0: + raise OperationError(space.w_ValueError, space.wrap( + "can't specify a timeout for a non-blocking call")) + if timeout < 0.0 and timeout != -1.0: + raise OperationError(space.w_ValueError, space.wrap( + "timeout value must be strictly positive")) + if not blocking: + microseconds = 0 + elif timeout == -1.0: + microseconds = -1 + else: + timeout *= 1e6 + if timeout > float(TIMEOUT_MAX): + raise OperationError(space.w_ValueError, space.wrap( + "timeout value is too large")) + microseconds = r_longlong(timeout) mylock = self.lock - result = mylock.acquire(bool(waitflag)) + result = mylock.acquire_timed(microseconds) return space.newbool(result) def descr_lock_release(self, space): diff --git a/pypy/module/thread/test/test_lock.py b/pypy/module/thread/test/test_lock.py --- a/pypy/module/thread/test/test_lock.py +++ b/pypy/module/thread/test/test_lock.py @@ -50,6 +50,10 @@ import _thread assert isinstance(_thread.TIMEOUT_MAX, float) assert _thread.TIMEOUT_MAX > 1000 + lock = _thread.allocate_lock() + assert lock.acquire() is True + assert lock.acquire(False) is False + assert lock.acquire(True, timeout=.1) is False def test_compile_lock(): diff --git a/pypy/translator/c/src/thread.h b/pypy/translator/c/src/thread.h --- a/pypy/translator/c/src/thread.h +++ b/pypy/translator/c/src/thread.h @@ -5,6 +5,14 @@ #define __PYPY_THREAD_H #include <assert.h> +#define RPY_TIMEOUT_T long long + +typedef enum RPyLockStatus { + RPY_LOCK_FAILURE = 0, + RPY_LOCK_ACQUIRED = 1, + RPY_LOCK_INTR +} RPyLockStatus; + #ifdef _WIN32 #include "thread_nt.h" #else diff --git a/pypy/translator/c/src/thread_nt.h b/pypy/translator/c/src/thread_nt.h --- a/pypy/translator/c/src/thread_nt.h +++ b/pypy/translator/c/src/thread_nt.h @@ -22,19 +22,19 @@ } callobj; typedef struct RPyOpaque_ThreadLock { - LONG owned ; - DWORD thread_id ; - HANDLE hevent ; -} NRMUTEX, *PNRMUTEX ; + HANDLE sem; +} NRMUTEX, *PNRMUTEX; /* prototypes */ long RPyThreadStart(void (*func)(void)); BOOL InitializeNonRecursiveMutex(PNRMUTEX mutex); VOID DeleteNonRecursiveMutex(PNRMUTEX mutex); -DWORD EnterNonRecursiveMutex(PNRMUTEX mutex, BOOL wait); +DWORD EnterNonRecursiveMutex(PNRMUTEX mutex, DWORD milliseconds); BOOL LeaveNonRecursiveMutex(PNRMUTEX mutex); void RPyOpaqueDealloc_ThreadLock(struct RPyOpaque_ThreadLock *lock); int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag); +RPyLockStatus RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock, + RPY_TIMEOUT_T timeout, int intr_flag); void RPyThreadReleaseLock(struct RPyOpaque_ThreadLock *lock); long RPyThreadGetStackSize(void); long RPyThreadSetStackSize(long); @@ -125,47 +125,24 @@ BOOL InitializeNonRecursiveMutex(PNRMUTEX mutex) { - mutex->owned = -1 ; /* No threads have entered NonRecursiveMutex */ - mutex->thread_id = 0 ; - mutex->hevent = CreateEvent(NULL, FALSE, FALSE, NULL) ; - return mutex->hevent != NULL ; /* TRUE if the mutex is created */ + mutex->sem = CreateSemaphore(NULL, 1, 1, NULL); } VOID DeleteNonRecursiveMutex(PNRMUTEX mutex) { - /* No in-use check */ - CloseHandle(mutex->hevent) ; - mutex->hevent = NULL ; /* Just in case */ + /* No in-use check */ + CloseHandle(mutex->sem); + mutex->sem = NULL ; /* Just in case */ } DWORD EnterNonRecursiveMutex(PNRMUTEX mutex, BOOL wait) { - /* Assume that the thread waits successfully */ - DWORD ret ; - - /* InterlockedIncrement(&mutex->owned) == 0 means that no thread currently owns the mutex */ - if (!wait) - { - if (InterlockedCompareExchange(&mutex->owned, 0, -1) != -1) - return WAIT_TIMEOUT ; - ret = WAIT_OBJECT_0 ; - } - else - ret = InterlockedIncrement(&mutex->owned) ? - /* Some thread owns the mutex, let's wait... */ - WaitForSingleObject(mutex->hevent, INFINITE) : WAIT_OBJECT_0 ; - - mutex->thread_id = GetCurrentThreadId() ; /* We own it */ - return ret ; + return WaitForSingleObject(mutex->sem, milliseconds); } BOOL LeaveNonRecursiveMutex(PNRMUTEX mutex) { - /* We don't own the mutex */ - mutex->thread_id = 0 ; - return - InterlockedDecrement(&mutex->owned) < 0 || - SetEvent(mutex->hevent) ; /* Other threads are waiting, wake one on them up */ + return ReleaseSemaphore(mutex->sem, 1, NULL); } /************************************************************/ @@ -181,8 +158,8 @@ void RPyOpaqueDealloc_ThreadLock(struct RPyOpaque_ThreadLock *lock) { - if (lock->hevent != NULL) - DeleteNonRecursiveMutex(lock); + if (lock->sem != NULL) + DeleteNonRecursiveMutex(lock); } /* @@ -191,9 +168,40 @@ * and 0 if the lock was not acquired. This means a 0 is returned * if the lock has already been acquired by this thread! */ +RPyLockStatus +RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock, + RPY_TIMEOUT_T microseconds, int intr_flag) +{ + /* Fow now, intr_flag does nothing on Windows, and lock acquires are + * uninterruptible. */ + PyLockStatus success; + PY_TIMEOUT_T milliseconds; + + if (microseconds >= 0) { + milliseconds = microseconds / 1000; + if (microseconds % 1000 > 0) + ++milliseconds; + if ((DWORD) milliseconds != milliseconds) + Py_FatalError("Timeout too large for a DWORD, " + "please check PY_TIMEOUT_MAX"); + } + else + milliseconds = INFINITE; + + if (lock && EnterNonRecursiveMutex( + lock, (DWORD)milliseconds) == WAIT_OBJECT_0) { + success = PY_LOCK_ACQUIRED; + } + else { + success = PY_LOCK_FAILURE; + } + + return success; +} + int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag) { - return EnterNonRecursiveMutex(lock, (waitflag != 0 ? INFINITE : 0)) == WAIT_OBJECT_0; + return RPyThreadAcquireLockTimed(lock, waitflag ? -1 : 0, /*intr_flag=*/0); } void RPyThreadReleaseLock(struct RPyOpaque_ThreadLock *lock) diff --git a/pypy/translator/c/src/thread_pthread.h b/pypy/translator/c/src/thread_pthread.h --- a/pypy/translator/c/src/thread_pthread.h +++ b/pypy/translator/c/src/thread_pthread.h @@ -7,6 +7,7 @@ */ #include <unistd.h> /* for the _POSIX_xxx and _POSIX_THREAD_xxx defines */ +#include <sys/time.h> /* for gettimeofday() */ #include <stdlib.h> #include <pthread.h> #include <signal.h> @@ -97,6 +98,8 @@ int RPyThreadLockInit(struct RPyOpaque_ThreadLock *lock); void RPyOpaqueDealloc_ThreadLock(struct RPyOpaque_ThreadLock *lock); int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag); +RPyLockStatus RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock, + RPY_TIMEOUT_T timeout, int intr_flag); void RPyThreadReleaseLock(struct RPyOpaque_ThreadLock *lock); long RPyThreadGetStackSize(void); long RPyThreadSetStackSize(long); @@ -238,6 +241,29 @@ #endif } +#ifdef GETTIMEOFDAY_NO_TZ +#define RPY_GETTIMEOFDAY(ptv) gettimeofday(ptv) +#else +#define RPY_GETTIMEOFDAY(ptv) gettimeofday(ptv, (struct timezone *)NULL) +#endif + +#define RPY_MICROSECONDS_TO_TIMESPEC(microseconds, ts) \ +do { \ + struct timeval tv; \ + RPY_GETTIMEOFDAY(&tv); \ + tv.tv_usec += microseconds % 1000000; \ + tv.tv_sec += microseconds / 1000000; \ + tv.tv_sec += tv.tv_usec / 1000000; \ + tv.tv_usec %= 1000000; \ + ts.tv_sec = tv.tv_sec; \ + ts.tv_nsec = tv.tv_usec * 1000; \ +} while(0) + +int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag) +{ + return RPyThreadAcquireLockTimed(lock, waitflag ? -1 : 0, /*intr_flag=*/0); +} + /************************************************************/ #ifdef USE_SEMAPHORES /************************************************************/ @@ -283,26 +309,50 @@ return (status == -1) ? errno : status; } -int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag) +RPyLockStatus +RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock, + RPY_TIMEOUT_T microseconds, int intr_flag) { - int success; + RPyLockStatus success; sem_t *thelock = &lock->sem; int status, error = 0; + struct timespec ts; + if (microseconds > 0) + RPY_MICROSECONDS_TO_TIMESPEC(microseconds, ts); do { - if (waitflag) - status = rpythread_fix_status(sem_wait(thelock)); - else - status = rpythread_fix_status(sem_trywait(thelock)); - } while (status == EINTR); /* Retry if interrupted by a signal */ + if (microseconds > 0) + status = rpythread_fix_status(sem_timedwait(thelock, &ts)); + else if (microseconds == 0) + status = rpythread_fix_status(sem_trywait(thelock)); + else + status = rpythread_fix_status(sem_wait(thelock)); + /* Retry if interrupted by a signal, unless the caller wants to be + notified. */ + } while (!intr_flag && status == EINTR); - if (waitflag) { + /* Don't check the status if we're stopping because of an interrupt. */ + if (!(intr_flag && status == EINTR)) { + if (microseconds > 0) { + if (status != ETIMEDOUT) + CHECK_STATUS("sem_timedwait"); + } + else if (microseconds == 0) { + if (status != EAGAIN) + CHECK_STATUS("sem_trywait"); + } + else { CHECK_STATUS("sem_wait"); - } else if (status != EAGAIN) { - CHECK_STATUS("sem_trywait"); + } } - - success = (status == 0) ? 1 : 0; + + if (status == 0) { + success = RPY_LOCK_ACQUIRED; + } else if (intr_flag && status == EINTR) { + success = RPY_LOCK_INTR; + } else { + success = RPY_LOCK_FAILURE; + } return success; } @@ -394,32 +444,63 @@ } } -int RPyThreadAcquireLock(struct RPyOpaque_ThreadLock *lock, int waitflag) +RPyLockStatus +RPyThreadAcquireLockTimed(struct RPyOpaque_ThreadLock *lock, + RPY_TIMEOUT_T microseconds, int intr_flag) { - int success; + RPyLockStatus success; int status, error = 0; status = pthread_mutex_lock( &lock->mut ); CHECK_STATUS("pthread_mutex_lock[1]"); - success = lock->locked == 0; - if ( !success && waitflag ) { + if (lock->locked == 0) { + success = RPY_LOCK_ACQUIRED; + } else if (microseconds == 0) { + success = RPY_LOCK_FAILURE; + } else { + struct timespec ts; + if (microseconds > 0) + RPY_MICROSECONDS_TO_TIMESPEC(microseconds, ts); /* continue trying until we get the lock */ /* mut must be locked by me -- part of the condition * protocol */ - while ( lock->locked ) { - status = pthread_cond_wait(&lock->lock_released, - &lock->mut); + success = RPY_LOCK_FAILURE; + while (success == RPY_LOCK_FAILURE) { + if (microseconds > 0) { + status = pthread_cond_timedwait( + &lock->lock_released, + &lock->mut, &ts); + if (status == ETIMEDOUT) + break; + CHECK_STATUS("pthread_cond_timed_wait"); + } + else { + status = pthread_cond_wait( + &lock->lock_released, + &lock->mut); CHECK_STATUS("pthread_cond_wait"); + } + + if (intr_flag && status == 0 && lock->locked) { + /* We were woken up, but didn't get the lock. We probably received + * a signal. Return RPY_LOCK_INTR to allow the caller to handle + * it and retry. */ + success = RPY_LOCK_INTR; + break; + } else if (status == 0 && !lock->locked) { + success = RPY_LOCK_ACQUIRED; + } else { + success = RPY_LOCK_FAILURE; + } } - success = 1; } - if (success) lock->locked = 1; + if (success == RPY_LOCK_ACQUIRED) lock->locked = 1; status = pthread_mutex_unlock( &lock->mut ); CHECK_STATUS("pthread_mutex_unlock[1]"); - if (error) success = 0; + if (error) success = RPY_LOCK_FAILURE; return success; } _______________________________________________ pypy-commit mailing list pypy-commit@python.org http://mail.python.org/mailman/listinfo/pypy-commit