Module Name:    src
Committed By:   pooka
Date:           Fri May 3 00:23:49 UTC 2013

Modified Files:
        src/lib/librumpuser: rumpuser_pth.c

Log Message:
Support proper rw_downgrade() semantics.

To generate a diff of this commit:
cvs rdiff -u -r1.26 -r1.27 src/lib/librumpuser/rumpuser_pth.c

Please note that diffs are not public domain; they are subject to the
copyright notices on the relevant files.

Modified files: Index: src/lib/librumpuser/rumpuser_pth.c diff -u src/lib/librumpuser/rumpuser_pth.c:1.26 src/lib/librumpuser/rumpuser_pth.c:1.27 --- src/lib/librumpuser/rumpuser_pth.c:1.26 Thu May 2 22:07:57 2013 +++ src/lib/librumpuser/rumpuser_pth.c Fri May 3 00:23:49 2013 @@ -1,4 +1,4 @@ -/* $NetBSD: rumpuser_pth.c,v 1.26 2013/05/02 22:07:57 pooka Exp $ */ +/* $NetBSD: rumpuser_pth.c,v 1.27 2013/05/03 00:23:49 pooka Exp $ */ /* * Copyright (c) 2007-2010 Antti Kantee. All Rights Reserved. @@ -28,7 +28,7 @@ #include "rumpuser_port.h" #if !defined(lint) -__RCSID("$NetBSD: rumpuser_pth.c,v 1.26 2013/05/02 22:07:57 pooka Exp $"); +__RCSID("$NetBSD: rumpuser_pth.c,v 1.27 2013/05/03 00:23:49 pooka Exp $"); #endif /* !lint */ #include <sys/queue.h> @@ -223,7 +223,12 @@ rumpuser_mutex_owner(struct rumpuser_mtx } /* - * rwlocks + * rwlocks. these are mostly simple, except that NetBSD wants to + * support something called downgrade, which means we need to swap + * our exclusive lock for a shared lock. to accommodate this, + * we need to check *after* acquiring a lock in case someone was + * downgrading it. if so, we couldn't actually have it and maybe + * need to retry later. 
*/ struct rumpuser_rw { @@ -231,38 +236,77 @@ struct rumpuser_rw { pthread_spinlock_t spin; int readers; struct lwp *writer; + int downgrade; /* someone is downgrading (hopefully lock holder ;) */ }; -#define RURW_AMWRITER(rw) (rw->writer == rumpuser_curlwp() \ - && rw->readers == -1) -#define RURW_HASREAD(rw) (rw->readers > 0) - -#define RURW_SETWRITE(rw) \ -do { \ - assert(rw->readers == 0); \ - rw->writer = rumpuser_curlwp(); \ - rw->readers = -1; \ -} while (/*CONSTCOND*/0) -#define RURW_CLRWRITE(rw) \ -do { \ - assert(RURW_AMWRITER(rw)); \ - rw->readers = 0; \ - rw->writer = NULL; \ -} while (/*CONSTCOND*/0) -#define RURW_INCREAD(rw) \ -do { \ - pthread_spin_lock(&rw->spin); \ - assert(rw->readers >= 0); \ - ++(rw)->readers; \ - pthread_spin_unlock(&rw->spin); \ -} while (/*CONSTCOND*/0) -#define RURW_DECREAD(rw) \ -do { \ - pthread_spin_lock(&rw->spin); \ - assert(rw->readers > 0); \ - --(rw)->readers; \ - pthread_spin_unlock(&rw->spin); \ -} while (/*CONSTCOND*/0) +static int +rw_amwriter(struct rumpuser_rw *rw) +{ + + return rw->writer == rumpuser_curlwp() && rw->readers == -1; +} + +static int +rw_nreaders(struct rumpuser_rw *rw) +{ + + return rw->readers > 0 ? rw->readers : 0; +} + +static int +rw_setwriter(struct rumpuser_rw *rw, int retry) +{ + + /* + * Don't need the spinlock here, we already have an + * exclusive lock and "downgrade" is stable until complete. 
+ */ + if (rw->downgrade) { + pthread_rwlock_unlock(&rw->pthrw); + if (retry) { + struct timespec ts; + + /* portable yield, essentially */ + ts.tv_sec = 0; + ts.tv_nsec = 1; + KLOCK_WRAP(nanosleep(&ts, NULL)); + } + return EBUSY; + } + assert(rw->readers == 0); + rw->writer = rumpuser_curlwp(); + rw->readers = -1; + return 0; +} + +static void +rw_clearwriter(struct rumpuser_rw *rw) +{ + + assert(rw_amwriter(rw)); + rw->readers = 0; + rw->writer = NULL; +} + +static void +rw_readup(struct rumpuser_rw *rw) +{ + + pthread_spin_lock(&rw->spin); + assert(rw->readers >= 0); + ++rw->readers; + pthread_spin_unlock(&rw->spin); +} + +static void +rw_readdown(struct rumpuser_rw *rw) +{ + + pthread_spin_lock(&rw->spin); + assert(rw->readers > 0); + --rw->readers; + pthread_spin_unlock(&rw->spin); +} void rumpuser_rw_init(struct rumpuser_rw **rw) @@ -281,16 +325,17 @@ rumpuser_rw_enter(struct rumpuser_rw *rw switch (lk) { case RUMPUSER_RW_WRITER: - if (pthread_rwlock_trywrlock(&rw->pthrw) != 0) - KLOCK_WRAP(NOFAIL_ERRNO( - pthread_rwlock_wrlock(&rw->pthrw))); - RURW_SETWRITE(rw); + do { + if (pthread_rwlock_trywrlock(&rw->pthrw) != 0) + KLOCK_WRAP(NOFAIL_ERRNO( + pthread_rwlock_wrlock(&rw->pthrw))); + } while (rw_setwriter(rw, 1) != 0); break; case RUMPUSER_RW_READER: if (pthread_rwlock_tryrdlock(&rw->pthrw) != 0) KLOCK_WRAP(NOFAIL_ERRNO( pthread_rwlock_rdlock(&rw->pthrw))); - RURW_INCREAD(rw); + rw_readup(rw); break; } } @@ -304,12 +349,12 @@ rumpuser_rw_tryenter(struct rumpuser_rw case RUMPUSER_RW_WRITER: rv = pthread_rwlock_trywrlock(&rw->pthrw); if (rv == 0) - RURW_SETWRITE(rw); + rv = rw_setwriter(rw, 0); break; case RUMPUSER_RW_READER: rv = pthread_rwlock_tryrdlock(&rw->pthrw); if (rv == 0) - RURW_INCREAD(rw); + rw_readup(rw); break; default: rv = EINVAL; @@ -323,33 +368,45 @@ int rumpuser_rw_tryupgrade(struct rumpuser_rw *rw) { - /* not supported by pthreads */ + /* + * Not supported by pthreads. 
Since the caller needs to + * back off anyway to avoid deadlock, always failing + * is correct. + */ ET(EBUSY); } +/* + * convert from exclusive to shared lock without allowing anyone to + * obtain an exclusive lock in between. actually, might allow + * someone to obtain the lock, we just don't allow that thread to + * return from the hypercall with it. + */ void rumpuser_rw_downgrade(struct rumpuser_rw *rw) { + assert(rw->downgrade == 0); + rw->downgrade = 1; + rumpuser_rw_exit(rw); /* - * I guess this is not strictly speaking correct, - * but the option is to provide a complete implementation - * of rwlocks here, or at least wrap acquiry in 1) lock - * 2) check if someone is downgrading. if not, we're done - * 3) unlock 4) yield 5) goto 1. + * though the competition can't get out of the hypervisor, it + * might have rescheduled itself after we released the lock. + * so need a wrap here. */ - rumpuser_rw_exit(rw); - rumpuser_rw_enter(rw, RUMPUSER_RW_READER); + KLOCK_WRAP(NOFAIL_ERRNO(pthread_rwlock_rdlock(&rw->pthrw))); + rw->downgrade = 0; + rw_readup(rw); } void rumpuser_rw_exit(struct rumpuser_rw *rw) { - if (RURW_HASREAD(rw)) - RURW_DECREAD(rw); + if (rw_nreaders(rw)) + rw_readdown(rw); else - RURW_CLRWRITE(rw); + rw_clearwriter(rw); NOFAIL_ERRNO(pthread_rwlock_unlock(&rw->pthrw)); } @@ -368,10 +425,10 @@ rumpuser_rw_held(struct rumpuser_rw *rw, switch (lk) { case RUMPUSER_RW_WRITER: - *rv = RURW_AMWRITER(rw); + *rv = rw_amwriter(rw); break; case RUMPUSER_RW_READER: - *rv = RURW_HASREAD(rw); + *rv = rw_nreaders(rw); break; } }