Module Name:    src
Committed By:   pooka
Date:           Wed Nov 24 11:40:24 UTC 2010

Modified Files:
        src/lib/librumpuser: rumpuser_sp.c

Log Message:
Sneeze some locking into connect/disconnect.


To generate a diff of this commit:
cvs rdiff -u -r1.10 -r1.11 src/lib/librumpuser/rumpuser_sp.c

Please note that diffs are not public domain; they are subject to the
copyright notices on the relevant files.

Modified files:

Index: src/lib/librumpuser/rumpuser_sp.c
diff -u src/lib/librumpuser/rumpuser_sp.c:1.10 src/lib/librumpuser/rumpuser_sp.c:1.11
--- src/lib/librumpuser/rumpuser_sp.c:1.10	Mon Nov 22 20:42:19 2010
+++ src/lib/librumpuser/rumpuser_sp.c	Wed Nov 24 11:40:24 2010
@@ -1,4 +1,4 @@
-/*      $NetBSD: rumpuser_sp.c,v 1.10 2010/11/22 20:42:19 pooka Exp $	*/
+/*      $NetBSD: rumpuser_sp.c,v 1.11 2010/11/24 11:40:24 pooka Exp $	*/
 
 /*
  * Copyright (c) 2010 Antti Kantee.  All Rights Reserved.
@@ -38,9 +38,10 @@
  */
 
 #include <sys/cdefs.h>
-__RCSID("$NetBSD: rumpuser_sp.c,v 1.10 2010/11/22 20:42:19 pooka Exp $");
+__RCSID("$NetBSD: rumpuser_sp.c,v 1.11 2010/11/24 11:40:24 pooka Exp $");
 
 #include <sys/types.h>
+#include <sys/atomic.h>
 #include <sys/mman.h>
 #include <sys/socket.h>
 
@@ -67,7 +68,7 @@
 
 static struct pollfd pfdlist[MAXCLI];
 static struct spclient spclist[MAXCLI];
-static unsigned int nfds, maxidx;
+static unsigned int disco;
 
 static struct rumpuser_sp_ops spops;
 
@@ -287,66 +288,81 @@
 }
 
 static void
-serv_handledisco(unsigned int idx)
+spcref(struct spclient *spc)
 {
-	struct spclient *spc = &spclist[idx];
-	int fd = spc->spc_fd;
 
-	DPRINTF(("rump_sp: disconnecting [%u]\n", idx));
+	pthread_mutex_lock(&spc->spc_mtx);
+	spc->spc_refcnt++;
+	pthread_mutex_unlock(&spc->spc_mtx);
+}
+
+static void
+spcrelease(struct spclient *spc)
+{
+	int ref;
+
+	pthread_mutex_lock(&spc->spc_mtx);
+	ref = --spc->spc_refcnt;
+	pthread_mutex_unlock(&spc->spc_mtx);
+
+	if (ref > 0)
+		return;
+
+	_DIAGASSERT(TAILQ_EMPTY(&spclist[i].spc_respwait));
+	_DIAGASSERT(spc->spc_buf == NULL);
 
 	lwproc_switch(spc->spc_mainlwp);
 	lwproc_release();
+	spc->spc_mainlwp = NULL;
+
+	close(spc->spc_fd);
+	spc->spc_fd = -1;
+
+	spc->spc_pfd->fd = -1;
+	membar_producer();
+	atomic_inc_uint(&disco);
 
-	pthread_mutex_destroy(&spc->spc_mtx);
-	pthread_cond_destroy(&spc->spc_cv);
-	free(spc->spc_buf);
-	memset(spc, 0, sizeof(*spc));
-	close(fd);
-	pfdlist[idx].fd = -1;
-	nfds--;
-
-	if (idx == maxidx) {
-		while (idx--) {
-			if (pfdlist[idx].fd != -1) {
-				maxidx = idx;
-				break;
-			}
-			assert(idx != 0);
-		}
-		DPRINTF(("rump_sp: set maxidx to [%u]\n", maxidx));
-	}
 }
 
-static int
-serv_handleconn(int fd, connecthook_fn connhook)
+static void
+serv_handledisco(unsigned int idx)
+{
+	struct spclient *spc = &spclist[idx];
+
+	DPRINTF(("rump_sp: disconnecting [%u]\n", idx));
+
+	spcrelease(spc);
+}
+
+static unsigned
+serv_handleconn(int fd, connecthook_fn connhook, int busy)
 {
 	struct sockaddr_storage ss;
 	socklen_t sl = sizeof(ss);
-	int newfd, flags, error;
+	int newfd, flags;
 	unsigned i;
 
 	/*LINTED: cast ok */
 	newfd = accept(fd, (struct sockaddr *)&ss, &sl);
 	if (newfd == -1)
-		return errno;
-
-	/* XXX: should do some sort of handshake too */
+		return 0;
 
-	if (nfds == MAXCLI) {
+	if (busy) {
 		close(newfd); /* EBUSY */
-		return EBUSY;
+		return 0;
 	}
 
+	/* XXX: should do some sort of handshake too */
+
 	flags = fcntl(newfd, F_GETFL, 0);
 	if (fcntl(newfd, F_SETFL, flags | O_NONBLOCK) == -1) {
 		close(newfd);
-		return errno;
+		return 0;
 	}
-	flags = 1;
 
-	if ((error = connhook(newfd)) != 0) {
+	if (connhook(newfd) != 0) {
 		close(newfd);
-		return error;
+		return 0;
 	}
 
 	/* find empty slot the simple way */
@@ -355,33 +371,28 @@
 			break;
 	}
 
-	if ((error = lwproc_newproc(&spclist[i])) != 0) {
+	if (lwproc_newproc(&spclist[i]) != 0) {
 		close(newfd);
-		return error;
+		return 0;
 	}
 
 	assert(i < MAXCLI);
-	nfds++;
 
 	pfdlist[i].fd = newfd;
 	spclist[i].spc_fd = newfd;
 	spclist[i].spc_mainlwp = lwproc_curlwp();
 	spclist[i].spc_istatus = SPCSTATUS_BUSY; /* dedicated receiver */
 	spclist[i].spc_pid = lwproc_getpid();
+	spclist[i].spc_refcnt = 1;
 
 	TAILQ_INIT(&spclist[i].spc_respwait);
-	pthread_mutex_init(&spclist[i].spc_mtx, NULL);
-	pthread_cond_init(&spclist[i].spc_cv, NULL);
-
-	if (maxidx < i)
-		maxidx = i;
 
 	DPRINTF(("rump_sp: added new connection at idx %u, pid %d\n",
 	    i, lwproc_getpid()));
 
 	lwproc_switch(NULL);
 
-	return 0;
+	return i;
 }
 
 static void
@@ -503,6 +514,7 @@
 	pthread_attr_init(&pattr);
 	pthread_attr_setdetachstate(&pattr, 1);
 
+	spcref(spc);
 	if ((rv = pthread_create(&pt, &pattr, serv_syscallbouncer, sba)) != 0) {
 		/* panic */
 		abort();
@@ -513,13 +525,21 @@
 spserver(void *arg)
 {
 	struct spservarg *sarg = arg;
+	struct spclient *spc;
 	unsigned idx;
 	int seen;
 	int rv;
+	unsigned int nfds, maxidx;
 
-	for (idx = 1; idx < MAXCLI; idx++) {
+	for (idx = 0; idx < MAXCLI; idx++) {
 		pfdlist[idx].fd = -1;
 		pfdlist[idx].events = POLLIN;
+
+		spc = &spclist[idx];
+
+		spc->spc_pfd = &pfdlist[idx];
+		pthread_mutex_init(&spc->spc_mtx, NULL);
+		pthread_cond_init(&spc->spc_cv, NULL);
 	}
 	pfdlist[0].fd = sarg->sps_sock;
 	pfdlist[0].events = POLLIN;
@@ -529,6 +549,27 @@
 	DPRINTF(("rump_sp: server mainloop\n"));
 
 	for (;;) {
+		/* g/c hangarounds (eventually) */
+		if (disco) {
+			int discoed;
+
+			membar_consumer();
+			discoed = atomic_swap_uint(&disco, 0);
+			while (discoed--) {
+				nfds--;
+				idx = maxidx;
+				while (idx--) {
+					if (pfdlist[idx].fd != -1) {
+						maxidx = idx;
+						break;
+					}
+					assert(idx != 0);
+				}
+				DPRINTF(("rump_sp: set maxidx to [%u]\n",
+				    maxidx));
+			}
+		}
+
 		DPRINTF(("rump_sp: loop nfd %d\n", maxidx+1));
 		seen = 0;
 		rv = poll(pfdlist, maxidx+1, INFTIM);
@@ -552,7 +593,7 @@
 			DPRINTF(("rump_sp: activity at [%u] %d/%d\n",
 			    idx, seen, rv));
 			if (idx > 0) {
-				struct spclient *spc = &spclist[idx];
+				spc = &spclist[idx];
 
 				DPRINTF(("rump_sp: mainloop read [%u]\n", idx));
 				switch (readframe(spc)) {
@@ -576,10 +617,16 @@
 					}
 					break;
 				}
+
 			} else {
 				DPRINTF(("rump_sp: mainloop new connection\n"));
-				serv_handleconn(pfdlist[0].fd,
-				    sarg->sps_connhook);
+
+				idx = serv_handleconn(pfdlist[0].fd,
+				    sarg->sps_connhook, nfds == MAXCLI);
+				if (idx)
+					nfds++;
+				if (idx > maxidx)
+					maxidx = idx;
 			}
 		}
 	}

Reply via email to