This is basically the same patch as before, but with the code that adds
and removes Client entries moved from redir_read and redir_closefid to
connopen and connclose.

Signed-off-by: Hugh Greenberg <[EMAIL PROTECTED]>


Index: xget.c
===================================================================
--- xget.c	(revision 704)
+++ xget.c	(working copy)
@@ -47,7 +47,6 @@
 
 Spfileops redir_ops = {
 	.read = redir_read,
-	.closefid = redir_closefid,
 };
 
 Spfileops avail_ops = {
@@ -81,7 +80,7 @@
 int numconnects = 0;
 int numtransfers = 0;
 int maxlevels = 4;
-int maxconnections = 20;
+int maxconnections = 15;
 int maxretries = 15; /* give at least 15 attempts by default */
 int retrytime = 10; /* wait default of 20 seconds before trying again */
 int singlefile = 0; /* Flag that indicates whether we are serving a single file */
@@ -92,6 +91,7 @@
 u64 qpath = 1;
 File *files;
 Client *clients;
+Server *servers;
 time_t starttime;
 time_t endtime;
 time_t servicetime;
@@ -125,7 +125,8 @@
 void removeworker(File *f, Worker *worker) {
 	Worker *cur;
 	Client *client;
-	Usedworker * uw;
+	Usedworker *uw, *uw2;
+	File *f2;
 
 	if(!worker)
 		return;
@@ -141,13 +142,27 @@
 		return; /* Not found in queue, already removed */
 
 	for (client = clients; client != NULL; client = client->next) {
-		for (uw = client->workersused; uw != NULL; uw = uw->next) {
-			if (uw->worker == worker) 
-				uw->worker = NULL;
+		for (uw = client->workersused; uw != NULL; ) {
+			uw2 = uw->next;
+			if (uw->worker == worker) {
+				if (uw->next)
+					uw->next->prev = uw->prev;
+				if (uw->prev)
+					uw->prev->next = uw->next;
+				if (uw == client->workersused)
+					client->workersused = uw->next;
+
+				if (uw->worker->server && 
+				    uw->worker->server->conns > 0)
+					uw->worker->server->conns--;
+				
+				free(uw);
+			}
+			uw = uw2;
 		}
 	}
 
-	debug(Dbgsrvfn, "worker %p is done\n", worker, worker->saddress);
+	debug(Dbgsrvfn, "worker %p is done\n", worker);
 	if (worker == f->firstworker)
 		f->firstworker = worker->next;
 
@@ -167,7 +182,27 @@
 		worker->prev->next = worker->next;
 
 	f->numworkers--;
-	free(worker->saddress);
+	//remove old servers that are no longer serving from the list
+
+	if (worker->server &&  worker->server->conns == 0) {
+		if (worker->server->next)
+			worker->server->next->prev = worker->server->prev;
+		if (worker->server->prev)
+			worker->server->prev->next = worker->server->next;
+		if (worker->server == servers)
+			servers = worker->server->next;
+		for (f2 = files; f2 != NULL; f2 = f2->next) {
+			for(cur = f2->firstworker; cur; cur = cur->next) {
+				if (cur == worker)
+					continue;
+				else if (cur->server == worker->server)
+					cur->server = NULL;
+			}
+		}
+		
+		free(worker->server);
+	}
+	
 	free(worker);
 }
 
@@ -231,20 +266,15 @@
 	ticksig = 1;
 	if (servicetime && time(NULL) > servicetime) {
 		for(f = files; f != NULL; f = f->next) {
-			//	if (f->datafid)
-			//	spc_close(f->datafid);
-			if (f->redirfid)
-				spc_close(f->redirfid);
-
 			if (f->availfid)
 				spc_close(f->availfid);
 		}
-		
+	
 		if (masterfs) {
 			spc_umount(masterfs);
 			masterfs = NULL;
 		}
-
+		
 		free(path);
 		exit(0);
 	}
@@ -372,11 +402,10 @@
 		f->datafid = NULL;
 	}
 
-	if (f->fs && f->fs != masterfs) {
+	if (f->fs && f->fs != masterfs)
 		spc_umount(f->fs);
-		f->fs = masterfs;
-	}
-
+	
+	f->fs = masterfs;
 	if (!changeperms) {
 		f->finished = 2;
 		goto done;
@@ -473,17 +502,20 @@
 			if (numconnects == 0)
 				servicetime = endtime;
 			else {
-			n = (endtime - starttime);
-			servicetime = endtime + n * 2;
+				if (n < 5)
+					n = 5;
+				
+				n = (endtime - starttime);
+				servicetime = endtime + n * 2;
 			}
 		}
 	}
-
+	
 	else {
 		if (numconnects == 0)
 			servicetime = time(NULL);
 	}
-
+	
 	return 0;
 }
 
@@ -731,7 +763,7 @@
 dir_destroy(Spfile *dir) {
 	File *f, *f2;
 	Spfid *fid, **pool;
-	Worker *w, *wt;
+	Worker *w;
 	Spfile *file;
 	Spfilefid *ffid;
 	Spconn *conns;
@@ -750,13 +782,9 @@
 		else if (f->prev)
 			f->prev->next = f->next;
 		
-		for (w = f->firstworker; w != NULL; ) {
-			wt = w->next;
-			free(w->saddress);
-			free(w);
-			w = wt;
-		}
-		
+		for (w = f->firstworker; w != NULL; )
+			removeworker(f, w);
+	       		
 		for (req = f->reqs; req != NULL; ) {
 			req2 = req->next;
 			free(req);
@@ -1163,6 +1191,10 @@
 	if (!file)
 		goto error;
 
+	file->fs = NULL;
+	file->datafid = NULL;
+	file->datafd = NULL;
+	file->availfid = NULL;
 	/* check if we should go somewhere else for the file */
 	sprintf(buf, "%s/redir", nname);
 	redirfid = spc_open(masterfs, buf, Oread);
@@ -1174,6 +1206,7 @@
 		goto error;
 
 	buf[n] = '\0';
+	spc_close(redirfid);
 	redirto = sp_malloc(strlen(buf) + 1);
 	strcpy(redirto, buf);
 	if (!strcmp(buf, "me")) {
@@ -1192,11 +1225,6 @@
 	if (!datafid)
 		goto error;
 
-	file->fs = redirfs;
-	file->datafid = datafid;
-	file->redirfid = redirfid;
-	file->crc = crc;
-	file->datafd = spcfd_add(file->datafid, netreadcb, file, 0);
 	sprintf(buf, "%s/avail", nname);
 	availfid = spc_open(masterfs, buf, Owrite);
 	if (!availfid)
@@ -1208,6 +1236,10 @@
 		goto error;
 	}
 
+	file->fs = redirfs;
+	file->datafid = datafid;
+	file->crc = crc;
+	file->datafd = spcfd_add(file->datafid, netreadcb, file, 0);
 	file->availfid = availfid;
 	free(buf);
 	free(redirto);
@@ -1538,6 +1570,7 @@
 
 	files = NULL;
 	clients = NULL;
+	servers = NULL;
 	starttime = time(NULL);
 	masterfs = NULL;
 	favail = NULL;
@@ -1758,8 +1791,6 @@
 			spc_close(f->datafid);
 		if (f->availfid)
 			spc_close(f->availfid);
-		if (f->redirfid)
-			spc_close(f->redirfid);
 
 		if (f->fs && f->fs != masterfs)
 			spc_umount(f->fs);
@@ -1778,15 +1809,76 @@
 static void
 connopen(Spconn *conn)
 {
+	Client *c;
+	int n;
+
 	numconnects++;
 	debug(Dbgsrvfn, "Client %s connects %d\n", conn->address, numconnects);
+	n = strlen(conn->address) + 1;
+	c = sp_malloc(sizeof(Client) + n + sizeof(int) * (maxlevels - 1));
+	if (!c)
+		return;
+
+
+	c->caddress = (char *) c + sizeof(Client);
+	if (!c->caddress) {
+		free(c);
+		return;
+	}
+
+	memcpy(c->caddress, conn->address, n);
+	c->clevels = (int *) ((char *) c->caddress + n);
+	if (!c->clevels) {
+		free(c);
+		return;
+	}
+
+	memset(c->clevels, 0, sizeof(int) * (maxlevels - 1));
+	c->workersused = NULL;
+	c->prev = NULL;
+	c->next = clients;
+	if (clients)
+		clients->prev = c;
+
+	clients = c;
 }
 
 static void
 connclose(Spconn *conn)
 {
+	Client *c;
+	Usedworker *uw, *uw2;
+	
 	numconnects--;
-	debug(Dbgsrvfn, "Client %s disconnects %d\n", conn->address, numconnects);
+	debug(Dbgsrvfn, "Client %s disconnects %d\n", conn->address, numconnects);	
+
+	for (c = clients; c != NULL; c = c->next) {
+		if (!strcmp(c->caddress, conn->address))
+			break;
+	}
+
+	if (!c)
+		return;
+
+	for (uw = c->workersused; uw != NULL; ) {
+		if (uw->worker && uw->worker->server &&
+		    uw->worker->server->conns > 0)
+			uw->worker->server->conns--;
+		
+		uw2 = uw->next;
+		free(uw);
+		uw = uw2;
+	}
+
+	if (c->next)
+		c->next->prev = c->prev;
+	if (c->prev)
+		c->prev->next = c->next;
+	if (c == clients)
+		clients = c->next;
+
+	debug(Dbgsrvfn, "Removing client with address: %s\n", c->caddress);
+	free(c);
 }
 
 static int
@@ -1817,35 +1909,9 @@
 		}
 	}
 
-	if (!c) {
-		c = sp_malloc(sizeof(Client));
-		if (!c)
-			return -1;
-		
-		c->caddress = sp_malloc(strlen(fid->fid->conn->address) + 1);
-		if (!c->caddress) {
-			free(c);
-			return -1;
-		}
+	if (!c) 
+		return -1;
 
-		strcpy(c->caddress, fid->fid->conn->address);
-		c->clevels = sp_malloc(sizeof(int) * (maxlevels - 1));
-		if (!c->clevels) {
-			free(c->caddress);
-			free(c);
-			return -1;
-		}
-
-		memset(c->clevels, 0, sizeof(int) * (maxlevels - 1));
-		c->workersused = NULL;
-		c->prev = NULL;
-		c->next = clients;
-		if (clients)
-			clients->prev = c;
-		clients = c;
-		fid->aux = c;
-	}
-	
 	q = 1;
 	j = 0;
 	if (!f->nextworker) {
@@ -1859,20 +1925,20 @@
 				continue;
 			}
 
-			if (!t && w->slevel - 1 < m && w->conns < maxconnections) 
+			if (!t && w->slevel - 1 < m && w->server->conns < maxconnections) 
 					t = w;
 			else {
 				if (t) {
-					if (t->conns > w->conns && 
+					if (t->server->conns > w->server->conns && 
 					    m == w->slevel - 1)
 						t = w;
 				} else {
-					if (w->conns < maxconnections &&
+					if (w->server->conns < maxconnections &&
 					    m == w->slevel - 1)
 						t = w;
 				}
 				
-				if (t && t->conns == 0)
+				if (t && t->server->conns == 0)
 					break;
 			}
 
@@ -1889,21 +1955,25 @@
 		}
 			
 		if (t) {
-			snprintf(buf, sizeof(buf), "%s", t->saddress);
+			snprintf(buf, sizeof(buf), "%s", t->server->saddress);
 			if (t == f->lastworker)
 				f->nextworker = f->firstworker;
 			else 
 				f->nextworker = t->next;
 			
 			c->clevels[t->slevel - 1]++;
-			j = ++t->conns;
+			j = t->server->conns + 1;
 			q = t->slevel;
 			uw = sp_malloc(sizeof(Usedworker));
 			if (!uw) 
 				return -1;
 
 			uw->worker = t;
+			uw->prev = NULL;
 			uw->next = c->workersused;
+			if (c->workersused)
+				c->workersused->prev = uw;
+
 			c->workersused = uw;
 		}
 		else {
@@ -1923,39 +1993,6 @@
 	return count;
 }
 
-static void
-redir_closefid(Spfilefid *fid)
-{
-	Client *c;
-	Usedworker *uw, *uw2;
-		
-	c = fid->aux;
-
-	if (!c)
-		return;
-
-	for (uw = c->workersused; uw != NULL; ) {
-		if (uw->worker && uw->worker->conns > 0)
-			uw->worker->conns--;
-		
-		uw2 = uw->next;
-		free(uw);
-		uw = uw2;
-	}
-
-	if (c->next)
-		c->next->prev = c->prev;
-	if (c->prev)
-		c->prev->next = c->next;
-	if (c == clients)
-		clients = c->next;
-
-	debug(Dbgsrvfn, "Removing client with address: %s\n", c->caddress);
-	free(c->caddress);
-	free(c->clevels);
-	free(c);
-}
-
 static int
 data_read(Spfilefid *fid, u64 offset, u32 count, u8 *buf, Spreq *r)
 {
@@ -2003,11 +2040,14 @@
 	char *s, *p, *address;
 	Worker *worker, *wc;
 	File *f;
+	Server *server;
 
 	f = fid->file->aux;
 	if (offset || !f)
 		return 0;
 
+	server = NULL;
+	address = NULL;
 	s = strchr((const char *)data, ' ');
 	if (!s) {
 		sp_werror("invalid format for avail file", EINVAL);
@@ -2029,8 +2069,9 @@
 		address = sp_malloc(strlen(s) + 1);
 		strcpy(address, s);
 		for (wc = f->firstworker; wc != NULL; wc = wc->next) {
-			if (!strcmp(wc->saddress, address)) {
+			if (!strcmp(wc->server->saddress, address)) {
 				slevel = wc->slevel + 1;
+				wc->server->conns++;
 				break;
 			}
 		}
@@ -2064,28 +2105,50 @@
 	strncpy(address, req->conn->address, n);
 	snprintf(address+n, 7, "!%d", port);
 	for (wc = f->firstworker; wc != NULL; wc = wc->next) {
-		if (!strcmp(wc->saddress, address)) {
+		if (!strcmp(wc->server->saddress, address)) {
 			debug(Dbgsrvfn, "Found a duplicate worker, Address: %s." 
 			      "  Not adding to worker list.\n", req->conn->address);
-			free(address);
-			return -1;
+			goto error;
 		}
 	}
 
 	worker = sp_malloc(sizeof(*worker));
-	if (!worker) {
-		free(address);
-		return -1;
+	if (!worker) 
+		goto error;
+	
+	for (server = servers; server != NULL; server = server->next) {
+		if (!strcmp(server->saddress, address))
+			break;
 	}
+	
+	if (!server) {
+		n = strlen(address) + 1;
+		server = sp_malloc(sizeof(*server) + n);
+		if (!server) 
+			goto error;
 
-	worker->saddress = address;
-	//Check to make sure this isn't a duplicate worker
+		server->saddress = (char *) server + sizeof(*server);
+		if (!server->saddress)
+			goto error;
+
+		memcpy(server->saddress, address, n);
+		server->conns = 0;
+		server->next = servers;
+		server->prev = NULL;
+		if (servers)
+			servers->prev = server;
+
+		servers = server;
+	}
+	
+	free(address);
+       	//Check to make sure this isn't a duplicate worker
 	worker->slevel = slevel;
-	worker->conns = 0;
+	worker->server = server;
 	worker->next = NULL;
 	worker->prev = f->lastworker;
-	debug(Dbgsrvfn, "new worker with address %s and with level: %d for file: %s\n",
-	      worker->saddress, worker->slevel, f->nname);
+	debug(Dbgsrvfn, "new worker: %p with address %s and with level: %d for file: %s\n",
+	      worker, worker->server->saddress, worker->slevel, f->nname);
 
 	if (f->lastworker)
 		f->lastworker->next = worker;
@@ -2103,6 +2166,14 @@
 	      f->numworkers, f->nname);
 
 	return count;
+
+error:
+	if (address)
+		free(address);
+	if (server)
+		free(server);
+
+	return -1;
 }
 
 static void
Index: xget.h
===================================================================
--- xget.h	(revision 704)
+++ xget.h	(working copy)
@@ -19,6 +19,7 @@
 };
 
 typedef struct File File;
+typedef struct Server Server;
 typedef struct Worker Worker;
 typedef struct Usedworker Usedworker;
 typedef struct Client Client;
@@ -37,7 +38,6 @@
 	Spcfsys*fs;
 	Spcfid*	datafid;	/* used while reading the file */
 	Spcfid*	availfid;
-	Spcfid* redirfid;
 	Spcfd*	datafd;
 
 	int	numworkers;
@@ -53,17 +53,24 @@
 	int     retries;
 };
 
+struct Server {
+	char *saddress;
+	int  conns;
+	Server *next;
+	Server *prev;
+};
+
 struct Worker {
-	char*   saddress;
 	Worker*	prev;
 	Worker*	next;
+	Server* server;
 	int     slevel;
-	int     conns;
 };
 
 struct Usedworker {
 	Worker *worker;
 	Usedworker *next;
+	Usedworker *prev;
 };
 
 struct Client {
@@ -88,7 +95,6 @@
 static void	fsinit(void);
 static int	data_read(Spfilefid *fid, u64 offset, u32 count, u8 *data, Spreq *req);
 static int	redir_read(Spfilefid *fid, u64 offset, u32 count, u8 *data, Spreq *req);
-static void	redir_closefid(Spfilefid *fid);
 static int 	avail_write(Spfilefid *fid, u64 offset, u32 count, u8 *data, Spreq *req);
 static int	avail_wstat(Spfile* file, Spstat* stat);
 static void	avail_closefid(Spfilefid *fid);

Reply via email to