Module Name:    src
Committed By:   pooka
Date:           Mon Jan 10 19:49:43 UTC 2011
Modified Files:
        src/lib/librumpclient: rumpclient.c
        src/lib/librumpuser: rumpuser_sp.c sp_common.c

Log Message:
A bunch of improvements:
* don't hold spc mutex while sending data
* use send() for the banner to avoid SIGPIPE in case a client
  connects and immediately goes away
* fix error path locking
* use kevent() instead of pollts() in the client.  Apparently that is
  the only sensible way for a library to support both multithreading
  and signal-reentrancy in a race-free manner.
  (can I catch all signals with one kevent instead of installing
  NSIG different ones??)
* mark client comm descriptor non-blocking so that clients have
  better signal-interruptibility (we now sleep in signal-accepting
  kevent() instead of signal-masked recvfrom())

To generate a diff of this commit:
cvs rdiff -u -r1.14 -r1.15 src/lib/librumpclient/rumpclient.c
cvs rdiff -u -r1.33 -r1.34 src/lib/librumpuser/rumpuser_sp.c
cvs rdiff -u -r1.22 -r1.23 src/lib/librumpuser/sp_common.c

Please note that diffs are not public domain; they are subject to the
copyright notices on the relevant files.
Modified files: Index: src/lib/librumpclient/rumpclient.c diff -u src/lib/librumpclient/rumpclient.c:1.14 src/lib/librumpclient/rumpclient.c:1.15 --- src/lib/librumpclient/rumpclient.c:1.14 Sun Jan 9 14:10:03 2011 +++ src/lib/librumpclient/rumpclient.c Mon Jan 10 19:49:43 2011 @@ -1,4 +1,4 @@ -/* $NetBSD: rumpclient.c,v 1.14 2011/01/09 14:10:03 pooka Exp $ */ +/* $NetBSD: rumpclient.c,v 1.15 2011/01/10 19:49:43 pooka Exp $ */ /* * Copyright (c) 2010, 2011 Antti Kantee. All Rights Reserved. @@ -33,6 +33,7 @@ __RCSID("$NetBSD"); #include <sys/param.h> +#include <sys/event.h> #include <sys/mman.h> #include <sys/socket.h> @@ -60,6 +61,7 @@ int (*host_socket)(int, int, int); int (*host_close)(int); int (*host_connect)(int, const struct sockaddr *, socklen_t); +int (*host_fcntl)(int, int, ...); int (*host_poll)(struct pollfd *, nfds_t, int); int (*host_pollts)(struct pollfd *, nfds_t, const struct timespec *, const sigset_t *); @@ -74,28 +76,14 @@ .spc_fd = -1, }; -/* - * This version of waitresp is optimized for single-threaded clients - * and is required by signal-safe clientside rump syscalls. - */ - -static void -releasercvlock(struct spclient *spc) -{ - - pthread_mutex_lock(&spc->spc_mtx); - if (spc->spc_istatus == SPCSTATUS_WANTED) - kickall(spc); - spc->spc_istatus = SPCSTATUS_FREE; -} - +static int kq; static sigset_t fullset; + static int waitresp(struct spclient *spc, struct respwait *rw, sigset_t *mask) { - struct pollfd pfd; - int rv = 0; + pthread_mutex_lock(&spc->spc_mtx); sendunlockl(spc); rw->rw_error = 0; @@ -103,32 +91,41 @@ && spc->spc_state != SPCSTATE_DYING){ /* are we free to receive? 
*/ if (spc->spc_istatus == SPCSTATUS_FREE) { + struct kevent kev[8]; + int gotresp, dosig, rv, i; + spc->spc_istatus = SPCSTATUS_BUSY; pthread_mutex_unlock(&spc->spc_mtx); - pfd.fd = spc->spc_fd; - pfd.events = POLLIN; - - switch (readframe(spc)) { - case 0: - releasercvlock(spc); - pthread_mutex_unlock(&spc->spc_mtx); - host_pollts(&pfd, 1, NULL, mask); - pthread_mutex_lock(&spc->spc_mtx); - continue; - case -1: - releasercvlock(spc); - rv = errno; - spc->spc_state = SPCSTATE_DYING; - continue; - default: - break; - } + dosig = 0; + for (gotresp = 0; !gotresp; ) { + switch (readframe(spc)) { + case 0: + rv = kevent(kq, NULL, 0, + kev, __arraycount(kev), NULL); + assert(rv > 0); + for (i = 0; i < rv; i++) { + if (kev[i].filter + == EVFILT_SIGNAL) + dosig++; + } + if (dosig) + goto cleanup; + + continue; + case -1: + spc->spc_state = SPCSTATE_DYING; + goto cleanup; + default: + break; + } - switch (spc->spc_hdr.rsp_class) { + switch (spc->spc_hdr.rsp_class) { case RUMPSP_RESP: case RUMPSP_ERROR: kickwaiter(spc); + gotresp = spc->spc_hdr.rsp_reqno == + rw->rw_reqno; break; case RUMPSP_REQ: handlereq(spc); @@ -136,9 +133,22 @@ default: /* panic */ break; + } } - releasercvlock(spc); + cleanup: + pthread_mutex_lock(&spc->spc_mtx); + if (spc->spc_istatus == SPCSTATUS_WANTED) + kickall(spc); + spc->spc_istatus = SPCSTATUS_FREE; + + /* take one for the team */ + if (dosig) { + pthread_mutex_unlock(&spc->spc_mtx); + pthread_sigmask(SIG_SETMASK, mask, NULL); + pthread_sigmask(SIG_SETMASK, &fullset, NULL); + pthread_mutex_lock(&spc->spc_mtx); + } } else { spc->spc_istatus = SPCSTATUS_WANTED; pthread_cond_wait(&rw->rw_cv, &spc->spc_mtx); @@ -148,8 +158,6 @@ pthread_mutex_unlock(&spc->spc_mtx); pthread_cond_destroy(&rw->rw_cv); - if (rv) - return rv; if (spc->spc_state == SPCSTATE_DYING) return ENOTCONN; return rw->rw_error; @@ -385,8 +393,9 @@ static int doconnect(void) { + struct kevent kev[NSIG+1]; char banner[MAXBANNER]; - int s, error; + int s, error, flags, i; ssize_t n; 
s = host_socket(parsetab[ptab_idx].domain, SOCK_STREAM, 0); @@ -421,8 +430,34 @@ } banner[n] = '\0'; + flags = host_fcntl(s, F_GETFL, 0); + if (host_fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1) { + fprintf(stderr, "rump_sp: cannot set socket fd to nonblock\n"); + errno = EINVAL; + return -1; + } + /* parse the banner some day */ + /* setup kqueue, we want all signals and the fd */ + if ((kq = kqueue()) == -1) { + error = errno; + fprintf(stderr, "rump_sp: cannot setup kqueue"); + errno = error; + return -1; + } + + for (i = 0; i < NSIG; i++) { + EV_SET(&kev[i], i+1, EVFILT_SIGNAL, EV_ADD|EV_ENABLE, 0, 0, 0); + } + EV_SET(&kev[NSIG], s, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, 0); + if (kevent(kq, kev, NSIG+1, NULL, 0, NULL) == -1) { + error = errno; + fprintf(stderr, "rump_sp: kevent() failed"); + errno = error; + return -1; + } + clispc.spc_fd = s; TAILQ_INIT(&clispc.spc_respwait); pthread_mutex_init(&clispc.spc_mtx, NULL); @@ -455,6 +490,7 @@ FINDSYM2(socket,__socket30); FINDSYM(close); FINDSYM(connect); + FINDSYM(fcntl); FINDSYM(poll); FINDSYM(pollts); FINDSYM(read); @@ -522,6 +558,8 @@ int error; host_close(clispc.spc_fd); + host_close(kq); + kq = -1; memset(&clispc, 0, sizeof(clispc)); clispc.spc_fd = -1; Index: src/lib/librumpuser/rumpuser_sp.c diff -u src/lib/librumpuser/rumpuser_sp.c:1.33 src/lib/librumpuser/rumpuser_sp.c:1.34 --- src/lib/librumpuser/rumpuser_sp.c:1.33 Mon Jan 10 11:57:53 2011 +++ src/lib/librumpuser/rumpuser_sp.c Mon Jan 10 19:49:43 2011 @@ -1,4 +1,4 @@ -/* $NetBSD: rumpuser_sp.c,v 1.33 2011/01/10 11:57:53 pooka Exp $ */ +/* $NetBSD: rumpuser_sp.c,v 1.34 2011/01/10 19:49:43 pooka Exp $ */ /* * Copyright (c) 2010, 2011 Antti Kantee. All Rights Reserved. 
@@ -35,7 +35,7 @@ */ #include <sys/cdefs.h> -__RCSID("$NetBSD: rumpuser_sp.c,v 1.33 2011/01/10 11:57:53 pooka Exp $"); +__RCSID("$NetBSD: rumpuser_sp.c,v 1.34 2011/01/10 19:49:43 pooka Exp $"); #include <sys/types.h> #include <sys/atomic.h> @@ -104,20 +104,23 @@ static int waitresp(struct spclient *spc, struct respwait *rw) { + int spcstate; int rv = 0; + pthread_mutex_lock(&spc->spc_mtx); sendunlockl(spc); while (!rw->rw_done && spc->spc_state != SPCSTATE_DYING) { pthread_cond_wait(&rw->rw_cv, &spc->spc_mtx); } TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries); + spcstate = spc->spc_state; pthread_mutex_unlock(&spc->spc_mtx); pthread_cond_destroy(&rw->rw_cv); if (rv) return rv; - if (spc->spc_state == SPCSTATE_DYING) + if (spcstate == SPCSTATE_DYING) return ENOTCONN; return rw->rw_error; } @@ -511,7 +514,8 @@ } /* write out a banner for the client */ - if (write(newfd, banner, strlen(banner)) != (ssize_t)strlen(banner)) { + if (send(newfd, banner, strlen(banner), MSG_NOSIGNAL) + != (ssize_t)strlen(banner)) { close(newfd); return 0; } Index: src/lib/librumpuser/sp_common.c diff -u src/lib/librumpuser/sp_common.c:1.22 src/lib/librumpuser/sp_common.c:1.23 --- src/lib/librumpuser/sp_common.c:1.22 Mon Jan 10 11:57:53 2011 +++ src/lib/librumpuser/sp_common.c Mon Jan 10 19:49:43 2011 @@ -1,4 +1,4 @@ -/* $NetBSD: sp_common.c,v 1.22 2011/01/10 11:57:53 pooka Exp $ */ +/* $NetBSD: sp_common.c,v 1.23 2011/01/10 19:49:43 pooka Exp $ */ /* * Copyright (c) 2010, 2011 Antti Kantee. All Rights Reserved. 
@@ -212,7 +212,6 @@ sendlockl(struct spclient *spc) { - /* assert(pthread_mutex_owned) */ while (spc->spc_ostatus != SPCSTATUS_FREE) { spc->spc_ostatus = SPCSTATUS_WANTED; pthread_cond_wait(&spc->spc_cv, &spc->spc_mtx); @@ -233,7 +232,6 @@ sendunlockl(struct spclient *spc) { - /* assert(pthread_mutex_owned) */ if (spc->spc_ostatus == SPCSTATUS_WANTED) pthread_cond_broadcast(&spc->spc_cv); spc->spc_ostatus = SPCSTATUS_FREE; @@ -298,12 +296,14 @@ TAILQ_INSERT_TAIL(&spc->spc_respwait, rw, rw_entries); sendlockl(spc); + pthread_mutex_unlock(&spc->spc_mtx); } static void unputwait(struct spclient *spc, struct respwait *rw) { + pthread_mutex_lock(&spc->spc_mtx); sendunlockl(spc); TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries); @@ -325,6 +325,7 @@ if (rw == NULL) { DPRINTF(("no waiter found, invalid reqno %" PRIu64 "?\n", spc->spc_hdr.rsp_reqno)); + pthread_mutex_unlock(&spc->spc_mtx); spcfreebuf(spc); return; }