Module Name:    src
Committed By:   pooka
Date:           Mon Jan 10 19:49:43 UTC 2011

Modified Files:
        src/lib/librumpclient: rumpclient.c
        src/lib/librumpuser: rumpuser_sp.c sp_common.c

Log Message:
A bunch of improvements:

* don't hold spc mutex while sending data
* use send() for the banner to avoid SIGPIPE in case a client
  connects and immediately goes away
* fix error path locking
* use kevent() instead of pollts() in the client.  Apparently that
  is the only sensible way for a library to support both multithreading
  and signal-reentrancy in a race-free manner.
  (Open question: can all signals be caught with a single kevent
  instead of registering NSIG separate ones?)
* mark the client communication descriptor non-blocking so that
  clients are more readily interruptible by signals (we now sleep in
  kevent(), which reports signals, instead of in recvfrom() with
  signals masked)


To generate a diff of this commit:
cvs rdiff -u -r1.14 -r1.15 src/lib/librumpclient/rumpclient.c
cvs rdiff -u -r1.33 -r1.34 src/lib/librumpuser/rumpuser_sp.c
cvs rdiff -u -r1.22 -r1.23 src/lib/librumpuser/sp_common.c

Please note that diffs are not public domain; they are subject to the
copyright notices on the relevant files.

Modified files:

Index: src/lib/librumpclient/rumpclient.c
diff -u src/lib/librumpclient/rumpclient.c:1.14 src/lib/librumpclient/rumpclient.c:1.15
--- src/lib/librumpclient/rumpclient.c:1.14	Sun Jan  9 14:10:03 2011
+++ src/lib/librumpclient/rumpclient.c	Mon Jan 10 19:49:43 2011
@@ -1,4 +1,4 @@
-/*      $NetBSD: rumpclient.c,v 1.14 2011/01/09 14:10:03 pooka Exp $	*/
+/*      $NetBSD: rumpclient.c,v 1.15 2011/01/10 19:49:43 pooka Exp $	*/
 
 /*
  * Copyright (c) 2010, 2011 Antti Kantee.  All Rights Reserved.
@@ -33,6 +33,7 @@
 __RCSID("$NetBSD");
 
 #include <sys/param.h>
+#include <sys/event.h>
 #include <sys/mman.h>
 #include <sys/socket.h>
 
@@ -60,6 +61,7 @@
 int	(*host_socket)(int, int, int);
 int	(*host_close)(int);
 int	(*host_connect)(int, const struct sockaddr *, socklen_t);
+int	(*host_fcntl)(int, int, ...);
 int	(*host_poll)(struct pollfd *, nfds_t, int);
 int	(*host_pollts)(struct pollfd *, nfds_t, const struct timespec *,
 		      const sigset_t *);
@@ -74,28 +76,14 @@
 	.spc_fd = -1,
 };
 
-/*
- * This version of waitresp is optimized for single-threaded clients
- * and is required by signal-safe clientside rump syscalls.
- */
-
-static void
-releasercvlock(struct spclient *spc)
-{
-
-	pthread_mutex_lock(&spc->spc_mtx);
-	if (spc->spc_istatus == SPCSTATUS_WANTED)
-		kickall(spc);
-	spc->spc_istatus = SPCSTATUS_FREE;
-}
-
+static int kq;
 static sigset_t fullset;
+
 static int
 waitresp(struct spclient *spc, struct respwait *rw, sigset_t *mask)
 {
-	struct pollfd pfd;
-	int rv = 0;
 
+	pthread_mutex_lock(&spc->spc_mtx);
 	sendunlockl(spc);
 
 	rw->rw_error = 0;
@@ -103,32 +91,41 @@
 	    && spc->spc_state != SPCSTATE_DYING){
 		/* are we free to receive? */
 		if (spc->spc_istatus == SPCSTATUS_FREE) {
+			struct kevent kev[8];
+			int gotresp, dosig, rv, i;
+
 			spc->spc_istatus = SPCSTATUS_BUSY;
 			pthread_mutex_unlock(&spc->spc_mtx);
 
-			pfd.fd = spc->spc_fd;
-			pfd.events = POLLIN;
-
-			switch (readframe(spc)) {
-			case 0:
-				releasercvlock(spc);
-				pthread_mutex_unlock(&spc->spc_mtx);
-				host_pollts(&pfd, 1, NULL, mask);
-				pthread_mutex_lock(&spc->spc_mtx);
-				continue;
-			case -1:
-				releasercvlock(spc);
-				rv = errno;
-				spc->spc_state = SPCSTATE_DYING;
-				continue;
-			default:
-				break;
-			}
+			dosig = 0;
+			for (gotresp = 0; !gotresp; ) {
+				switch (readframe(spc)) {
+				case 0:
+					rv = kevent(kq, NULL, 0,
+					    kev, __arraycount(kev), NULL);
+					assert(rv > 0);
+					for (i = 0; i < rv; i++) {
+						if (kev[i].filter
+						    == EVFILT_SIGNAL)
+							dosig++;
+					}
+					if (dosig)
+						goto cleanup;
+
+					continue;
+				case -1:
+					spc->spc_state = SPCSTATE_DYING;
+					goto cleanup;
+				default:
+					break;
+				}
 
-			switch (spc->spc_hdr.rsp_class) {
+				switch (spc->spc_hdr.rsp_class) {
 				case RUMPSP_RESP:
 				case RUMPSP_ERROR:
 					kickwaiter(spc);
+					gotresp = spc->spc_hdr.rsp_reqno ==
+					    rw->rw_reqno;
 					break;
 				case RUMPSP_REQ:
 					handlereq(spc);
@@ -136,9 +133,22 @@
 				default:
 					/* panic */
 					break;
+				}
 			}
 
-			releasercvlock(spc);
+ cleanup:
+			pthread_mutex_lock(&spc->spc_mtx);
+			if (spc->spc_istatus == SPCSTATUS_WANTED)
+				kickall(spc);
+			spc->spc_istatus = SPCSTATUS_FREE;
+
+			/* take one for the team */
+			if (dosig) {
+				pthread_mutex_unlock(&spc->spc_mtx);
+				pthread_sigmask(SIG_SETMASK, mask, NULL);
+				pthread_sigmask(SIG_SETMASK, &fullset, NULL);
+				pthread_mutex_lock(&spc->spc_mtx);
+			}
 		} else {
 			spc->spc_istatus = SPCSTATUS_WANTED;
 			pthread_cond_wait(&rw->rw_cv, &spc->spc_mtx);
@@ -148,8 +158,6 @@
 	pthread_mutex_unlock(&spc->spc_mtx);
 	pthread_cond_destroy(&rw->rw_cv);
 
-	if (rv)
-		return rv;
 	if (spc->spc_state == SPCSTATE_DYING)
 		return ENOTCONN;
 	return rw->rw_error;
@@ -385,8 +393,9 @@
 static int
 doconnect(void)
 {
+	struct kevent kev[NSIG+1];
 	char banner[MAXBANNER];
-	int s, error;
+	int s, error, flags, i;
 	ssize_t n;
 
 	s = host_socket(parsetab[ptab_idx].domain, SOCK_STREAM, 0);
@@ -421,8 +430,34 @@
 	}
 	banner[n] = '\0';
 
+	flags = host_fcntl(s, F_GETFL, 0);
+	if (host_fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1) {
+		fprintf(stderr, "rump_sp: cannot set socket fd to nonblock\n");
+		errno = EINVAL;
+		return -1;
+	}
+
 	/* parse the banner some day */
 
+	/* setup kqueue, we want all signals and the fd */
+	if ((kq = kqueue()) == -1) {
+		error = errno;
+		fprintf(stderr, "rump_sp: cannot setup kqueue");
+		errno = error;
+		return -1;
+	}
+
+	for (i = 0; i < NSIG; i++) {
+		EV_SET(&kev[i], i+1, EVFILT_SIGNAL, EV_ADD|EV_ENABLE, 0, 0, 0);
+	}
+	EV_SET(&kev[NSIG], s, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, 0);
+	if (kevent(kq, kev, NSIG+1, NULL, 0, NULL) == -1) {
+		error = errno;
+		fprintf(stderr, "rump_sp: kevent() failed");
+		errno = error;
+		return -1;
+	}
+
 	clispc.spc_fd = s;
 	TAILQ_INIT(&clispc.spc_respwait);
 	pthread_mutex_init(&clispc.spc_mtx, NULL);
@@ -455,6 +490,7 @@
 	FINDSYM2(socket,__socket30);
 	FINDSYM(close);
 	FINDSYM(connect);
+	FINDSYM(fcntl);
 	FINDSYM(poll);
 	FINDSYM(pollts);
 	FINDSYM(read);
@@ -522,6 +558,8 @@
 	int error;
 
 	host_close(clispc.spc_fd);
+	host_close(kq);
+	kq = -1;
 	memset(&clispc, 0, sizeof(clispc));
 	clispc.spc_fd = -1;
 

Index: src/lib/librumpuser/rumpuser_sp.c
diff -u src/lib/librumpuser/rumpuser_sp.c:1.33 src/lib/librumpuser/rumpuser_sp.c:1.34
--- src/lib/librumpuser/rumpuser_sp.c:1.33	Mon Jan 10 11:57:53 2011
+++ src/lib/librumpuser/rumpuser_sp.c	Mon Jan 10 19:49:43 2011
@@ -1,4 +1,4 @@
-/*      $NetBSD: rumpuser_sp.c,v 1.33 2011/01/10 11:57:53 pooka Exp $	*/
+/*      $NetBSD: rumpuser_sp.c,v 1.34 2011/01/10 19:49:43 pooka Exp $	*/
 
 /*
  * Copyright (c) 2010, 2011 Antti Kantee.  All Rights Reserved.
@@ -35,7 +35,7 @@
  */
 
 #include <sys/cdefs.h>
-__RCSID("$NetBSD: rumpuser_sp.c,v 1.33 2011/01/10 11:57:53 pooka Exp $");
+__RCSID("$NetBSD: rumpuser_sp.c,v 1.34 2011/01/10 19:49:43 pooka Exp $");
 
 #include <sys/types.h>
 #include <sys/atomic.h>
@@ -104,20 +104,23 @@
 static int
 waitresp(struct spclient *spc, struct respwait *rw)
 {
+	int spcstate;
 	int rv = 0;
 
+	pthread_mutex_lock(&spc->spc_mtx);
 	sendunlockl(spc);
 	while (!rw->rw_done && spc->spc_state != SPCSTATE_DYING) {
 		pthread_cond_wait(&rw->rw_cv, &spc->spc_mtx);
 	}
 	TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries);
+	spcstate = spc->spc_state;
 	pthread_mutex_unlock(&spc->spc_mtx);
 
 	pthread_cond_destroy(&rw->rw_cv);
 
 	if (rv)
 		return rv;
-	if (spc->spc_state == SPCSTATE_DYING)
+	if (spcstate == SPCSTATE_DYING)
 		return ENOTCONN;
 	return rw->rw_error;
 }
@@ -511,7 +514,8 @@
 	}
 
 	/* write out a banner for the client */
-	if (write(newfd, banner, strlen(banner)) != (ssize_t)strlen(banner)) {
+	if (send(newfd, banner, strlen(banner), MSG_NOSIGNAL)
+	    != (ssize_t)strlen(banner)) {
 		close(newfd);
 		return 0;
 	}

Index: src/lib/librumpuser/sp_common.c
diff -u src/lib/librumpuser/sp_common.c:1.22 src/lib/librumpuser/sp_common.c:1.23
--- src/lib/librumpuser/sp_common.c:1.22	Mon Jan 10 11:57:53 2011
+++ src/lib/librumpuser/sp_common.c	Mon Jan 10 19:49:43 2011
@@ -1,4 +1,4 @@
-/*      $NetBSD: sp_common.c,v 1.22 2011/01/10 11:57:53 pooka Exp $	*/
+/*      $NetBSD: sp_common.c,v 1.23 2011/01/10 19:49:43 pooka Exp $	*/
 
 /*
  * Copyright (c) 2010, 2011 Antti Kantee.  All Rights Reserved.
@@ -212,7 +212,6 @@
 sendlockl(struct spclient *spc)
 {
 
-	/* assert(pthread_mutex_owned) */
 	while (spc->spc_ostatus != SPCSTATUS_FREE) {
 		spc->spc_ostatus = SPCSTATUS_WANTED;
 		pthread_cond_wait(&spc->spc_cv, &spc->spc_mtx);
@@ -233,7 +232,6 @@
 sendunlockl(struct spclient *spc)
 {
 
-	/* assert(pthread_mutex_owned) */
 	if (spc->spc_ostatus == SPCSTATUS_WANTED)
 		pthread_cond_broadcast(&spc->spc_cv);
 	spc->spc_ostatus = SPCSTATUS_FREE;
@@ -298,12 +296,14 @@
 	TAILQ_INSERT_TAIL(&spc->spc_respwait, rw, rw_entries);
 
 	sendlockl(spc);
+	pthread_mutex_unlock(&spc->spc_mtx);
 }
 
 static void
 unputwait(struct spclient *spc, struct respwait *rw)
 {
 
+	pthread_mutex_lock(&spc->spc_mtx);
 	sendunlockl(spc);
 
 	TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries);
@@ -325,6 +325,7 @@
 	if (rw == NULL) {
 		DPRINTF(("no waiter found, invalid reqno %" PRIu64 "?\n",
 		    spc->spc_hdr.rsp_reqno));
+		pthread_mutex_unlock(&spc->spc_mtx);
 		spcfreebuf(spc);
 		return;
 	}

Reply via email to