Module Name: src Committed By: pooka Date: Fri Nov 19 15:25:50 UTC 2010
Modified Files: src/lib/librumpclient: rumpclient.c src/lib/librumpuser: rumpuser_sp.c sp_common.c Log Message: Start working on making the syscall proxy code threadsafe. The basics are there, but a few more tweaks are needed. The reason I'm committing it now is that the code was mindnumbingly boring to write (no wonder it took me almost 3 years to get it done), and I might burn it if it's not in a safe place. To generate a diff of this commit: cvs rdiff -u -r1.2 -r1.3 src/lib/librumpclient/rumpclient.c cvs rdiff -u -r1.6 -r1.7 src/lib/librumpuser/rumpuser_sp.c cvs rdiff -u -r1.3 -r1.4 src/lib/librumpuser/sp_common.c Please note that diffs are not public domain; they are subject to the copyright notices on the relevant files.
Modified files: Index: src/lib/librumpclient/rumpclient.c diff -u src/lib/librumpclient/rumpclient.c:1.2 src/lib/librumpclient/rumpclient.c:1.3 --- src/lib/librumpclient/rumpclient.c:1.2 Fri Nov 5 13:50:48 2010 +++ src/lib/librumpclient/rumpclient.c Fri Nov 19 15:25:49 2010 @@ -1,4 +1,4 @@ -/* $NetBSD: rumpclient.c,v 1.2 2010/11/05 13:50:48 pooka Exp $ */ +/* $NetBSD: rumpclient.c,v 1.3 2010/11/19 15:25:49 pooka Exp $ */ /* * Copyright (c) 2010 Antti Kantee. All Rights Reserved. @@ -58,52 +58,70 @@ static struct spclient clispc; static int -send_syscall_req(struct spclient *spc, int sysnum, - const void *data, size_t dlen) +syscall_req(struct spclient *spc, int sysnum, + const void *data, size_t dlen, void **resp) { struct rsp_hdr rhdr; + struct respwait rw; + int rv; rhdr.rsp_len = sizeof(rhdr) + dlen; - rhdr.rsp_reqno = nextreq++; - rhdr.rsp_type = RUMPSP_SYSCALL_REQ; + rhdr.rsp_class = RUMPSP_REQ; + rhdr.rsp_type = RUMPSP_SYSCALL; rhdr.rsp_sysnum = sysnum; - dosend(spc, &rhdr, sizeof(rhdr)); - dosend(spc, data, dlen); + putwait(spc, &rw, &rhdr); - return 0; + sendlock(spc); + rv = dosend(spc, &rhdr, sizeof(rhdr)); + rv = dosend(spc, data, dlen); + sendunlock(spc); + if (rv) + return rv; /* XXX: unputwait */ + + rv = waitresp(spc, &rw); + *resp = rw.rw_data; + return rv; } static int send_copyin_resp(struct spclient *spc, uint64_t reqno, void *data, size_t dlen) { struct rsp_hdr rhdr; + int rv; rhdr.rsp_len = sizeof(rhdr) + dlen; rhdr.rsp_reqno = reqno; - rhdr.rsp_type = RUMPSP_COPYIN_RESP; + rhdr.rsp_class = RUMPSP_RESP; + rhdr.rsp_type = RUMPSP_COPYIN; rhdr.rsp_sysnum = 0; - dosend(spc, &rhdr, sizeof(rhdr)); - dosend(spc, data, dlen); + sendlock(spc); + rv = dosend(spc, &rhdr, sizeof(rhdr)); + rv = dosend(spc, data, dlen); + sendunlock(spc); - return 0; + return rv; } static int send_anonmmap_resp(struct spclient *spc, uint64_t reqno, void *addr) { struct rsp_hdr rhdr; + int rv; rhdr.rsp_len = sizeof(rhdr) + sizeof(addr); rhdr.rsp_reqno = reqno; - rhdr.rsp_type 
= RUMPSP_ANONMMAP_RESP; + rhdr.rsp_class = RUMPSP_RESP; + rhdr.rsp_type = RUMPSP_ANONMMAP; rhdr.rsp_sysnum = 0; - dosend(spc, &rhdr, sizeof(rhdr)); - dosend(spc, &addr, sizeof(addr)); + sendlock(spc); + rv = dosend(spc, &rhdr, sizeof(rhdr)); + rv = dosend(spc, &addr, sizeof(addr)); + sendunlock(spc); - return 0; + return rv; } int @@ -111,71 +129,71 @@ register_t *retval) { struct rsp_sysresp *resp; - struct rsp_copydata *copydata; - struct pollfd pfd; - size_t maplen; - void *mapaddr; - int gotresp; + void *rdata; + int rv; - DPRINTF(("rump_sp_syscall: executing syscall %d\n", sysnum)); + DPRINTF(("rumpsp syscall_req: syscall %d with %p/%zu\n", + sysnum, data, dlen)); - send_syscall_req(&clispc, sysnum, data, dlen); + rv = syscall_req(&clispc, sysnum, data, dlen, &rdata); + if (rv) + return rv; + + resp = rdata; + DPRINTF(("rumpsp syscall_resp: syscall %d error %d, rv: %d/%d\n", + sysnum, rv, resp->rsys_retval[0], resp->rsys_retval[1])); - DPRINTF(("rump_sp_syscall: syscall %d request sent. 
" - "waiting for response\n", sysnum)); + memcpy(retval, &resp->rsys_retval, sizeof(resp->rsys_retval)); + rv = resp->rsys_error; + free(rdata); - pfd.fd = clispc.spc_fd; - pfd.events = POLLIN; - - gotresp = 0; - while (!gotresp) { - while (readframe(&clispc) < 1) - poll(&pfd, 1, INFTIM); - - switch (clispc.spc_hdr.rsp_type) { - case RUMPSP_COPYIN_REQ: - /*LINTED*/ - copydata = (struct rsp_copydata *)clispc.spc_buf; - DPRINTF(("rump_sp_syscall: copyin request: %p/%zu\n", - copydata->rcp_addr, copydata->rcp_len)); - send_copyin_resp(&clispc, clispc.spc_hdr.rsp_reqno, - copydata->rcp_addr, copydata->rcp_len); - clispc.spc_off = 0; - break; - case RUMPSP_COPYOUT_REQ: - /*LINTED*/ - copydata = (struct rsp_copydata *)clispc.spc_buf; - DPRINTF(("rump_sp_syscall: copyout request: %p/%zu\n", - copydata->rcp_addr, copydata->rcp_len)); - /*LINTED*/ - memcpy(copydata->rcp_addr, copydata->rcp_data, - copydata->rcp_len); - clispc.spc_off = 0; - break; - case RUMPSP_ANONMMAP_REQ: - /*LINTED*/ - maplen = *(size_t *)clispc.spc_buf; - mapaddr = mmap(NULL, maplen, PROT_READ|PROT_WRITE, - MAP_ANON, -1, 0); - if (mapaddr == MAP_FAILED) - mapaddr = NULL; - send_anonmmap_resp(&clispc, - clispc.spc_hdr.rsp_reqno, mapaddr); - clispc.spc_off = 0; - break; - case RUMPSP_SYSCALL_RESP: - DPRINTF(("rump_sp_syscall: got response \n")); - gotresp = 1; - break; - } - } + return rv; +} - /*LINTED*/ - resp = (struct rsp_sysresp *)clispc.spc_buf; - memcpy(retval, &resp->rsys_retval, sizeof(resp->rsys_retval)); - clispc.spc_off = 0; +static void +handlereq(struct spclient *spc) +{ + struct rsp_copydata *copydata; + void *mapaddr; + size_t maplen; + + switch (spc->spc_hdr.rsp_type) { + case RUMPSP_COPYIN: + /*LINTED*/ + copydata = (struct rsp_copydata *)spc->spc_buf; + DPRINTF(("rump_sp handlereq: copyin request: %p/%zu\n", + copydata->rcp_addr, copydata->rcp_len)); + send_copyin_resp(spc, spc->spc_hdr.rsp_reqno, + copydata->rcp_addr, copydata->rcp_len); + break; + case RUMPSP_COPYOUT: + /*LINTED*/ + 
copydata = (struct rsp_copydata *)spc->spc_buf; + DPRINTF(("rump_sp handlereq: copyout request: %p/%zu\n", + copydata->rcp_addr, copydata->rcp_len)); + /*LINTED*/ + memcpy(copydata->rcp_addr, copydata->rcp_data, + copydata->rcp_len); + break; + case RUMPSP_ANONMMAP: + /*LINTED*/ + maplen = *(size_t *)spc->spc_buf; + mapaddr = mmap(NULL, maplen, PROT_READ|PROT_WRITE, + MAP_ANON, -1, 0); + if (mapaddr == MAP_FAILED) + mapaddr = NULL; + DPRINTF(("rump_sp handlereq: anonmmap: %p\n", mapaddr)); + send_anonmmap_resp(spc, spc->spc_hdr.rsp_reqno, mapaddr); + break; + default: + printf("PANIC: INVALID TYPE\n"); + abort(); + break; + } - return resp->rsys_error; + free(spc->spc_buf); + spc->spc_off = 0; + spc->spc_buf = NULL; } int @@ -213,6 +231,9 @@ return -1; } clispc.spc_fd = s; + TAILQ_INIT(&clispc.spc_respwait); + pthread_mutex_init(&clispc.spc_mtx, NULL); + pthread_cond_init(&clispc.spc_cv, NULL); return 0; } Index: src/lib/librumpuser/rumpuser_sp.c diff -u src/lib/librumpuser/rumpuser_sp.c:1.6 src/lib/librumpuser/rumpuser_sp.c:1.7 --- src/lib/librumpuser/rumpuser_sp.c:1.6 Wed Nov 17 17:36:14 2010 +++ src/lib/librumpuser/rumpuser_sp.c Fri Nov 19 15:25:49 2010 @@ -1,4 +1,4 @@ -/* $NetBSD: rumpuser_sp.c,v 1.6 2010/11/17 17:36:14 pooka Exp $ */ +/* $NetBSD: rumpuser_sp.c,v 1.7 2010/11/19 15:25:49 pooka Exp $ */ /* * Copyright (c) 2010 Antti Kantee. All Rights Reserved. 
@@ -38,7 +38,7 @@ */ #include <sys/cdefs.h> -__RCSID("$NetBSD: rumpuser_sp.c,v 1.6 2010/11/17 17:36:14 pooka Exp $"); +__RCSID("$NetBSD: rumpuser_sp.c,v 1.7 2010/11/19 15:25:49 pooka Exp $"); #include <sys/types.h> #include <sys/mman.h> @@ -68,7 +68,6 @@ static struct pollfd pfdlist[MAXCLI]; static struct spclient spclist[MAXCLI]; static unsigned int nfds, maxidx; -static uint64_t nextreq; static pthread_key_t spclient_tls; static struct rumpuser_sp_ops spops; @@ -132,45 +131,77 @@ return rv; } +static uint64_t +nextreq(struct spclient *spc) +{ + uint64_t nw; + + pthread_mutex_lock(&spc->spc_mtx); + nw = spc->spc_nextreq++; + pthread_mutex_unlock(&spc->spc_mtx); + + return nw; +} + static int send_syscall_resp(struct spclient *spc, uint64_t reqno, int error, - register_t retval[2]) + register_t *retval) { struct rsp_hdr rhdr; struct rsp_sysresp sysresp; + int rv; rhdr.rsp_len = sizeof(rhdr) + sizeof(sysresp); rhdr.rsp_reqno = reqno; - rhdr.rsp_type = RUMPSP_SYSCALL_RESP; + rhdr.rsp_class = RUMPSP_RESP; + rhdr.rsp_type = RUMPSP_SYSCALL; rhdr.rsp_sysnum = 0; sysresp.rsys_error = error; - memcpy(sysresp.rsys_retval, retval, sizeof(retval)); + memcpy(sysresp.rsys_retval, retval, sizeof(sysresp.rsys_retval)); - dosend(spc, &rhdr, sizeof(rhdr)); - dosend(spc, &sysresp, sizeof(sysresp)); + sendlock(spc); + rv = dosend(spc, &rhdr, sizeof(rhdr)); + rv = dosend(spc, &sysresp, sizeof(sysresp)); + sendunlock(spc); - return 0; + return rv; } static int -send_copyin_req(struct spclient *spc, const void *remaddr, size_t dlen) +copyin_req(struct spclient *spc, const void *remaddr, size_t dlen, void **resp) { struct rsp_hdr rhdr; struct rsp_copydata copydata; + struct respwait rw; + int rv; + + DPRINTF(("copyin_req: %zu bytes from %p\n", dlen, remaddr)); rhdr.rsp_len = sizeof(rhdr) + sizeof(copydata); - rhdr.rsp_reqno = nextreq++; - rhdr.rsp_type = RUMPSP_COPYIN_REQ; + rhdr.rsp_class = RUMPSP_REQ; + rhdr.rsp_type = RUMPSP_COPYIN; rhdr.rsp_sysnum = 0; copydata.rcp_addr = 
__UNCONST(remaddr); copydata.rcp_len = dlen; - dosend(spc, &rhdr, sizeof(rhdr)); - dosend(spc, &copydata, sizeof(copydata)); + putwait(spc, &rw, &rhdr); + + sendlock(spc); + rv = dosend(spc, &rhdr, sizeof(rhdr)); + rv = dosend(spc, &copydata, sizeof(copydata)); + sendunlock(spc); + if (rv) + return rv; /* XXX: unputwait */ + + rv = waitresp(spc, &rw); + + DPRINTF(("copyin: response %d\n", rv)); + + *resp = rw.rw_data; + return rv; - return 0; } static int @@ -179,36 +210,57 @@ { struct rsp_hdr rhdr; struct rsp_copydata copydata; + int rv; + + DPRINTF(("copyout_req (async): %zu bytes to %p\n", dlen, remaddr)); rhdr.rsp_len = sizeof(rhdr) + sizeof(copydata) + dlen; - rhdr.rsp_reqno = nextreq++; - rhdr.rsp_type = RUMPSP_COPYOUT_REQ; + rhdr.rsp_reqno = nextreq(spc); + rhdr.rsp_class = RUMPSP_REQ; + rhdr.rsp_type = RUMPSP_COPYOUT; rhdr.rsp_sysnum = 0; copydata.rcp_addr = __UNCONST(remaddr); copydata.rcp_len = dlen; - dosend(spc, &rhdr, sizeof(rhdr)); - dosend(spc, &copydata, sizeof(copydata)); - dosend(spc, data, dlen); + sendlock(spc); + rv = dosend(spc, &rhdr, sizeof(rhdr)); + rv = dosend(spc, &copydata, sizeof(copydata)); + rv = dosend(spc, data, dlen); + sendunlock(spc); - return 0; + return rv; } static int -send_anonmmap_req(struct spclient *spc, size_t howmuch) +anonmmap_req(struct spclient *spc, size_t howmuch, void **resp) { struct rsp_hdr rhdr; + struct respwait rw; + int rv; + + DPRINTF(("anonmmap_req: %zu bytes\n", howmuch)); rhdr.rsp_len = sizeof(rhdr) + sizeof(howmuch); - rhdr.rsp_reqno = nextreq++; - rhdr.rsp_type = RUMPSP_ANONMMAP_REQ; + rhdr.rsp_class = RUMPSP_REQ; + rhdr.rsp_type = RUMPSP_ANONMMAP; rhdr.rsp_sysnum = 0; - dosend(spc, &rhdr, sizeof(rhdr)); - dosend(spc, &howmuch, sizeof(howmuch)); + putwait(spc, &rw, &rhdr); - return 0; + sendlock(spc); + rv = dosend(spc, &rhdr, sizeof(rhdr)); + rv = dosend(spc, &howmuch, sizeof(howmuch)); + sendunlock(spc); + if (rv) + return rv; /* XXX: unputwait */ + + rv = waitresp(spc, &rw); + *resp = rw.rw_data; + + 
DPRINTF(("anonmmap: mapped at %p\n", **(void ***)resp)); + + return rv; } static void @@ -222,6 +274,8 @@ lwproc_switch(spc->spc_lwp); lwproc_release(); + pthread_mutex_destroy(&spc->spc_mtx); + pthread_cond_destroy(&spc->spc_cv); free(spc->spc_buf); memset(spc, 0, sizeof(*spc)); close(fd); @@ -289,6 +343,12 @@ pfdlist[i].fd = newfd; spclist[i].spc_fd = newfd; spclist[i].spc_lwp = lwproc_curlwp(); + spclist[i].spc_istatus = SPCSTATUS_BUSY; /* dedicated receiver */ + + TAILQ_INIT(&spclist[i].spc_respwait); + pthread_mutex_init(&spclist[i].spc_mtx, NULL); + pthread_cond_init(&spclist[i].spc_cv, NULL); + if (maxidx < i) maxidx = i; @@ -303,7 +363,7 @@ static void serv_handlesyscall(struct spclient *spc, struct rsp_hdr *rhdr, uint8_t *data) { - register_t retval[2]; + register_t retval[2] = {0, 0}; int rv, sysnum; sysnum = (int)rhdr->rsp_sysnum; @@ -317,36 +377,41 @@ pthread_setspecific(spclient_tls, NULL); free(data); - DPRINTF(("rump_sp: got return value %d\n", rv)); + DPRINTF(("rump_sp: got return value %d & %d/%d\n", + rv, retval[0], retval[1])); send_syscall_resp(spc, rhdr->rsp_reqno, rv, retval); } +struct sysbouncearg { + struct spclient *sba_spc; + struct rsp_hdr sba_hdr; + uint8_t *sba_data; +}; +static void * +serv_syscallbouncer(void *arg) +{ + struct sysbouncearg *barg = arg; + + serv_handlesyscall(barg->sba_spc, &barg->sba_hdr, barg->sba_data); + free(arg); + return NULL; +} + int rumpuser_sp_copyin(const void *uaddr, void *kaddr, size_t len) { struct spclient *spc; - struct pollfd pfd; + void *rdata; spc = pthread_getspecific(spclient_tls); if (!spc) return EFAULT; - send_copyin_req(spc, uaddr, len); + copyin_req(spc, uaddr, len, &rdata); - pfd.fd = spc->spc_fd; - pfd.events = POLLIN; - do { - poll(&pfd, 1, INFTIM); - } while (readframe(spc) < 1); - - if (spc->spc_hdr.rsp_type != RUMPSP_COPYIN_RESP) { - abort(); - } - - memcpy(kaddr, spc->spc_buf, len); - free(spc->spc_buf); - spc->spc_off = 0; + memcpy(kaddr, rdata, len); + free(rdata); return 0; } @@ 
-362,8 +427,8 @@ return EFAULT; } - send_copyout_req(spc, uaddr, kaddr, dlen); - + if (send_copyout_req(spc, uaddr, kaddr, dlen) != 0) + return EFAULT; return 0; } @@ -371,31 +436,23 @@ rumpuser_sp_anonmmap(size_t howmuch, void **addr) { struct spclient *spc; - struct pollfd pfd; - void *resp; + void *resp, *rdata; + int rv; spc = pthread_getspecific(spclient_tls); if (!spc) return EFAULT; - send_anonmmap_req(spc, howmuch); - - pfd.fd = spc->spc_fd; - pfd.events = POLLIN; - do { - poll(&pfd, 1, INFTIM); - } while (readframe(spc) < 1); - - if (spc->spc_hdr.rsp_type != RUMPSP_ANONMMAP_RESP) { - abort(); - } + rv = anonmmap_req(spc, howmuch, &rdata); + if (rv) + return rv; - /*LINTED*/ - resp = *(void **)spc->spc_buf; - spc->spc_off = 0; + resp = *(void **)rdata; + free(rdata); - if (resp == NULL) + if (resp == NULL) { return ENOMEM; + } *addr = resp; return 0; @@ -412,6 +469,38 @@ connecthook_fn sps_connhook; }; +static void +handlereq(struct spclient *spc) +{ + struct sysbouncearg *sba; + pthread_attr_t pattr; + pthread_t pt; + int rv; + + /* XXX: check that it's a syscall */ + + sba = malloc(sizeof(*sba)); + if (sba == NULL) { + /* panic */ + abort(); + } + + sba->sba_spc = spc; + sba->sba_hdr = spc->spc_hdr; + sba->sba_data = spc->spc_buf; + + spc->spc_buf = NULL; + spc->spc_off = 0; + + pthread_attr_init(&pattr); + pthread_attr_setdetachstate(&pattr, 1); + + if ((rv = pthread_create(&pt, &pattr, serv_syscallbouncer, sba)) != 0) { + /* panic */ + abort(); + } +} + static void * spserver(void *arg) { @@ -465,10 +554,18 @@ serv_handledisco(idx); break; default: - spc->spc_off = 0; - serv_handlesyscall(spc, - &spc->spc_hdr, spc->spc_buf); - spc->spc_buf = NULL; + switch (spc->spc_hdr.rsp_class) { + case RUMPSP_RESP: + kickwaiter(spc); + break; + case RUMPSP_REQ: + handlereq(spc); + break; + default: + printf("PANIC\n"); + abort(); + break; + } break; } } else { Index: src/lib/librumpuser/sp_common.c diff -u src/lib/librumpuser/sp_common.c:1.3 
src/lib/librumpuser/sp_common.c:1.4 --- src/lib/librumpuser/sp_common.c:1.3 Wed Nov 10 16:12:15 2010 +++ src/lib/librumpuser/sp_common.c Fri Nov 19 15:25:49 2010 @@ -1,4 +1,4 @@ -/* $NetBSD: sp_common.c,v 1.3 2010/11/10 16:12:15 pooka Exp $ */ +/* $NetBSD: sp_common.c,v 1.4 2010/11/19 15:25:49 pooka Exp $ */ /* * Copyright (c) 2010 Antti Kantee. All Rights Reserved. @@ -33,6 +33,7 @@ #include <sys/types.h> #include <sys/mman.h> +#include <sys/queue.h> #include <sys/socket.h> #include <sys/un.h> @@ -44,6 +45,7 @@ #include <errno.h> #include <fcntl.h> #include <poll.h> +#include <pthread.h> #include <stdarg.h> #include <stdio.h> #include <stdlib.h> @@ -70,17 +72,14 @@ * Bah, I hate writing on-off-wire conversions in C */ -enum { - RUMPSP_SYSCALL_REQ, RUMPSP_SYSCALL_RESP, - RUMPSP_COPYIN_REQ, RUMPSP_COPYIN_RESP, - RUMPSP_COPYOUT_REQ, /* no copyout resp */ - RUMPSP_ANONMMAP_REQ, RUMPSP_ANONMMAP_RESP -}; +enum { RUMPSP_REQ, RUMPSP_RESP }; +enum { RUMPSP_SYSCALL, RUMPSP_COPYIN, RUMPSP_COPYOUT, RUMPSP_ANONMMAP }; struct rsp_hdr { uint64_t rsp_len; uint64_t rsp_reqno; - uint32_t rsp_type; + uint16_t rsp_class; + uint16_t rsp_type; /* * We want this structure 64bit-aligned for typecast fun, * so might as well use the following for something. 
@@ -106,6 +105,15 @@ register_t rsys_retval[2]; }; +struct respwait { + uint64_t rw_reqno; + void *rw_data; + size_t rw_dlen; + + pthread_cond_t rw_cv; + + TAILQ_ENTRY(respwait) rw_entries; +}; struct spclient { int spc_fd; @@ -116,18 +124,47 @@ uint8_t *spc_buf; size_t spc_off; -#if 0 - /* outgoing */ - int spc_obusy; - pthread_mutex_t spc_omtx; + pthread_mutex_t spc_mtx; pthread_cond_t spc_cv; -#endif + + uint64_t spc_nextreq; + int spc_ostatus, spc_istatus; + + TAILQ_HEAD(, respwait) spc_respwait; }; +#define SPCSTATUS_FREE 0 +#define SPCSTATUS_BUSY 1 +#define SPCSTATUS_WANTED 2 typedef int (*addrparse_fn)(const char *, struct sockaddr **, int); typedef int (*connecthook_fn)(int); -static uint64_t nextreq; +static int readframe(struct spclient *); +static void handlereq(struct spclient *); + +static void +sendlock(struct spclient *spc) +{ + + pthread_mutex_lock(&spc->spc_mtx); + while (spc->spc_ostatus != SPCSTATUS_FREE) { + spc->spc_ostatus = SPCSTATUS_WANTED; + pthread_cond_wait(&spc->spc_cv, &spc->spc_mtx); + } + spc->spc_ostatus = SPCSTATUS_BUSY; + pthread_mutex_unlock(&spc->spc_mtx); +} + +static void +sendunlock(struct spclient *spc) +{ + + pthread_mutex_lock(&spc->spc_mtx); + if (spc->spc_ostatus == SPCSTATUS_WANTED) + pthread_cond_broadcast(&spc->spc_cv); + spc->spc_ostatus = SPCSTATUS_FREE; + pthread_mutex_unlock(&spc->spc_mtx); +} static int dosend(struct spclient *spc, const void *data, size_t dlen) @@ -163,6 +200,108 @@ return 0; } +static void +putwait(struct spclient *spc, struct respwait *rw, struct rsp_hdr *rhdr) +{ + + rw->rw_data = NULL; + rw->rw_dlen = 0; + pthread_cond_init(&rw->rw_cv, NULL); + + pthread_mutex_lock(&spc->spc_mtx); + rw->rw_reqno = rhdr->rsp_reqno = spc->spc_nextreq++; + TAILQ_INSERT_TAIL(&spc->spc_respwait, rw, rw_entries); + pthread_mutex_unlock(&spc->spc_mtx); +} + +static void +kickwaiter(struct spclient *spc) +{ + struct respwait *rw; + + pthread_mutex_lock(&spc->spc_mtx); + TAILQ_FOREACH(rw, &spc->spc_respwait, 
rw_entries) { + if (rw->rw_reqno == spc->spc_hdr.rsp_reqno) + break; + } + if (rw == NULL) { + printf("PANIC: no waiter\n"); + pthread_mutex_unlock(&spc->spc_mtx); + return; + } + rw->rw_data = spc->spc_buf; + TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries); + pthread_cond_signal(&rw->rw_cv); + pthread_mutex_unlock(&spc->spc_mtx); + + spc->spc_buf = NULL; + spc->spc_off = 0; +} + +static void +kickall(struct spclient *spc) +{ + struct respwait *rw; + + /* DIAGASSERT(mutex_owned(spc_lock)) */ + TAILQ_FOREACH(rw, &spc->spc_respwait, rw_entries) + pthread_cond_signal(&rw->rw_cv); +} + +static int +waitresp(struct spclient *spc, struct respwait *rw) +{ + struct pollfd pfd; + int rv = 0; + + pthread_mutex_lock(&spc->spc_mtx); + while (rw->rw_data == NULL) { + /* are we free to receive? */ + if (spc->spc_istatus == SPCSTATUS_FREE) { + int gotresp; + + spc->spc_istatus = SPCSTATUS_BUSY; + pthread_mutex_unlock(&spc->spc_mtx); + + pfd.fd = spc->spc_fd; + pfd.events = POLLIN; + + for (gotresp = 0; !gotresp; ) { + while (readframe(spc) < 1) + poll(&pfd, 1, INFTIM); + + switch (spc->spc_hdr.rsp_class) { + case RUMPSP_RESP: + kickwaiter(spc); + gotresp = spc->spc_hdr.rsp_reqno == + rw->rw_reqno; + break; + case RUMPSP_REQ: + handlereq(spc); + break; + default: + /* panic */ + break; + } + } + pthread_mutex_lock(&spc->spc_mtx); + if (spc->spc_istatus == SPCSTATUS_WANTED) + kickall(spc); + spc->spc_istatus = SPCSTATUS_FREE; + pthread_mutex_unlock(&spc->spc_mtx); + } else { + spc->spc_istatus = SPCSTATUS_WANTED; + pthread_cond_wait(&rw->rw_cv, &spc->spc_mtx); + } + } + + TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries); + pthread_mutex_unlock(&spc->spc_mtx); + + pthread_cond_destroy(&rw->rw_cv); + return rv; +} + static int readframe(struct spclient *spc) {