Module Name: src Committed By: pooka Date: Mon Jan 24 17:47:52 UTC 2011
Modified Files: src/lib/librumpclient: rumpclient.c src/lib/librumpuser: sp_common.c Log Message: Add reconnect code to librumpclient. In case the connection to the kernel server is lost, the client will now automatically attempt to reconnect. Among other things, this makes it possible to "reboot" and restart the TCP/IP stack from under firefox without any perceivable loss of service. If pages were loading at the time the TCP/IP server was killed, there may be some broken links, but nothing a ctrl-r cannot fix. To generate a diff of this commit: cvs rdiff -u -r1.17 -r1.18 src/lib/librumpclient/rumpclient.c cvs rdiff -u -r1.25 -r1.26 src/lib/librumpuser/sp_common.c Please note that diffs are not public domain; they are subject to the copyright notices on the relevant files.
Modified files: Index: src/lib/librumpclient/rumpclient.c diff -u src/lib/librumpclient/rumpclient.c:1.17 src/lib/librumpclient/rumpclient.c:1.18 --- src/lib/librumpclient/rumpclient.c:1.17 Fri Jan 21 10:43:33 2011 +++ src/lib/librumpclient/rumpclient.c Mon Jan 24 17:47:51 2011 @@ -1,4 +1,4 @@ -/* $NetBSD: rumpclient.c,v 1.17 2011/01/21 10:43:33 pooka Exp $ */ +/* $NetBSD: rumpclient.c,v 1.18 2011/01/24 17:47:51 pooka Exp $ */ /* * Copyright (c) 2010, 2011 Antti Kantee. All Rights Reserved. @@ -50,6 +50,7 @@ #include <pthread.h> #include <signal.h> #include <stdarg.h> +#include <stdbool.h> #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -78,19 +79,51 @@ .spc_fd = -1, }; -static int kq; +static int kq = -1; static sigset_t fullset; +static int doconnect(int); +static int handshake_req(struct spclient *, uint32_t *, int, bool); + +int didrecon; + +static int +send_with_recon(struct spclient *spc, const void *data, size_t dlen) +{ + int rv; + + do { + rv = dosend(spc, data, dlen); + if (__predict_false(rv == ENOTCONN || rv == EBADF)) { + if ((rv = doconnect(1)) != 0) + continue; + if ((rv = handshake_req(&clispc, NULL, 0, true)) != 0) + continue; + rv = ENOTCONN; + break; + } + } while (__predict_false(rv != 0)); + + return rv; +} + static int -waitresp(struct spclient *spc, struct respwait *rw, sigset_t *mask) +cliwaitresp(struct spclient *spc, struct respwait *rw, sigset_t *mask, + bool keeplock) { + uint64_t mygen; + bool imalive = true; pthread_mutex_lock(&spc->spc_mtx); - sendunlockl(spc); + if (!keeplock) + sendunlockl(spc); + mygen = spc->spc_generation; rw->rw_error = 0; - while (!rw->rw_done && rw->rw_error == 0 - && spc->spc_state != SPCSTATE_DYING){ + while (!rw->rw_done && rw->rw_error == 0) { + if (__predict_false(spc->spc_generation != mygen || !imalive)) + break; + /* are we free to receive? 
*/ if (spc->spc_istatus == SPCSTATUS_FREE) { struct kevent kev[8]; @@ -105,7 +138,16 @@ case 0: rv = host_kevent(kq, NULL, 0, kev, __arraycount(kev), NULL); - assert(rv > 0); + + /* + * XXX: don't know how this can + * happen (timeout cannot expire + * since there isn't one), but + * it does happen + */ + if (__predict_false(rv == 0)) + continue; + for (i = 0; i < rv; i++) { if (kev[i].filter == EVFILT_SIGNAL) @@ -116,7 +158,7 @@ continue; case -1: - spc->spc_state = SPCSTATE_DYING; + imalive = false; goto cleanup; default: break; @@ -160,12 +202,12 @@ pthread_mutex_unlock(&spc->spc_mtx); pthread_cond_destroy(&rw->rw_cv); - if (spc->spc_state == SPCSTATE_DYING) + if (spc->spc_generation != mygen || !imalive) { return ENOTCONN; + } return rw->rw_error; } - static int syscall_req(struct spclient *spc, int sysnum, const void *data, size_t dlen, void **resp) @@ -182,18 +224,18 @@ pthread_sigmask(SIG_SETMASK, &fullset, &omask); do { - putwait(spc, &rw, &rhdr); - rv = dosend(spc, &rhdr, sizeof(rhdr)); - rv = dosend(spc, data, dlen); - if (rv) { + if ((rv = send_with_recon(spc, &rhdr, sizeof(rhdr))) != 0) { unputwait(spc, &rw); - pthread_sigmask(SIG_SETMASK, &omask, NULL); - return rv; + continue; + } + if ((rv = send_with_recon(spc, data, dlen)) != 0) { + unputwait(spc, &rw); + continue; } - rv = waitresp(spc, &rw, &omask); - } while (rv == EAGAIN); + rv = cliwaitresp(spc, &rw, &omask, false); + } while (rv == ENOTCONN || rv == EAGAIN); pthread_sigmask(SIG_SETMASK, &omask, NULL); *resp = rw.rw_data; @@ -201,7 +243,7 @@ } static int -handshake_req(struct spclient *spc, uint32_t *auth, int cancel) +handshake_req(struct spclient *spc, uint32_t *auth, int cancel, bool haslock) { struct handshake_fork rf; struct rsp_hdr rhdr; @@ -219,20 +261,28 @@ rhdr.rsp_handshake = HANDSHAKE_GUEST; pthread_sigmask(SIG_SETMASK, &fullset, &omask); - putwait(spc, &rw, &rhdr); + if (haslock) + putwait_locked(spc, &rw, &rhdr); + else + putwait(spc, &rw, &rhdr); rv = dosend(spc, &rhdr, 
sizeof(rhdr)); if (auth) { memcpy(rf.rf_auth, auth, AUTHLEN*sizeof(*auth)); rf.rf_cancel = cancel; - rv = dosend(spc, &rf, sizeof(rf)); + rv = send_with_recon(spc, &rf, sizeof(rf)); } - if (rv != 0 || cancel) { - unputwait(spc, &rw); - pthread_sigmask(SIG_SETMASK, &omask, NULL); - return rv; + if (rv || cancel) { + if (haslock) + unputwait_locked(spc, &rw); + else + unputwait(spc, &rw); + if (cancel) { + pthread_sigmask(SIG_SETMASK, &omask, NULL); + return rv; + } + } else { + rv = cliwaitresp(spc, &rw, &omask, haslock); } - - rv = waitresp(spc, &rw, &omask); pthread_sigmask(SIG_SETMASK, &omask, NULL); if (rv) return rv; @@ -257,26 +307,51 @@ rhdr.rsp_error = 0; pthread_sigmask(SIG_SETMASK, &fullset, &omask); - putwait(spc, &rw, &rhdr); - rv = dosend(spc, &rhdr, sizeof(rhdr)); - if (rv != 0) { - unputwait(spc, &rw); - pthread_sigmask(SIG_SETMASK, &omask, NULL); - return rv; - } + do { + putwait(spc, &rw, &rhdr); + rv = send_with_recon(spc, &rhdr, sizeof(rhdr)); + if (rv != 0) { + unputwait(spc, &rw); + continue; + } - rv = waitresp(spc, &rw, &omask); + rv = cliwaitresp(spc, &rw, &omask, false); + } while (rv == ENOTCONN || rv == EAGAIN); pthread_sigmask(SIG_SETMASK, &omask, NULL); + *resp = rw.rw_data; return rv; } +/* + * prevent response code from deadlocking with reconnect code + */ static int +resp_sendlock(struct spclient *spc) +{ + int rv = 0; + + pthread_mutex_lock(&spc->spc_mtx); + while (spc->spc_ostatus != SPCSTATUS_FREE) { + if (__predict_false(spc->spc_reconnecting)) { + rv = EBUSY; + goto out; + } + spc->spc_ostatus = SPCSTATUS_WANTED; + pthread_cond_wait(&spc->spc_cv, &spc->spc_mtx); + } + spc->spc_ostatus = SPCSTATUS_BUSY; + + out: + pthread_mutex_unlock(&spc->spc_mtx); + return rv; +} + +static void send_copyin_resp(struct spclient *spc, uint64_t reqno, void *data, size_t dlen, int wantstr) { struct rsp_hdr rhdr; - int rv; if (wantstr) dlen = MIN(dlen, strlen(data)+1); @@ -287,19 +362,17 @@ rhdr.rsp_type = RUMPSP_COPYIN; rhdr.rsp_sysnum = 0; - 
sendlock(spc); - rv = dosend(spc, &rhdr, sizeof(rhdr)); - rv = dosend(spc, data, dlen); + if (resp_sendlock(spc) != 0) + return; + (void)dosend(spc, &rhdr, sizeof(rhdr)); + (void)dosend(spc, data, dlen); sendunlock(spc); - - return rv; } -static int +static void send_anonmmap_resp(struct spclient *spc, uint64_t reqno, void *addr) { struct rsp_hdr rhdr; - int rv; rhdr.rsp_len = sizeof(rhdr) + sizeof(addr); rhdr.rsp_reqno = reqno; @@ -307,12 +380,11 @@ rhdr.rsp_type = RUMPSP_ANONMMAP; rhdr.rsp_sysnum = 0; - sendlock(spc); - rv = dosend(spc, &rhdr, sizeof(rhdr)); - rv = dosend(spc, &addr, sizeof(addr)); + if (resp_sendlock(spc) != 0) + return; + (void)dosend(spc, &rhdr, sizeof(rhdr)); + (void)dosend(spc, &addr, sizeof(addr)); sendunlock(spc); - - return rv; } int @@ -383,7 +455,7 @@ break; case RUMPSP_RAISE: DPRINTF(("rump_sp handlereq: raise sig %d\n", rhdr->rsp_signo)); - raise(rhdr->rsp_signo); + raise((int)rhdr->rsp_signo); /* * We most likely have signals blocked, but the signal * will be handled soon enough when we return. 
@@ -402,22 +474,92 @@ static struct sockaddr *serv_sa; static int -doconnect(void) +doconnect(int retry) { + time_t prevreconmsg; + unsigned reconretries; + struct respwait rw; + struct rsp_hdr rhdr; struct kevent kev[NSIG+1]; char banner[MAXBANNER]; + struct pollfd pfd; int s, error, flags, i; ssize_t n; + if (kq != -1) + host_close(kq); + kq = -1; + + prevreconmsg = 0; + reconretries = 0; + + again: + if (clispc.spc_fd != -1) + host_close(clispc.spc_fd); + clispc.spc_fd = -1; + + /* + * for reconnect, gate everyone out of the receiver code + */ + putwait_locked(&clispc, &rw, &rhdr); + + pthread_mutex_lock(&clispc.spc_mtx); + clispc.spc_reconnecting = 1; + pthread_cond_broadcast(&clispc.spc_cv); + clispc.spc_generation++; + while (clispc.spc_istatus != SPCSTATUS_FREE) { + clispc.spc_istatus = SPCSTATUS_WANTED; + pthread_cond_wait(&rw.rw_cv, &clispc.spc_mtx); + } + kickall(&clispc); + + /* + * we can release it already since we hold the + * send lock during reconnect + * XXX: assert it + */ + clispc.spc_istatus = SPCSTATUS_FREE; + pthread_mutex_unlock(&clispc.spc_mtx); + unputwait_locked(&clispc, &rw); + + free(clispc.spc_buf); + clispc.spc_off = 0; + s = host_socket(parsetab[ptab_idx].domain, SOCK_STREAM, 0); if (s == -1) return -1; - if (host_connect(s, serv_sa, (socklen_t)serv_sa->sa_len) == -1) { - error = errno; - fprintf(stderr, "rump_sp: client connect failed\n"); - errno = error; - return -1; + pfd.fd = s; + pfd.events = POLLIN; + while (host_connect(s, serv_sa, (socklen_t)serv_sa->sa_len) == -1) { + if (errno == EINTR) + continue; + if (!retry) { + error = errno; + fprintf(stderr, "rump_sp: client connect failed: %s\n", + strerror(errno)); + errno = error; + return -1; + } + + if (prevreconmsg == 0) { + fprintf(stderr, "rump_sp: connection to kernel lost, " + "trying to reconnect ...\n"); + prevreconmsg = time(NULL); + } + if (time(NULL) - prevreconmsg > 120) { + fprintf(stderr, "rump_sp: still trying to " + "reconnect ...\n"); + prevreconmsg = time(NULL); 
+ } + + /* adhoc backoff timer */ + if (reconretries++ < 10) { + usleep(100000 * reconretries); + } else { + sleep(MIN(10, reconretries-9)); + } + goto again; } if ((error = parsetab[ptab_idx].connhook(s)) != 0) { @@ -440,15 +582,22 @@ return -1; } banner[n] = '\0'; + /* parse the banner some day */ flags = host_fcntl(s, F_GETFL, 0); if (host_fcntl(s, F_SETFL, flags | O_NONBLOCK) == -1) { - fprintf(stderr, "rump_sp: cannot set socket fd to nonblock\n"); + fprintf(stderr, "rump_sp: socket fd NONBLOCK: %s\n", + strerror(errno)); errno = EINVAL; return -1; } + clispc.spc_fd = s; + clispc.spc_state = SPCSTATE_RUNNING; + clispc.spc_reconnecting = 0; - /* parse the banner some day */ + if (prevreconmsg) { + fprintf(stderr, "rump_sp: reconnected!\n"); + } /* setup kqueue, we want all signals and the fd */ if ((kq = host_kqueue()) == -1) { @@ -461,7 +610,8 @@ for (i = 0; i < NSIG; i++) { EV_SET(&kev[i], i+1, EVFILT_SIGNAL, EV_ADD|EV_ENABLE, 0, 0, 0); } - EV_SET(&kev[NSIG], s, EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, 0); + EV_SET(&kev[NSIG], clispc.spc_fd, + EVFILT_READ, EV_ADD|EV_ENABLE, 0, 0, 0); if (host_kevent(kq, kev, NSIG+1, NULL, 0, NULL) == -1) { error = errno; fprintf(stderr, "rump_sp: kevent() failed"); @@ -469,7 +619,13 @@ return -1; } - clispc.spc_fd = s; + return 0; +} + +static int +doinit(void) +{ + TAILQ_INIT(&clispc.spc_respwait); pthread_mutex_init(&clispc.spc_mtx, NULL); pthread_cond_init(&clispc.spc_cv, NULL); @@ -521,14 +677,17 @@ return -1; } - if (doconnect() == -1) + if (doinit() == -1) + return -1; + if (doconnect(0) == -1) return -1; - error = handshake_req(&clispc, NULL, 0); + error = handshake_req(&clispc, NULL, 0, false); if (error) { pthread_mutex_destroy(&clispc.spc_mtx); pthread_cond_destroy(&clispc.spc_cv); - host_close(clispc.spc_fd); + if (clispc.spc_fd != -1) + host_close(clispc.spc_fd); errno = error; return -1; } @@ -569,16 +728,16 @@ { int error; - host_close(clispc.spc_fd); - host_close(kq); - kq = -1; memset(&clispc, 0, sizeof(clispc)); 
clispc.spc_fd = -1; + kq = -1; - if (doconnect() == -1) + if (doinit() == -1) + return -1; + if (doconnect(1) == -1) return -1; - error = handshake_req(&clispc, rpf->fork_auth, 0); + error = handshake_req(&clispc, rpf->fork_auth, 0, false); if (error) { pthread_mutex_destroy(&clispc.spc_mtx); pthread_cond_destroy(&clispc.spc_cv); Index: src/lib/librumpuser/sp_common.c diff -u src/lib/librumpuser/sp_common.c:1.25 src/lib/librumpuser/sp_common.c:1.26 --- src/lib/librumpuser/sp_common.c:1.25 Sat Jan 22 13:41:22 2011 +++ src/lib/librumpuser/sp_common.c Mon Jan 24 17:47:52 2011 @@ -1,4 +1,4 @@ -/* $NetBSD: sp_common.c,v 1.25 2011/01/22 13:41:22 pooka Exp $ */ +/* $NetBSD: sp_common.c,v 1.26 2011/01/24 17:47:52 pooka Exp $ */ /* * Copyright (c) 2010, 2011 Antti Kantee. All Rights Reserved. @@ -177,7 +177,9 @@ uint64_t spc_nextreq; uint64_t spc_syscallreq; + uint64_t spc_generation; int spc_ostatus, spc_istatus; + int spc_reconnecting; LIST_HEAD(, prefork) spc_pflist; }; @@ -223,7 +225,7 @@ spc->spc_ostatus = SPCSTATUS_BUSY; } -static void +static void __unused sendlock(struct spclient *spc) { @@ -273,14 +275,16 @@ n = host_sendto(fd, sdata + sent, dlen - sent, MSG_NOSIGNAL, NULL, 0); - if (n == 0) { - return ENOTCONN; - } if (n == -1) { + if (errno == EPIPE) + return ENOTCONN; if (errno != EAGAIN) return errno; continue; } + if (n == 0) { + return ENOTCONN; + } sent += n; } @@ -288,7 +292,7 @@ } static void -putwait(struct spclient *spc, struct respwait *rw, struct rsp_hdr *rhdr) +doputwait(struct spclient *spc, struct respwait *rw, struct rsp_hdr *rhdr) { rw->rw_data = NULL; @@ -298,21 +302,51 @@ pthread_mutex_lock(&spc->spc_mtx); rw->rw_reqno = rhdr->rsp_reqno = spc->spc_nextreq++; TAILQ_INSERT_TAIL(&spc->spc_respwait, rw, rw_entries); +} + +static void __unused +putwait_locked(struct spclient *spc, struct respwait *rw, struct rsp_hdr *rhdr) +{ + + doputwait(spc, rw, rhdr); + pthread_mutex_unlock(&spc->spc_mtx); +} + +static void +putwait(struct spclient *spc, struct 
respwait *rw, struct rsp_hdr *rhdr) +{ + doputwait(spc, rw, rhdr); sendlockl(spc); pthread_mutex_unlock(&spc->spc_mtx); } static void +dounputwait(struct spclient *spc, struct respwait *rw) +{ + + TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries); + pthread_mutex_unlock(&spc->spc_mtx); + pthread_cond_destroy(&rw->rw_cv); + +} + +static void __unused +unputwait_locked(struct spclient *spc, struct respwait *rw) +{ + + pthread_mutex_lock(&spc->spc_mtx); + dounputwait(spc, rw); +} + +static void unputwait(struct spclient *spc, struct respwait *rw) { pthread_mutex_lock(&spc->spc_mtx); sendunlockl(spc); - TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries); - pthread_mutex_unlock(&spc->spc_mtx); - pthread_cond_destroy(&rw->rw_cv); + dounputwait(spc, rw); } static void