This is basically the same patch as before, but with the Client addition
and subtraction code moved from redir_read and redir_closefid to
connopen and connclose.
Signed-off-by: Hugh Greenberg <[EMAIL PROTECTED]>
Index: xget.c
===================================================================
--- xget.c (revision 704)
+++ xget.c (working copy)
@@ -47,7 +47,6 @@
Spfileops redir_ops = {
.read = redir_read,
- .closefid = redir_closefid,
};
Spfileops avail_ops = {
@@ -81,7 +80,7 @@
int numconnects = 0;
int numtransfers = 0;
int maxlevels = 4;
-int maxconnections = 20;
+int maxconnections = 15;
int maxretries = 15; /* give at least 15 attempts by default */
int retrytime = 10; /* wait default of 20 seconds before trying again */
int singlefile = 0; /* Flag that indicates whether we are serving a single file */
@@ -92,6 +91,7 @@
u64 qpath = 1;
File *files;
Client *clients;
+Server *servers;
time_t starttime;
time_t endtime;
time_t servicetime;
@@ -125,7 +125,8 @@
void removeworker(File *f, Worker *worker) {
Worker *cur;
Client *client;
- Usedworker * uw;
+ Usedworker *uw, *uw2;
+ File *f2;
if(!worker)
return;
@@ -141,13 +142,27 @@
return; /* Not found in queue, already removed */
for (client = clients; client != NULL; client = client->next) {
- for (uw = client->workersused; uw != NULL; uw = uw->next) {
- if (uw->worker == worker)
- uw->worker = NULL;
+ for (uw = client->workersused; uw != NULL; ) {
+ uw2 = uw->next;
+ if (uw->worker == worker) {
+ if (uw->next)
+ uw->next->prev = uw->prev;
+ if (uw->prev)
+ uw->prev->next = uw->next;
+ if (uw == client->workersused)
+ client->workersused = uw->next;
+
+ if (uw->worker->server &&
+ uw->worker->server->conns > 0)
+ uw->worker->server->conns--;
+
+ free(uw);
+ }
+ uw = uw2;
}
}
- debug(Dbgsrvfn, "worker %p is done\n", worker, worker->saddress);
+ debug(Dbgsrvfn, "worker %p is done\n", worker);
if (worker == f->firstworker)
f->firstworker = worker->next;
@@ -167,7 +182,27 @@
worker->prev->next = worker->next;
f->numworkers--;
- free(worker->saddress);
+ //remove old servers that are no longer serving from the list
+
+ if (worker->server && worker->server->conns == 0) {
+ if (worker->server->next)
+ worker->server->next->prev = worker->server->prev;
+ if (worker->server->prev)
+ worker->server->prev->next = worker->server->next;
+ if (worker->server == servers)
+ servers = worker->server->next;
+ for (f2 = files; f2 != NULL; f2 = f2->next) {
+ for(cur = f2->firstworker; cur; cur = cur->next) {
+ if (cur == worker)
+ continue;
+ else if (cur->server == worker->server)
+ cur->server = NULL;
+ }
+ }
+
+ free(worker->server);
+ }
+
free(worker);
}
@@ -231,20 +266,15 @@
ticksig = 1;
if (servicetime && time(NULL) > servicetime) {
for(f = files; f != NULL; f = f->next) {
- // if (f->datafid)
- // spc_close(f->datafid);
- if (f->redirfid)
- spc_close(f->redirfid);
-
if (f->availfid)
spc_close(f->availfid);
}
-
+
if (masterfs) {
spc_umount(masterfs);
masterfs = NULL;
}
-
+
free(path);
exit(0);
}
@@ -372,11 +402,10 @@
f->datafid = NULL;
}
- if (f->fs && f->fs != masterfs) {
+ if (f->fs && f->fs != masterfs)
spc_umount(f->fs);
- f->fs = masterfs;
- }
-
+
+ f->fs = masterfs;
if (!changeperms) {
f->finished = 2;
goto done;
@@ -473,17 +502,20 @@
if (numconnects == 0)
servicetime = endtime;
else {
- n = (endtime - starttime);
- servicetime = endtime + n * 2;
+ if (n < 5)
+ n = 5;
+
+ n = (endtime - starttime);
+ servicetime = endtime + n * 2;
}
}
}
-
+
else {
if (numconnects == 0)
servicetime = time(NULL);
}
-
+
return 0;
}
@@ -731,7 +763,7 @@
dir_destroy(Spfile *dir) {
File *f, *f2;
Spfid *fid, **pool;
- Worker *w, *wt;
+ Worker *w;
Spfile *file;
Spfilefid *ffid;
Spconn *conns;
@@ -750,13 +782,9 @@
else if (f->prev)
f->prev->next = f->next;
- for (w = f->firstworker; w != NULL; ) {
- wt = w->next;
- free(w->saddress);
- free(w);
- w = wt;
- }
-
+ for (w = f->firstworker; w != NULL; )
+ removeworker(f, w);
+
for (req = f->reqs; req != NULL; ) {
req2 = req->next;
free(req);
@@ -1163,6 +1191,10 @@
if (!file)
goto error;
+ file->fs = NULL;
+ file->datafid = NULL;
+ file->datafd = NULL;
+ file->availfid = NULL;
/* check if we should go somewhere else for the file */
sprintf(buf, "%s/redir", nname);
redirfid = spc_open(masterfs, buf, Oread);
@@ -1174,6 +1206,7 @@
goto error;
buf[n] = '\0';
+ spc_close(redirfid);
redirto = sp_malloc(strlen(buf) + 1);
strcpy(redirto, buf);
if (!strcmp(buf, "me")) {
@@ -1192,11 +1225,6 @@
if (!datafid)
goto error;
- file->fs = redirfs;
- file->datafid = datafid;
- file->redirfid = redirfid;
- file->crc = crc;
- file->datafd = spcfd_add(file->datafid, netreadcb, file, 0);
sprintf(buf, "%s/avail", nname);
availfid = spc_open(masterfs, buf, Owrite);
if (!availfid)
@@ -1208,6 +1236,10 @@
goto error;
}
+ file->fs = redirfs;
+ file->datafid = datafid;
+ file->crc = crc;
+ file->datafd = spcfd_add(file->datafid, netreadcb, file, 0);
file->availfid = availfid;
free(buf);
free(redirto);
@@ -1538,6 +1570,7 @@
files = NULL;
clients = NULL;
+ servers = NULL;
starttime = time(NULL);
masterfs = NULL;
favail = NULL;
@@ -1758,8 +1791,6 @@
spc_close(f->datafid);
if (f->availfid)
spc_close(f->availfid);
- if (f->redirfid)
- spc_close(f->redirfid);
if (f->fs && f->fs != masterfs)
spc_umount(f->fs);
@@ -1778,15 +1809,76 @@
static void
connopen(Spconn *conn)
{
+ Client *c;
+ int n;
+
numconnects++;
debug(Dbgsrvfn, "Client %s connects %d\n", conn->address, numconnects);
+ n = strlen(conn->address) + 1;
+ c = sp_malloc(sizeof(Client) + n + sizeof(int) * (maxlevels - 1));
+ if (!c)
+ return;
+
+
+ c->caddress = (char *) c + sizeof(Client);
+ if (!c->caddress) {
+ free(c);
+ return;
+ }
+
+ memcpy(c->caddress, conn->address, n);
+ c->clevels = (int *) ((char *) c->caddress + n);
+ if (!c->clevels) {
+ free(c);
+ return;
+ }
+
+ memset(c->clevels, 0, sizeof(int) * (maxlevels - 1));
+ c->workersused = NULL;
+ c->prev = NULL;
+ c->next = clients;
+ if (clients)
+ clients->prev = c;
+
+ clients = c;
}
static void
connclose(Spconn *conn)
{
+ Client *c;
+ Usedworker *uw, *uw2;
+
numconnects--;
- debug(Dbgsrvfn, "Client %s disconnects %d\n", conn->address, numconnects);
+ debug(Dbgsrvfn, "Client %s disconnects %d\n", conn->address, numconnects);
+
+ for (c = clients; c != NULL; c = c->next) {
+ if (!strcmp(c->caddress, conn->address))
+ break;
+ }
+
+ if (!c)
+ return;
+
+ for (uw = c->workersused; uw != NULL; ) {
+ if (uw->worker && uw->worker->server &&
+ uw->worker->server->conns > 0)
+ uw->worker->server->conns--;
+
+ uw2 = uw->next;
+ free(uw);
+ uw = uw2;
+ }
+
+ if (c->next)
+ c->next->prev = c->prev;
+ if (c->prev)
+ c->prev->next = c->next;
+ if (c == clients)
+ clients = c->next;
+
+ debug(Dbgsrvfn, "Removing client with address: %s\n", c->caddress);
+ free(c);
}
static int
@@ -1817,35 +1909,9 @@
}
}
- if (!c) {
- c = sp_malloc(sizeof(Client));
- if (!c)
- return -1;
-
- c->caddress = sp_malloc(strlen(fid->fid->conn->address) + 1);
- if (!c->caddress) {
- free(c);
- return -1;
- }
+ if (!c)
+ return -1;
- strcpy(c->caddress, fid->fid->conn->address);
- c->clevels = sp_malloc(sizeof(int) * (maxlevels - 1));
- if (!c->clevels) {
- free(c->caddress);
- free(c);
- return -1;
- }
-
- memset(c->clevels, 0, sizeof(int) * (maxlevels - 1));
- c->workersused = NULL;
- c->prev = NULL;
- c->next = clients;
- if (clients)
- clients->prev = c;
- clients = c;
- fid->aux = c;
- }
-
q = 1;
j = 0;
if (!f->nextworker) {
@@ -1859,20 +1925,20 @@
continue;
}
- if (!t && w->slevel - 1 < m && w->conns < maxconnections)
+ if (!t && w->slevel - 1 < m && w->server->conns < maxconnections)
t = w;
else {
if (t) {
- if (t->conns > w->conns &&
+ if (t->server->conns > w->server->conns &&
m == w->slevel - 1)
t = w;
} else {
- if (w->conns < maxconnections &&
+ if (w->server->conns < maxconnections &&
m == w->slevel - 1)
t = w;
}
- if (t && t->conns == 0)
+ if (t && t->server->conns == 0)
break;
}
@@ -1889,21 +1955,25 @@
}
if (t) {
- snprintf(buf, sizeof(buf), "%s", t->saddress);
+ snprintf(buf, sizeof(buf), "%s", t->server->saddress);
if (t == f->lastworker)
f->nextworker = f->firstworker;
else
f->nextworker = t->next;
c->clevels[t->slevel - 1]++;
- j = ++t->conns;
+ j = t->server->conns + 1;
q = t->slevel;
uw = sp_malloc(sizeof(Usedworker));
if (!uw)
return -1;
uw->worker = t;
+ uw->prev = NULL;
uw->next = c->workersused;
+ if (c->workersused)
+ c->workersused->prev = uw;
+
c->workersused = uw;
}
else {
@@ -1923,39 +1993,6 @@
return count;
}
-static void
-redir_closefid(Spfilefid *fid)
-{
- Client *c;
- Usedworker *uw, *uw2;
-
- c = fid->aux;
-
- if (!c)
- return;
-
- for (uw = c->workersused; uw != NULL; ) {
- if (uw->worker && uw->worker->conns > 0)
- uw->worker->conns--;
-
- uw2 = uw->next;
- free(uw);
- uw = uw2;
- }
-
- if (c->next)
- c->next->prev = c->prev;
- if (c->prev)
- c->prev->next = c->next;
- if (c == clients)
- clients = c->next;
-
- debug(Dbgsrvfn, "Removing client with address: %s\n", c->caddress);
- free(c->caddress);
- free(c->clevels);
- free(c);
-}
-
static int
data_read(Spfilefid *fid, u64 offset, u32 count, u8 *buf, Spreq *r)
{
@@ -2003,11 +2040,14 @@
char *s, *p, *address;
Worker *worker, *wc;
File *f;
+ Server *server;
f = fid->file->aux;
if (offset || !f)
return 0;
+ server = NULL;
+ address = NULL;
s = strchr((const char *)data, ' ');
if (!s) {
sp_werror("invalid format for avail file", EINVAL);
@@ -2029,8 +2069,9 @@
address = sp_malloc(strlen(s) + 1);
strcpy(address, s);
for (wc = f->firstworker; wc != NULL; wc = wc->next) {
- if (!strcmp(wc->saddress, address)) {
+ if (!strcmp(wc->server->saddress, address)) {
slevel = wc->slevel + 1;
+ wc->server->conns++;
break;
}
}
@@ -2064,28 +2105,50 @@
strncpy(address, req->conn->address, n);
snprintf(address+n, 7, "!%d", port);
for (wc = f->firstworker; wc != NULL; wc = wc->next) {
- if (!strcmp(wc->saddress, address)) {
+ if (!strcmp(wc->server->saddress, address)) {
debug(Dbgsrvfn, "Found a duplicate worker, Address: %s."
" Not adding to worker list.\n", req->conn->address);
- free(address);
- return -1;
+ goto error;
}
}
worker = sp_malloc(sizeof(*worker));
- if (!worker) {
- free(address);
- return -1;
+ if (!worker)
+ goto error;
+
+ for (server = servers; server != NULL; server = server->next) {
+ if (!strcmp(server->saddress, address))
+ break;
}
+
+ if (!server) {
+ n = strlen(address) + 1;
+ server = sp_malloc(sizeof(*server) + n);
+ if (!server)
+ goto error;
- worker->saddress = address;
- //Check to make sure this isn't a duplicate worker
+ server->saddress = (char *) server + sizeof(*server);
+ if (!server->saddress)
+ goto error;
+
+ memcpy(server->saddress, address, n);
+ server->conns = 0;
+ server->next = servers;
+ server->prev = NULL;
+ if (servers)
+ servers->prev = server;
+
+ servers = server;
+ }
+
+ free(address);
+ //Check to make sure this isn't a duplicate worker
worker->slevel = slevel;
- worker->conns = 0;
+ worker->server = server;
worker->next = NULL;
worker->prev = f->lastworker;
- debug(Dbgsrvfn, "new worker with address %s and with level: %d for file: %s\n",
- worker->saddress, worker->slevel, f->nname);
+ debug(Dbgsrvfn, "new worker: %p with address %s and with level: %d for file: %s\n",
+ worker, worker->server->saddress, worker->slevel, f->nname);
if (f->lastworker)
f->lastworker->next = worker;
@@ -2103,6 +2166,14 @@
f->numworkers, f->nname);
return count;
+
+error:
+ if (address)
+ free(address);
+ if (server)
+ free(server);
+
+ return -1;
}
static void
Index: xget.h
===================================================================
--- xget.h (revision 704)
+++ xget.h (working copy)
@@ -19,6 +19,7 @@
};
typedef struct File File;
+typedef struct Server Server;
typedef struct Worker Worker;
typedef struct Usedworker Usedworker;
typedef struct Client Client;
@@ -37,7 +38,6 @@
Spcfsys*fs;
Spcfid* datafid; /* used while reading the file */
Spcfid* availfid;
- Spcfid* redirfid;
Spcfd* datafd;
int numworkers;
@@ -53,17 +53,24 @@
int retries;
};
+struct Server {
+ char *saddress;
+ int conns;
+ Server *next;
+ Server *prev;
+};
+
struct Worker {
- char* saddress;
Worker* prev;
Worker* next;
+ Server* server;
int slevel;
- int conns;
};
struct Usedworker {
Worker *worker;
Usedworker *next;
+ Usedworker *prev;
};
struct Client {
@@ -88,7 +95,6 @@
static void fsinit(void);
static int data_read(Spfilefid *fid, u64 offset, u32 count, u8 *data, Spreq *req);
static int redir_read(Spfilefid *fid, u64 offset, u32 count, u8 *data, Spreq *req);
-static void redir_closefid(Spfilefid *fid);
static int avail_write(Spfilefid *fid, u64 offset, u32 count, u8 *data, Spreq *req);
static int avail_wstat(Spfile* file, Spstat* stat);
static void avail_closefid(Spfilefid *fid);