Module Name:    src
Committed By:   pooka
Date:           Wed Nov 24 11:40:24 UTC 2010

Modified Files:
        src/lib/librumpuser: rumpuser_sp.c

Log Message:
Sneeze some locking into connect/disconnect.

To generate a diff of this commit:
cvs rdiff -u -r1.10 -r1.11 src/lib/librumpuser/rumpuser_sp.c

Please note that diffs are not public domain; they are subject to the
copyright notices on the relevant files.
Modified files: Index: src/lib/librumpuser/rumpuser_sp.c diff -u src/lib/librumpuser/rumpuser_sp.c:1.10 src/lib/librumpuser/rumpuser_sp.c:1.11 --- src/lib/librumpuser/rumpuser_sp.c:1.10 Mon Nov 22 20:42:19 2010 +++ src/lib/librumpuser/rumpuser_sp.c Wed Nov 24 11:40:24 2010 @@ -1,4 +1,4 @@ -/* $NetBSD: rumpuser_sp.c,v 1.10 2010/11/22 20:42:19 pooka Exp $ */ +/* $NetBSD: rumpuser_sp.c,v 1.11 2010/11/24 11:40:24 pooka Exp $ */ /* * Copyright (c) 2010 Antti Kantee. All Rights Reserved. @@ -38,9 +38,10 @@ */ #include <sys/cdefs.h> -__RCSID("$NetBSD: rumpuser_sp.c,v 1.10 2010/11/22 20:42:19 pooka Exp $"); +__RCSID("$NetBSD: rumpuser_sp.c,v 1.11 2010/11/24 11:40:24 pooka Exp $"); #include <sys/types.h> +#include <sys/atomic.h> #include <sys/mman.h> #include <sys/socket.h> @@ -67,7 +68,7 @@ static struct pollfd pfdlist[MAXCLI]; static struct spclient spclist[MAXCLI]; -static unsigned int nfds, maxidx; +static unsigned int disco; static struct rumpuser_sp_ops spops; @@ -287,66 +288,81 @@ } static void -serv_handledisco(unsigned int idx) +spcref(struct spclient *spc) { - struct spclient *spc = &spclist[idx]; - int fd = spc->spc_fd; - DPRINTF(("rump_sp: disconnecting [%u]\n", idx)); + pthread_mutex_lock(&spc->spc_mtx); + spc->spc_refcnt++; + pthread_mutex_unlock(&spc->spc_mtx); +} + +static void +spcrelease(struct spclient *spc) +{ + int ref; + + pthread_mutex_lock(&spc->spc_mtx); + ref = --spc->spc_refcnt; + pthread_mutex_unlock(&spc->spc_mtx); + + if (ref > 0) + return; + + _DIAGASSERT(TAILQ_EMPTY(&spclist[i].spc_respwait)); + _DIAGASSERT(spc->spc_buf == NULL); lwproc_switch(spc->spc_mainlwp); lwproc_release(); + spc->spc_mainlwp = NULL; + + close(spc->spc_fd); + spc->spc_fd = -1; + + spc->spc_pfd->fd = -1; + membar_producer(); + atomic_inc_uint(&disco); - pthread_mutex_destroy(&spc->spc_mtx); - pthread_cond_destroy(&spc->spc_cv); - free(spc->spc_buf); - memset(spc, 0, sizeof(*spc)); - close(fd); - pfdlist[idx].fd = -1; - nfds--; - - if (idx == maxidx) { - while (idx--) { - 
if (pfdlist[idx].fd != -1) { - maxidx = idx; - break; - } - assert(idx != 0); - } - DPRINTF(("rump_sp: set maxidx to [%u]\n", maxidx)); - } } -static int -serv_handleconn(int fd, connecthook_fn connhook) +static void +serv_handledisco(unsigned int idx) +{ + struct spclient *spc = &spclist[idx]; + + DPRINTF(("rump_sp: disconnecting [%u]\n", idx)); + + spcrelease(spc); +} + +static unsigned +serv_handleconn(int fd, connecthook_fn connhook, int busy) { struct sockaddr_storage ss; socklen_t sl = sizeof(ss); - int newfd, flags, error; + int newfd, flags; unsigned i; /*LINTED: cast ok */ newfd = accept(fd, (struct sockaddr *)&ss, &sl); if (newfd == -1) - return errno; - - /* XXX: should do some sort of handshake too */ + return 0; - if (nfds == MAXCLI) { + if (busy) { close(newfd); /* EBUSY */ - return EBUSY; + return 0; } + /* XXX: should do some sort of handshake too */ + flags = fcntl(newfd, F_GETFL, 0); if (fcntl(newfd, F_SETFL, flags | O_NONBLOCK) == -1) { close(newfd); - return errno; + return 0; } - flags = 1; - if ((error = connhook(newfd)) != 0) { + if (connhook(newfd) != 0) { close(newfd); - return error; + return 0; } /* find empty slot the simple way */ @@ -355,33 +371,28 @@ break; } - if ((error = lwproc_newproc(&spclist[i])) != 0) { + if (lwproc_newproc(&spclist[i]) != 0) { close(newfd); - return error; + return 0; } assert(i < MAXCLI); - nfds++; pfdlist[i].fd = newfd; spclist[i].spc_fd = newfd; spclist[i].spc_mainlwp = lwproc_curlwp(); spclist[i].spc_istatus = SPCSTATUS_BUSY; /* dedicated receiver */ spclist[i].spc_pid = lwproc_getpid(); + spclist[i].spc_refcnt = 1; TAILQ_INIT(&spclist[i].spc_respwait); - pthread_mutex_init(&spclist[i].spc_mtx, NULL); - pthread_cond_init(&spclist[i].spc_cv, NULL); - - if (maxidx < i) - maxidx = i; DPRINTF(("rump_sp: added new connection at idx %u, pid %d\n", i, lwproc_getpid())); lwproc_switch(NULL); - return 0; + return i; } static void @@ -503,6 +514,7 @@ pthread_attr_init(&pattr); pthread_attr_setdetachstate(&pattr, 1); 
+ spcref(spc); if ((rv = pthread_create(&pt, &pattr, serv_syscallbouncer, sba)) != 0) { /* panic */ abort(); @@ -513,13 +525,21 @@ spserver(void *arg) { struct spservarg *sarg = arg; + struct spclient *spc; unsigned idx; int seen; int rv; + unsigned int nfds, maxidx; - for (idx = 1; idx < MAXCLI; idx++) { + for (idx = 0; idx < MAXCLI; idx++) { pfdlist[idx].fd = -1; pfdlist[idx].events = POLLIN; + + spc = &spclist[idx]; + + spc->spc_pfd = &pfdlist[idx]; + pthread_mutex_init(&spc->spc_mtx, NULL); + pthread_cond_init(&spc->spc_cv, NULL); } pfdlist[0].fd = sarg->sps_sock; pfdlist[0].events = POLLIN; @@ -529,6 +549,27 @@ DPRINTF(("rump_sp: server mainloop\n")); for (;;) { + /* g/c hangarounds (eventually) */ + if (disco) { + int discoed; + + membar_consumer(); + discoed = atomic_swap_uint(&disco, 0); + while (discoed--) { + nfds--; + idx = maxidx; + while (idx--) { + if (pfdlist[idx].fd != -1) { + maxidx = idx; + break; + } + assert(idx != 0); + } + DPRINTF(("rump_sp: set maxidx to [%u]\n", + maxidx)); + } + } + DPRINTF(("rump_sp: loop nfd %d\n", maxidx+1)); seen = 0; rv = poll(pfdlist, maxidx+1, INFTIM); @@ -552,7 +593,7 @@ DPRINTF(("rump_sp: activity at [%u] %d/%d\n", idx, seen, rv)); if (idx > 0) { - struct spclient *spc = &spclist[idx]; + spc = &spclist[idx]; DPRINTF(("rump_sp: mainloop read [%u]\n", idx)); switch (readframe(spc)) { @@ -576,10 +617,16 @@ } break; } + } else { DPRINTF(("rump_sp: mainloop new connection\n")); - serv_handleconn(pfdlist[0].fd, - sarg->sps_connhook); + + idx = serv_handleconn(pfdlist[0].fd, + sarg->sps_connhook, nfds == MAXCLI); + if (idx) + nfds++; + if (idx > maxidx) + maxidx = idx; } } }