Module Name:    src
Committed By:   pooka
Date:           Fri May  3 00:23:49 UTC 2013

Modified Files:
        src/lib/librumpuser: rumpuser_pth.c

Log Message:
Support proper rw_downgrade() semantics.


To generate a diff of this commit:
cvs rdiff -u -r1.26 -r1.27 src/lib/librumpuser/rumpuser_pth.c

Please note that diffs are not public domain; they are subject to the
copyright notices on the relevant files.

Modified files:

Index: src/lib/librumpuser/rumpuser_pth.c
diff -u src/lib/librumpuser/rumpuser_pth.c:1.26 src/lib/librumpuser/rumpuser_pth.c:1.27
--- src/lib/librumpuser/rumpuser_pth.c:1.26	Thu May  2 22:07:57 2013
+++ src/lib/librumpuser/rumpuser_pth.c	Fri May  3 00:23:49 2013
@@ -1,4 +1,4 @@
-/*	$NetBSD: rumpuser_pth.c,v 1.26 2013/05/02 22:07:57 pooka Exp $	*/
+/*	$NetBSD: rumpuser_pth.c,v 1.27 2013/05/03 00:23:49 pooka Exp $	*/
 
 /*
  * Copyright (c) 2007-2010 Antti Kantee.  All Rights Reserved.
@@ -28,7 +28,7 @@
 #include "rumpuser_port.h"
 
 #if !defined(lint)
-__RCSID("$NetBSD: rumpuser_pth.c,v 1.26 2013/05/02 22:07:57 pooka Exp $");
+__RCSID("$NetBSD: rumpuser_pth.c,v 1.27 2013/05/03 00:23:49 pooka Exp $");
 #endif /* !lint */
 
 #include <sys/queue.h>
@@ -223,7 +223,12 @@ rumpuser_mutex_owner(struct rumpuser_mtx
 }
 
 /*
- * rwlocks
+ * rwlocks.  these are mostly simple, except that NetBSD wants to
+ * support something called downgrade, which means we need to swap
+ * our exclusive lock for a shared lock.  to accommodate this,
+ * we need to check *after* acquiring a lock in case someone was
+ * downgrading it.  if so, we couldn't actually have it and maybe
+ * need to retry later.
  */
 
 struct rumpuser_rw {
@@ -231,38 +236,77 @@ struct rumpuser_rw {
 	pthread_spinlock_t spin;
 	int readers;
 	struct lwp *writer;
+	int downgrade; /* someone is downgrading (hopefully lock holder ;) */
 };
 
-#define RURW_AMWRITER(rw) (rw->writer == rumpuser_curlwp()		\
-				&& rw->readers == -1)
-#define RURW_HASREAD(rw)  (rw->readers > 0)
-
-#define RURW_SETWRITE(rw)						\
-do {									\
-	assert(rw->readers == 0);					\
-	rw->writer = rumpuser_curlwp();					\
-	rw->readers = -1;						\
-} while (/*CONSTCOND*/0)
-#define RURW_CLRWRITE(rw)						\
-do {									\
-	assert(RURW_AMWRITER(rw));					\
-	rw->readers = 0;						\
-	rw->writer = NULL;						\
-} while (/*CONSTCOND*/0)
-#define RURW_INCREAD(rw)						\
-do {									\
-	pthread_spin_lock(&rw->spin);					\
-	assert(rw->readers >= 0);					\
-	++(rw)->readers;						\
-	pthread_spin_unlock(&rw->spin);					\
-} while (/*CONSTCOND*/0)
-#define RURW_DECREAD(rw)						\
-do {									\
-	pthread_spin_lock(&rw->spin);					\
-	assert(rw->readers > 0);					\
-	--(rw)->readers;						\
-	pthread_spin_unlock(&rw->spin);					\
-} while (/*CONSTCOND*/0)
+static int
+rw_amwriter(struct rumpuser_rw *rw)
+{
+
+	return rw->writer == rumpuser_curlwp() && rw->readers == -1;
+}
+
+static int
+rw_nreaders(struct rumpuser_rw *rw)
+{
+
+	return rw->readers > 0 ? rw->readers : 0;
+}
+
+static int
+rw_setwriter(struct rumpuser_rw *rw, int retry)
+{
+
+	/*
+	 * Don't need the spinlock here, we already have an
+	 * exclusive lock and "downgrade" is stable until complete.
+	 */
+	if (rw->downgrade) {
+		pthread_rwlock_unlock(&rw->pthrw);
+		if (retry) {
+			struct timespec ts;
+
+			/* portable yield, essentially */
+			ts.tv_sec = 0;
+			ts.tv_nsec = 1;
+			KLOCK_WRAP(nanosleep(&ts, NULL));
+		}
+		return EBUSY;
+	}
+	assert(rw->readers == 0);
+	rw->writer = rumpuser_curlwp();
+	rw->readers = -1;
+	return 0;
+}
+
+static void
+rw_clearwriter(struct rumpuser_rw *rw)
+{
+
+	assert(rw_amwriter(rw));
+	rw->readers = 0;
+	rw->writer = NULL;
+}
+
+static void
+rw_readup(struct rumpuser_rw *rw)
+{
+
+	pthread_spin_lock(&rw->spin);
+	assert(rw->readers >= 0);
+	++rw->readers;
+	pthread_spin_unlock(&rw->spin);
+}
+
+static void
+rw_readdown(struct rumpuser_rw *rw)
+{
+
+	pthread_spin_lock(&rw->spin);
+	assert(rw->readers > 0);
+	--rw->readers;
+	pthread_spin_unlock(&rw->spin);
+}
 
 void
 rumpuser_rw_init(struct rumpuser_rw **rw)
@@ -281,16 +325,17 @@ rumpuser_rw_enter(struct rumpuser_rw *rw
 
 	switch (lk) {
 	case RUMPUSER_RW_WRITER:
-		if (pthread_rwlock_trywrlock(&rw->pthrw) != 0)
-			KLOCK_WRAP(NOFAIL_ERRNO(
-			    pthread_rwlock_wrlock(&rw->pthrw)));
-		RURW_SETWRITE(rw);
+		do {
+			if (pthread_rwlock_trywrlock(&rw->pthrw) != 0)
+				KLOCK_WRAP(NOFAIL_ERRNO(
+				    pthread_rwlock_wrlock(&rw->pthrw)));
+		} while (rw_setwriter(rw, 1) != 0);
 		break;
 	case RUMPUSER_RW_READER:
 		if (pthread_rwlock_tryrdlock(&rw->pthrw) != 0)
 			KLOCK_WRAP(NOFAIL_ERRNO(
 			    pthread_rwlock_rdlock(&rw->pthrw)));
-		RURW_INCREAD(rw);
+		rw_readup(rw);
 		break;
 	}
 }
@@ -304,12 +349,12 @@ rumpuser_rw_tryenter(struct rumpuser_rw 
 	case RUMPUSER_RW_WRITER:
 		rv = pthread_rwlock_trywrlock(&rw->pthrw);
 		if (rv == 0)
-			RURW_SETWRITE(rw);
+			rv = rw_setwriter(rw, 0);
 		break;
 	case RUMPUSER_RW_READER:
 		rv = pthread_rwlock_tryrdlock(&rw->pthrw);
 		if (rv == 0)
-			RURW_INCREAD(rw);
+			rw_readup(rw);
 		break;
 	default:
 		rv = EINVAL;
@@ -323,33 +368,45 @@ int
 rumpuser_rw_tryupgrade(struct rumpuser_rw *rw)
 {
 
-	/* not supported by pthreads */
+	/*
+	 * Not supported by pthreads.  Since the caller needs to
+	 * back off anyway to avoid deadlock, always failing
+	 * is correct.
+	 */
 	ET(EBUSY);
 }
 
+/*
+ * convert from exclusive to shared lock without allowing anyone to
+ * obtain an exclusive lock in between.  actually, might allow
+ * someone to obtain the lock, we just don't allow that thread to
+ * return from the hypercall with it.
+ */
 void
 rumpuser_rw_downgrade(struct rumpuser_rw *rw)
 {
 
+	assert(rw->downgrade == 0);
+	rw->downgrade = 1;
+	rumpuser_rw_exit(rw);
 	/*
-	 * I guess this is not strictly speaking correct,
-	 * but the option is to provide a complete implementation
-	 * of rwlocks here, or at least wrap acquiry in 1) lock
-	 * 2) check if someone is downgrading. if not, we're done
-	 * 3) unlock 4) yield 5) goto 1.
+	 * though the competition can't get out of the hypervisor, it
+	 * might have rescheduled itself after we released the lock.
+	 * so need a wrap here.
 	 */
-	rumpuser_rw_exit(rw);
-	rumpuser_rw_enter(rw, RUMPUSER_RW_READER);
+	KLOCK_WRAP(NOFAIL_ERRNO(pthread_rwlock_rdlock(&rw->pthrw)));
+	rw->downgrade = 0;
+	rw_readup(rw);
 }
 
 void
 rumpuser_rw_exit(struct rumpuser_rw *rw)
 {
 
-	if (RURW_HASREAD(rw))
-		RURW_DECREAD(rw);
+	if (rw_nreaders(rw))
+		rw_readdown(rw);
 	else
-		RURW_CLRWRITE(rw);
+		rw_clearwriter(rw);
 	NOFAIL_ERRNO(pthread_rwlock_unlock(&rw->pthrw));
 }
 
@@ -368,10 +425,10 @@ rumpuser_rw_held(struct rumpuser_rw *rw,
 
 	switch (lk) {
 	case RUMPUSER_RW_WRITER:
-		*rv = RURW_AMWRITER(rw);
+		*rv = rw_amwriter(rw);
 		break;
 	case RUMPUSER_RW_READER:
-		*rv = RURW_HASREAD(rw);
+		*rv = rw_nreaders(rw);
 		break;
 	}
 }

Reply via email to