Module Name:    src
Committed By:   pooka
Date:           Fri Nov 19 15:25:50 UTC 2010

Modified Files:
        src/lib/librumpclient: rumpclient.c
        src/lib/librumpuser: rumpuser_sp.c sp_common.c

Log Message:
Start working on making the syscall proxy code thread-safe.  The
basics are there, but a few more tweaks are needed.  The reason
I'm committing it now is that the code was mind-numbingly boring to
write (no wonder it took me almost 3 years to get it done), and I
might burn it if it's not in a safe place.


To generate a diff of this commit:
cvs rdiff -u -r1.2 -r1.3 src/lib/librumpclient/rumpclient.c
cvs rdiff -u -r1.6 -r1.7 src/lib/librumpuser/rumpuser_sp.c
cvs rdiff -u -r1.3 -r1.4 src/lib/librumpuser/sp_common.c

Please note that diffs are not public domain; they are subject to the
copyright notices on the relevant files.

Modified files:

Index: src/lib/librumpclient/rumpclient.c
diff -u src/lib/librumpclient/rumpclient.c:1.2 src/lib/librumpclient/rumpclient.c:1.3
--- src/lib/librumpclient/rumpclient.c:1.2	Fri Nov  5 13:50:48 2010
+++ src/lib/librumpclient/rumpclient.c	Fri Nov 19 15:25:49 2010
@@ -1,4 +1,4 @@
-/*      $NetBSD: rumpclient.c,v 1.2 2010/11/05 13:50:48 pooka Exp $	*/
+/*      $NetBSD: rumpclient.c,v 1.3 2010/11/19 15:25:49 pooka Exp $	*/
 
 /*
  * Copyright (c) 2010 Antti Kantee.  All Rights Reserved.
@@ -58,52 +58,70 @@
 static struct spclient clispc;
 
 static int
-send_syscall_req(struct spclient *spc, int sysnum,
-	const void *data, size_t dlen)
+syscall_req(struct spclient *spc, int sysnum,
+	const void *data, size_t dlen, void **resp)
 {
 	struct rsp_hdr rhdr;
+	struct respwait rw;
+	int rv;
 
 	rhdr.rsp_len = sizeof(rhdr) + dlen;
-	rhdr.rsp_reqno = nextreq++;
-	rhdr.rsp_type = RUMPSP_SYSCALL_REQ;
+	rhdr.rsp_class = RUMPSP_REQ;
+	rhdr.rsp_type = RUMPSP_SYSCALL;
 	rhdr.rsp_sysnum = sysnum;
 
-	dosend(spc, &rhdr, sizeof(rhdr));
-	dosend(spc, data, dlen);
+	putwait(spc, &rw, &rhdr);
 
-	return 0;
+	sendlock(spc);
+	rv = dosend(spc, &rhdr, sizeof(rhdr));
+	rv = dosend(spc, data, dlen);
+	sendunlock(spc);
+	if (rv)
+		return rv; /* XXX: unputwait */
+
+	rv = waitresp(spc, &rw);
+	*resp = rw.rw_data;
+	return rv;
 }
 
 static int
 send_copyin_resp(struct spclient *spc, uint64_t reqno, void *data, size_t dlen)
 {
 	struct rsp_hdr rhdr;
+	int rv;
 
 	rhdr.rsp_len = sizeof(rhdr) + dlen;
 	rhdr.rsp_reqno = reqno;
-	rhdr.rsp_type = RUMPSP_COPYIN_RESP;
+	rhdr.rsp_class = RUMPSP_RESP;
+	rhdr.rsp_type = RUMPSP_COPYIN;
 	rhdr.rsp_sysnum = 0;
 
-	dosend(spc, &rhdr, sizeof(rhdr));
-	dosend(spc, data, dlen);
+	sendlock(spc);
+	rv = dosend(spc, &rhdr, sizeof(rhdr));
+	rv = dosend(spc, data, dlen);
+	sendunlock(spc);
 
-	return 0;
+	return rv;
 }
 
 static int
 send_anonmmap_resp(struct spclient *spc, uint64_t reqno, void *addr)
 {
 	struct rsp_hdr rhdr;
+	int rv;
 
 	rhdr.rsp_len = sizeof(rhdr) + sizeof(addr);
 	rhdr.rsp_reqno = reqno;
-	rhdr.rsp_type = RUMPSP_ANONMMAP_RESP;
+	rhdr.rsp_class = RUMPSP_RESP;
+	rhdr.rsp_type = RUMPSP_ANONMMAP;
 	rhdr.rsp_sysnum = 0;
 
-	dosend(spc, &rhdr, sizeof(rhdr));
-	dosend(spc, &addr, sizeof(addr));
+	sendlock(spc);
+	rv = dosend(spc, &rhdr, sizeof(rhdr));
+	rv = dosend(spc, &addr, sizeof(addr));
+	sendunlock(spc);
 
-	return 0;
+	return rv;
 }
 
 int
@@ -111,71 +129,71 @@
 	register_t *retval)
 {
 	struct rsp_sysresp *resp;
-	struct rsp_copydata *copydata;
-	struct pollfd pfd;
-	size_t maplen;
-	void *mapaddr;
-	int gotresp;
+	void *rdata;
+	int rv;
 
-	DPRINTF(("rump_sp_syscall: executing syscall %d\n", sysnum));
+	DPRINTF(("rumpsp syscall_req: syscall %d with %p/%zu\n",
+	    sysnum, data, dlen));
 
-	send_syscall_req(&clispc, sysnum, data, dlen);
+	rv = syscall_req(&clispc, sysnum, data, dlen, &rdata);
+	if (rv)
+		return rv;
+
+	resp = rdata;
+	DPRINTF(("rumpsp syscall_resp: syscall %d error %d, rv: %d/%d\n",
+	    sysnum, rv, resp->rsys_retval[0], resp->rsys_retval[1]));
 
-	DPRINTF(("rump_sp_syscall: syscall %d request sent.  "
-	    "waiting for response\n", sysnum));
+	memcpy(retval, &resp->rsys_retval, sizeof(resp->rsys_retval));
+	rv = resp->rsys_error;
+	free(rdata);
 
-	pfd.fd = clispc.spc_fd;
-	pfd.events = POLLIN;
-
-	gotresp = 0;
-	while (!gotresp) {
-		while (readframe(&clispc) < 1)
-			poll(&pfd, 1, INFTIM);
-
-		switch (clispc.spc_hdr.rsp_type) {
-		case RUMPSP_COPYIN_REQ:
-			/*LINTED*/
-			copydata = (struct rsp_copydata *)clispc.spc_buf;
-			DPRINTF(("rump_sp_syscall: copyin request: %p/%zu\n",
-			    copydata->rcp_addr, copydata->rcp_len));
-			send_copyin_resp(&clispc, clispc.spc_hdr.rsp_reqno,
-			    copydata->rcp_addr, copydata->rcp_len);
-			clispc.spc_off = 0;
-			break;
-		case RUMPSP_COPYOUT_REQ:
-			/*LINTED*/
-			copydata = (struct rsp_copydata *)clispc.spc_buf;
-			DPRINTF(("rump_sp_syscall: copyout request: %p/%zu\n",
-			    copydata->rcp_addr, copydata->rcp_len));
-			/*LINTED*/
-			memcpy(copydata->rcp_addr, copydata->rcp_data,
-			    copydata->rcp_len);
-			clispc.spc_off = 0;
-			break;
-		case RUMPSP_ANONMMAP_REQ:
-			/*LINTED*/
-			maplen = *(size_t *)clispc.spc_buf;
-			mapaddr = mmap(NULL, maplen, PROT_READ|PROT_WRITE,
-			    MAP_ANON, -1, 0);
-			if (mapaddr == MAP_FAILED)
-				mapaddr = NULL;
-			send_anonmmap_resp(&clispc,
-			    clispc.spc_hdr.rsp_reqno, mapaddr);
-			clispc.spc_off = 0;
-			break;
-		case RUMPSP_SYSCALL_RESP:
-			DPRINTF(("rump_sp_syscall: got response \n"));
-			gotresp = 1;
-			break;
-		}
-	}
+	return rv;
+}
 
-	/*LINTED*/
-	resp = (struct rsp_sysresp *)clispc.spc_buf;
-	memcpy(retval, &resp->rsys_retval, sizeof(resp->rsys_retval));
-	clispc.spc_off = 0;
+static void
+handlereq(struct spclient *spc)
+{
+	struct rsp_copydata *copydata;
+	void *mapaddr;
+	size_t maplen;
+
+	switch (spc->spc_hdr.rsp_type) {
+	case RUMPSP_COPYIN:
+		/*LINTED*/
+		copydata = (struct rsp_copydata *)spc->spc_buf;
+		DPRINTF(("rump_sp handlereq: copyin request: %p/%zu\n",
+		    copydata->rcp_addr, copydata->rcp_len));
+		send_copyin_resp(spc, spc->spc_hdr.rsp_reqno,
+		    copydata->rcp_addr, copydata->rcp_len);
+		break;
+	case RUMPSP_COPYOUT:
+		/*LINTED*/
+		copydata = (struct rsp_copydata *)spc->spc_buf;
+		DPRINTF(("rump_sp handlereq: copyout request: %p/%zu\n",
+		    copydata->rcp_addr, copydata->rcp_len));
+		/*LINTED*/
+		memcpy(copydata->rcp_addr, copydata->rcp_data,
+		    copydata->rcp_len);
+		break;
+	case RUMPSP_ANONMMAP:
+		/*LINTED*/
+		maplen = *(size_t *)spc->spc_buf;
+		mapaddr = mmap(NULL, maplen, PROT_READ|PROT_WRITE,
+		    MAP_ANON, -1, 0);
+		if (mapaddr == MAP_FAILED)
+			mapaddr = NULL;
+		DPRINTF(("rump_sp handlereq: anonmmap: %p\n", mapaddr));
+		send_anonmmap_resp(spc, spc->spc_hdr.rsp_reqno, mapaddr);
+		break;
+	default:
+		printf("PANIC: INVALID TYPE\n");
+		abort();
+		break;
+	}
 
-	return resp->rsys_error;
+	free(spc->spc_buf);
+	spc->spc_off = 0;
+	spc->spc_buf = NULL;
 }
 
 int
@@ -213,6 +231,9 @@
 		return -1;
 	}
 	clispc.spc_fd = s;
+	TAILQ_INIT(&clispc.spc_respwait);
+	pthread_mutex_init(&clispc.spc_mtx, NULL);
+	pthread_cond_init(&clispc.spc_cv, NULL);
 
 	return 0;
 }

Index: src/lib/librumpuser/rumpuser_sp.c
diff -u src/lib/librumpuser/rumpuser_sp.c:1.6 src/lib/librumpuser/rumpuser_sp.c:1.7
--- src/lib/librumpuser/rumpuser_sp.c:1.6	Wed Nov 17 17:36:14 2010
+++ src/lib/librumpuser/rumpuser_sp.c	Fri Nov 19 15:25:49 2010
@@ -1,4 +1,4 @@
-/*      $NetBSD: rumpuser_sp.c,v 1.6 2010/11/17 17:36:14 pooka Exp $	*/
+/*      $NetBSD: rumpuser_sp.c,v 1.7 2010/11/19 15:25:49 pooka Exp $	*/
 
 /*
  * Copyright (c) 2010 Antti Kantee.  All Rights Reserved.
@@ -38,7 +38,7 @@
  */
 
 #include <sys/cdefs.h>
-__RCSID("$NetBSD: rumpuser_sp.c,v 1.6 2010/11/17 17:36:14 pooka Exp $");
+__RCSID("$NetBSD: rumpuser_sp.c,v 1.7 2010/11/19 15:25:49 pooka Exp $");
 
 #include <sys/types.h>
 #include <sys/mman.h>
@@ -68,7 +68,6 @@
 static struct pollfd pfdlist[MAXCLI];
 static struct spclient spclist[MAXCLI];
 static unsigned int nfds, maxidx;
-static uint64_t nextreq;
 static pthread_key_t spclient_tls;
 
 static struct rumpuser_sp_ops spops;
@@ -132,45 +131,77 @@
 	return rv;
 }
 
+static uint64_t
+nextreq(struct spclient *spc)
+{
+	uint64_t nw;
+
+	pthread_mutex_lock(&spc->spc_mtx);
+	nw = spc->spc_nextreq++;
+	pthread_mutex_unlock(&spc->spc_mtx);
+
+	return nw;
+}
+
 static int
 send_syscall_resp(struct spclient *spc, uint64_t reqno, int error,
-	register_t retval[2])
+	register_t *retval)
 {
 	struct rsp_hdr rhdr;
 	struct rsp_sysresp sysresp;
+	int rv;
 
 	rhdr.rsp_len = sizeof(rhdr) + sizeof(sysresp);
 	rhdr.rsp_reqno = reqno;
-	rhdr.rsp_type = RUMPSP_SYSCALL_RESP;
+	rhdr.rsp_class = RUMPSP_RESP;
+	rhdr.rsp_type = RUMPSP_SYSCALL;
 	rhdr.rsp_sysnum = 0;
 
 	sysresp.rsys_error = error;
-	memcpy(sysresp.rsys_retval, retval, sizeof(retval));
+	memcpy(sysresp.rsys_retval, retval, sizeof(sysresp.rsys_retval));
 
-	dosend(spc, &rhdr, sizeof(rhdr));
-	dosend(spc, &sysresp, sizeof(sysresp));
+	sendlock(spc);
+	rv = dosend(spc, &rhdr, sizeof(rhdr));
+	rv = dosend(spc, &sysresp, sizeof(sysresp));
+	sendunlock(spc);
 
-	return 0;
+	return rv;
 }
 
 static int
-send_copyin_req(struct spclient *spc, const void *remaddr, size_t dlen)
+copyin_req(struct spclient *spc, const void *remaddr, size_t dlen, void **resp)
 {
 	struct rsp_hdr rhdr;
 	struct rsp_copydata copydata;
+	struct respwait rw;
+	int rv;
+
+	DPRINTF(("copyin_req: %zu bytes from %p\n", dlen, remaddr));
 
 	rhdr.rsp_len = sizeof(rhdr) + sizeof(copydata);
-	rhdr.rsp_reqno = nextreq++;
-	rhdr.rsp_type = RUMPSP_COPYIN_REQ;
+	rhdr.rsp_class = RUMPSP_REQ;
+	rhdr.rsp_type = RUMPSP_COPYIN;
 	rhdr.rsp_sysnum = 0;
 
 	copydata.rcp_addr = __UNCONST(remaddr);
 	copydata.rcp_len = dlen;
 
-	dosend(spc, &rhdr, sizeof(rhdr));
-	dosend(spc, &copydata, sizeof(copydata));
+	putwait(spc, &rw, &rhdr);
+
+	sendlock(spc);
+	rv = dosend(spc, &rhdr, sizeof(rhdr));
+	rv = dosend(spc, &copydata, sizeof(copydata));
+	sendunlock(spc);
+	if (rv)
+		return rv; /* XXX: unputwait */
+
+	rv = waitresp(spc, &rw);
+
+	DPRINTF(("copyin: response %d\n", rv));
+
+	*resp = rw.rw_data;
+	return rv;
 
-	return 0;
 }
 
 static int
@@ -179,36 +210,57 @@
 {
 	struct rsp_hdr rhdr;
 	struct rsp_copydata copydata;
+	int rv;
+
+	DPRINTF(("copyout_req (async): %zu bytes to %p\n", dlen, remaddr));
 
 	rhdr.rsp_len = sizeof(rhdr) + sizeof(copydata) + dlen;
-	rhdr.rsp_reqno = nextreq++;
-	rhdr.rsp_type = RUMPSP_COPYOUT_REQ;
+	rhdr.rsp_reqno = nextreq(spc);
+	rhdr.rsp_class = RUMPSP_REQ;
+	rhdr.rsp_type = RUMPSP_COPYOUT;
 	rhdr.rsp_sysnum = 0;
 
 	copydata.rcp_addr = __UNCONST(remaddr);
 	copydata.rcp_len = dlen;
 
-	dosend(spc, &rhdr, sizeof(rhdr));
-	dosend(spc, &copydata, sizeof(copydata));
-	dosend(spc, data, dlen);
+	sendlock(spc);
+	rv = dosend(spc, &rhdr, sizeof(rhdr));
+	rv = dosend(spc, &copydata, sizeof(copydata));
+	rv = dosend(spc, data, dlen);
+	sendunlock(spc);
 
-	return 0;
+	return rv;
 }
 
 static int
-send_anonmmap_req(struct spclient *spc, size_t howmuch)
+anonmmap_req(struct spclient *spc, size_t howmuch, void **resp)
 {
 	struct rsp_hdr rhdr;
+	struct respwait rw;
+	int rv;
+
+	DPRINTF(("anonmmap_req: %zu bytes\n", howmuch));
 
 	rhdr.rsp_len = sizeof(rhdr) + sizeof(howmuch);
-	rhdr.rsp_reqno = nextreq++;
-	rhdr.rsp_type = RUMPSP_ANONMMAP_REQ;
+	rhdr.rsp_class = RUMPSP_REQ;
+	rhdr.rsp_type = RUMPSP_ANONMMAP;
 	rhdr.rsp_sysnum = 0;
 
-	dosend(spc, &rhdr, sizeof(rhdr));
-	dosend(spc, &howmuch, sizeof(howmuch));
+	putwait(spc, &rw, &rhdr);
 
-	return 0;
+	sendlock(spc);
+	rv = dosend(spc, &rhdr, sizeof(rhdr));
+	rv = dosend(spc, &howmuch, sizeof(howmuch));
+	sendunlock(spc);
+	if (rv)
+		return rv; /* XXX: unputwait */
+
+	rv = waitresp(spc, &rw);
+	*resp = rw.rw_data;
+
+	DPRINTF(("anonmmap: mapped at %p\n", **(void ***)resp));
+
+	return rv;
 }
 
 static void
@@ -222,6 +274,8 @@
 	lwproc_switch(spc->spc_lwp);
 	lwproc_release();
 
+	pthread_mutex_destroy(&spc->spc_mtx);
+	pthread_cond_destroy(&spc->spc_cv);
 	free(spc->spc_buf);
 	memset(spc, 0, sizeof(*spc));
 	close(fd);
@@ -289,6 +343,12 @@
 	pfdlist[i].fd = newfd;
 	spclist[i].spc_fd = newfd;
 	spclist[i].spc_lwp = lwproc_curlwp();
+	spclist[i].spc_istatus = SPCSTATUS_BUSY; /* dedicated receiver */
+
+	TAILQ_INIT(&spclist[i].spc_respwait);
+	pthread_mutex_init(&spclist[i].spc_mtx, NULL);
+	pthread_cond_init(&spclist[i].spc_cv, NULL);
+
 	if (maxidx < i)
 		maxidx = i;
 
@@ -303,7 +363,7 @@
 static void
 serv_handlesyscall(struct spclient *spc, struct rsp_hdr *rhdr, uint8_t *data)
 {
-	register_t retval[2];
+	register_t retval[2] = {0, 0};
 	int rv, sysnum;
 
 	sysnum = (int)rhdr->rsp_sysnum;
@@ -317,36 +377,41 @@
 	pthread_setspecific(spclient_tls, NULL);
 	free(data);
 
-	DPRINTF(("rump_sp: got return value %d\n", rv));
+	DPRINTF(("rump_sp: got return value %d & %d/%d\n",
+	    rv, retval[0], retval[1]));
 
 	send_syscall_resp(spc, rhdr->rsp_reqno, rv, retval);
 }
 
+struct sysbouncearg {
+	struct spclient *sba_spc;
+	struct rsp_hdr sba_hdr;
+	uint8_t *sba_data;
+};
+static void *
+serv_syscallbouncer(void *arg)
+{
+	struct sysbouncearg *barg = arg;
+
+	serv_handlesyscall(barg->sba_spc, &barg->sba_hdr, barg->sba_data);
+	free(arg);
+	return NULL;
+}
+
 int
 rumpuser_sp_copyin(const void *uaddr, void *kaddr, size_t len)
 {
 	struct spclient *spc;
-	struct pollfd pfd;
+	void *rdata;
 
 	spc = pthread_getspecific(spclient_tls);
 	if (!spc)
 		return EFAULT;
 
-	send_copyin_req(spc, uaddr, len);
+	copyin_req(spc, uaddr, len, &rdata);
 
-	pfd.fd = spc->spc_fd;
-	pfd.events = POLLIN;
-	do {
-		poll(&pfd, 1, INFTIM);
-	} while (readframe(spc) < 1);
-
-	if (spc->spc_hdr.rsp_type != RUMPSP_COPYIN_RESP) {
-		abort();
-	}
-
-	memcpy(kaddr, spc->spc_buf, len);
-	free(spc->spc_buf);
-	spc->spc_off = 0;
+	memcpy(kaddr, rdata, len);
+	free(rdata);
 
 	return 0;
 }
@@ -362,8 +427,8 @@
 		return EFAULT;
 	}
 
-	send_copyout_req(spc, uaddr, kaddr, dlen);
-
+	if (send_copyout_req(spc, uaddr, kaddr, dlen) != 0)
+		return EFAULT;
 	return 0;
 }
 
@@ -371,31 +436,23 @@
 rumpuser_sp_anonmmap(size_t howmuch, void **addr)
 {
 	struct spclient *spc;
-	struct pollfd pfd;
-	void *resp;
+	void *resp, *rdata;
+	int rv;
 
 	spc = pthread_getspecific(spclient_tls);
 	if (!spc)
 		return EFAULT;
 
-	send_anonmmap_req(spc, howmuch);
-
-	pfd.fd = spc->spc_fd;
-	pfd.events = POLLIN;
-	do {
-		poll(&pfd, 1, INFTIM);
-	} while (readframe(spc) < 1);
-
-	if (spc->spc_hdr.rsp_type != RUMPSP_ANONMMAP_RESP) {
-		abort();
-	}
+	rv = anonmmap_req(spc, howmuch, &rdata);
+	if (rv)
+		return rv;
 
-	/*LINTED*/
-	resp = *(void **)spc->spc_buf;
-	spc->spc_off = 0;
+	resp = *(void **)rdata;
+	free(rdata);
 
-	if (resp == NULL)
+	if (resp == NULL) {
 		return ENOMEM;
+	}
 
 	*addr = resp;
 	return 0;
@@ -412,6 +469,38 @@
 	connecthook_fn sps_connhook;
 };
 
+static void
+handlereq(struct spclient *spc)
+{
+	struct sysbouncearg *sba;
+	pthread_attr_t pattr;
+	pthread_t pt;
+	int rv;
+
+	/* XXX: check that it's a syscall */
+
+	sba = malloc(sizeof(*sba));
+	if (sba == NULL) {
+		/* panic */
+		abort();
+	}
+
+	sba->sba_spc = spc;
+	sba->sba_hdr = spc->spc_hdr;
+	sba->sba_data = spc->spc_buf;
+
+	spc->spc_buf = NULL;
+	spc->spc_off = 0;
+
+	pthread_attr_init(&pattr);
+	pthread_attr_setdetachstate(&pattr, 1);
+
+	if ((rv = pthread_create(&pt, &pattr, serv_syscallbouncer, sba)) != 0) {
+		/* panic */
+		abort();
+	}
+}
+
 static void *
 spserver(void *arg)
 {
@@ -465,10 +554,18 @@
 					serv_handledisco(idx);
 					break;
 				default:
-					spc->spc_off = 0;
-					serv_handlesyscall(spc,
-					    &spc->spc_hdr, spc->spc_buf);
-					spc->spc_buf = NULL;
+					switch (spc->spc_hdr.rsp_class) {
+					case RUMPSP_RESP:
+						kickwaiter(spc);
+						break;
+					case RUMPSP_REQ:
+						handlereq(spc);
+						break;
+					default:
+						printf("PANIC\n");
+						abort();
+						break;
+					}
 					break;
 				}
 			} else {

Index: src/lib/librumpuser/sp_common.c
diff -u src/lib/librumpuser/sp_common.c:1.3 src/lib/librumpuser/sp_common.c:1.4
--- src/lib/librumpuser/sp_common.c:1.3	Wed Nov 10 16:12:15 2010
+++ src/lib/librumpuser/sp_common.c	Fri Nov 19 15:25:49 2010
@@ -1,4 +1,4 @@
-/*      $NetBSD: sp_common.c,v 1.3 2010/11/10 16:12:15 pooka Exp $	*/
+/*      $NetBSD: sp_common.c,v 1.4 2010/11/19 15:25:49 pooka Exp $	*/
 
 /*
  * Copyright (c) 2010 Antti Kantee.  All Rights Reserved.
@@ -33,6 +33,7 @@
 
 #include <sys/types.h>
 #include <sys/mman.h>
+#include <sys/queue.h>
 #include <sys/socket.h>
 #include <sys/un.h>
 
@@ -44,6 +45,7 @@
 #include <errno.h>
 #include <fcntl.h>
 #include <poll.h>
+#include <pthread.h>
 #include <stdarg.h>
 #include <stdio.h>
 #include <stdlib.h>
@@ -70,17 +72,14 @@
  * Bah, I hate writing on-off-wire conversions in C
  */
 
-enum {
-	RUMPSP_SYSCALL_REQ,	RUMPSP_SYSCALL_RESP,
-	RUMPSP_COPYIN_REQ,	RUMPSP_COPYIN_RESP,
-	RUMPSP_COPYOUT_REQ,	/* no copyout resp */
-	RUMPSP_ANONMMAP_REQ,	RUMPSP_ANONMMAP_RESP
-};
+enum { RUMPSP_REQ, RUMPSP_RESP };
+enum { RUMPSP_SYSCALL, RUMPSP_COPYIN, RUMPSP_COPYOUT, RUMPSP_ANONMMAP };
 
 struct rsp_hdr {
 	uint64_t rsp_len;
 	uint64_t rsp_reqno;
-	uint32_t rsp_type;
+	uint16_t rsp_class;
+	uint16_t rsp_type;
 	/*
 	 * We want this structure 64bit-aligned for typecast fun,
 	 * so might as well use the following for something.
@@ -106,6 +105,15 @@
 	register_t rsys_retval[2];
 };
 
+struct respwait {
+	uint64_t rw_reqno;
+	void *rw_data;
+	size_t rw_dlen;
+
+	pthread_cond_t rw_cv;
+
+	TAILQ_ENTRY(respwait) rw_entries;
+};
 
 struct spclient {
 	int spc_fd;
@@ -116,18 +124,47 @@
 	uint8_t *spc_buf;
 	size_t spc_off;
 
-#if 0
-	/* outgoing */
-	int spc_obusy;
-	pthread_mutex_t spc_omtx;
+	pthread_mutex_t spc_mtx;
 	pthread_cond_t spc_cv;
-#endif
+
+	uint64_t spc_nextreq;
+	int spc_ostatus, spc_istatus;
+
+	TAILQ_HEAD(, respwait) spc_respwait;
 };
+#define SPCSTATUS_FREE 0
+#define SPCSTATUS_BUSY 1
+#define SPCSTATUS_WANTED 2
 
 typedef int (*addrparse_fn)(const char *, struct sockaddr **, int);
 typedef int (*connecthook_fn)(int);
 
-static uint64_t nextreq;
+static int readframe(struct spclient *);
+static void handlereq(struct spclient *);
+
+static void
+sendlock(struct spclient *spc)
+{
+
+	pthread_mutex_lock(&spc->spc_mtx);
+	while (spc->spc_ostatus != SPCSTATUS_FREE) {
+		spc->spc_ostatus = SPCSTATUS_WANTED;
+		pthread_cond_wait(&spc->spc_cv, &spc->spc_mtx);
+	}
+	spc->spc_ostatus = SPCSTATUS_BUSY;
+	pthread_mutex_unlock(&spc->spc_mtx);
+}
+
+static void
+sendunlock(struct spclient *spc)
+{
+
+	pthread_mutex_lock(&spc->spc_mtx);
+	if (spc->spc_ostatus == SPCSTATUS_WANTED)
+		pthread_cond_broadcast(&spc->spc_cv);
+	spc->spc_ostatus = SPCSTATUS_FREE;
+	pthread_mutex_unlock(&spc->spc_mtx);
+}
 
 static int
 dosend(struct spclient *spc, const void *data, size_t dlen)
@@ -163,6 +200,108 @@
 	return 0;
 }
 
+static void
+putwait(struct spclient *spc, struct respwait *rw, struct rsp_hdr *rhdr)
+{
+
+	rw->rw_data = NULL;
+	rw->rw_dlen = 0;
+	pthread_cond_init(&rw->rw_cv, NULL);
+
+	pthread_mutex_lock(&spc->spc_mtx);
+	rw->rw_reqno = rhdr->rsp_reqno = spc->spc_nextreq++;
+	TAILQ_INSERT_TAIL(&spc->spc_respwait, rw, rw_entries);
+	pthread_mutex_unlock(&spc->spc_mtx);
+}
+
+static void
+kickwaiter(struct spclient *spc)
+{
+	struct respwait *rw;
+
+	pthread_mutex_lock(&spc->spc_mtx);
+	TAILQ_FOREACH(rw, &spc->spc_respwait, rw_entries) {
+		if (rw->rw_reqno == spc->spc_hdr.rsp_reqno)
+			break;
+	}
+	if (rw == NULL) {
+		printf("PANIC: no waiter\n");
+		pthread_mutex_unlock(&spc->spc_mtx);
+		return;
+	}
+	rw->rw_data = spc->spc_buf;
+	TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries);
+	pthread_cond_signal(&rw->rw_cv);
+	pthread_mutex_unlock(&spc->spc_mtx);
+
+	spc->spc_buf = NULL;
+	spc->spc_off = 0;
+}
+
+static void
+kickall(struct spclient *spc)
+{
+	struct respwait *rw;
+
+	/* DIAGASSERT(mutex_owned(spc_lock)) */
+	TAILQ_FOREACH(rw, &spc->spc_respwait, rw_entries)
+		pthread_cond_signal(&rw->rw_cv);
+}
+
+static int
+waitresp(struct spclient *spc, struct respwait *rw)
+{
+	struct pollfd pfd;
+	int rv = 0;
+
+	pthread_mutex_lock(&spc->spc_mtx);
+	while (rw->rw_data == NULL) {
+		/* are we free to receive? */
+		if (spc->spc_istatus == SPCSTATUS_FREE) {
+			int gotresp;
+
+			spc->spc_istatus = SPCSTATUS_BUSY;
+			pthread_mutex_unlock(&spc->spc_mtx);
+
+			pfd.fd = spc->spc_fd;
+			pfd.events = POLLIN;
+
+			for (gotresp = 0; !gotresp; ) {
+				while (readframe(spc) < 1)
+					poll(&pfd, 1, INFTIM);
+
+				switch (spc->spc_hdr.rsp_class) {
+				case RUMPSP_RESP:
+					kickwaiter(spc);
+					gotresp = spc->spc_hdr.rsp_reqno ==
+					    rw->rw_reqno;
+					break;
+				case RUMPSP_REQ:
+					handlereq(spc);
+					break;
+				default:
+					/* panic */
+					break;
+				}
+			}
+			pthread_mutex_lock(&spc->spc_mtx);
+			if (spc->spc_istatus == SPCSTATUS_WANTED)
+				kickall(spc);
+			spc->spc_istatus = SPCSTATUS_FREE;
+			pthread_mutex_unlock(&spc->spc_mtx);
+		} else {
+			spc->spc_istatus = SPCSTATUS_WANTED;
+			pthread_cond_wait(&rw->rw_cv, &spc->spc_mtx);
+		}
+	}
+
+	TAILQ_REMOVE(&spc->spc_respwait, rw, rw_entries);
+	pthread_mutex_unlock(&spc->spc_mtx);
+
+	pthread_cond_destroy(&rw->rw_cv);
+	return rv;
+}
+
 static int
 readframe(struct spclient *spc)
 {

Reply via email to