This patch does the following:
Reduces the variation in file transfer completion times and improves
performance. This is done by limiting the number of levels created in
the tree and by managing and limiting the number of clients each
worker serves. This balances the load on the workers more evenly
and prevents long paths (many edges) from forming between a leaf node
and the master. My tests show very little variation in results and a
slightly lower mean completion time compared to the old version.
Improves performance when downloading multiple files at once. This is
done by having the client randomize the order of the files served by a
server. That way, different clients start downloading different files,
and there is less contention for the same file.
Fixes a bug where xget would crash if the filesystem was full.
Adds an option, -o, to the server and client that disables xget's
awareness of the current user and restores the old behavior. This is
for Greg Kurtzer.
Adds an option, -s, that disables permission and ownership
preservation on the client side. The -o option implies this option,
but not vice versa.
Removes the maximum number of workers option (-w). It no longer seems
to be needed because of the performance enhancements mentioned above.
A client will no longer wait after finishing the download if there are
no clients connected to it.
Also included is a man page patch, which updates xget.1 and xget.t2m
and removes xbootfs.1.
Signed-off-by: Hugh Greenberg <[EMAIL PROTECTED]>
Index: man1/xbootfs.1
===================================================================
--- man1/xbootfs.1 (revision 695)
+++ man1/xbootfs.1 (working copy)
@@ -1,87 +0,0 @@
-." Text automatically generated by txt2man
-.TH xbootfs 1 "February 25, 2008" "" ""
-.SH NAME
-\fBxbootfs \- Transfers files in a scalable way
-.SH SYNOPSIS
-.nf
-.fam C
-\fBxbootfs\fP [\fB-D\fP \fIdebuglevel\fP] [\fB-p\fP \fIport\fP] [\fB-w\fP \fImaxworkers\fP] \fB-f\fP <\fIfile\fP>
-\fBxbootfs\fP [\fB-D\fP \fIdebuglevel\fP] [\fB-p\fP \fIport\fP] [\fB-o\fP \fIfilename\fP] [\fB-s\fP \fIservicetime\fP] <\fB-n\fP netaddr>
-.fam T
-.fi
-.SH DESCRIPTION
-\fBxbootfs\fP transfers files in a manner that is very scalable. It is inteaded
-to be used for transfering boot images to all of the nodes in a cluster at once. However, it can be used for transfering any file to a large number
-of machines at once.
-
-
-\fBxbootfs\fP uses the 9P file system which uses a client/server model to distribute the file.
-
-
-There is always one main master server that must be started before any of the clients.
-\fBxbootfs\fP turns
-some clients into servers for a period of time in order to distribute the load of the master server.
-.SH OPTIONS
-.TP
-.B
-Master Server Options
-.TP
-.B
-\fB-D\fP \fIlevel\fP
-Turns on debugging messages. Different information will be printed depending of the value of \fIlevel\fP. If \fIlevel\fP is 1, xbootfs will print out function call debugging information. If \fIlevel\fP is 2, xbootfs will print out 9P debugging information when acting as a server. If \fIlevel\fP is 4, xbootfs will printout 9P debugging information when acting as a client.
-.TP
-.B
-\fB-p\fP \fIport\fP
-The \fIport\fP to listen on
-.TP
-.B
-\fB-w\fP \fImaxworkers\fP
-The maximum number of clients that will become servers when necessary
-.TP
-.B
-\fB-f\fP \fIfile\fP
-The \fIfile\fP to serve to all of the clients
-.RE
-.PP
-
-.RS
-Client Options
-.TP
-.B
-\fB-D\fP
-Turn on 9P debugging messages
-.TP
-.B
-\fB-p\fP \fIport\fP
-The \fIport\fP the master server is listening on
-.TP
-.B
-\fB-o\fP \fIfilename\fP
-The \fIfilename\fP to save the downloaded \fIfile\fP to. The default is /tmp/xboot.
-.TP
-.B
-\fB-s\fP \fIservicetime\fP
-The maximum amount of time in seconds this client can act a server
-.TP
-.B
-\fB-n\fP netaddr
-The ip address or hostname of the master server
-.SH EXAMPLE
-This command will start the master server.
-.PP
-.nf
-.fam C
- $ xbootfs \-f /tmp/boot.gz
-
-.fam T
-.fi
-This command will start a client with a \fIservicetime\fP of 30 seconds and save the downloaded file to /tmp/dboot.gz.
-This command assumes the master server has an ip address of 10.0.0.1.
-.PP
-.nf
-.fam C
- $ xbootfs \-o /tmp/dboot.gz -s 30 -n 10.0.0.1
-.fam T
-.fi
-.SH BUGS
-Plenty.
Index: man1/xget.1
===================================================================
--- man1/xget.1 (revision 695)
+++ man1/xget.1 (working copy)
@@ -6,8 +6,8 @@
.SH SYNOPSIS
.nf
.fam C
-\fBxget\fP [\fB-D\fP \fIdebuglevel\fP] [\fB-p\fP \fIport\fP] [\fB-w\fP \fImaxworkers\fP] <src>
-\fBxget\fP [\fB-D\fP \fIdebuglevel\fP] [\fB-p\fP \fIport\fP] [\fB-s\fP \fIservicetime\fP] <\fB-n\fP netaddr> <\fIremote_file\fP | .> [\fIremote_file\fP] <dest | .>
+\fBxget\fP [\fB-D\fP \fIdebuglevel\fP] [\fB-p\fP \fIport\fP] [\fB-o\fP] <src>
+\fBxget\fP [\fB-D\fP \fIdebuglevel\fP] [\fB-p\fP \fIport\fP] [\fB-s\fP] [\fB-o\fP] <\fB-n\fP netaddr> <\fIremote_file\fP | .> [\fIremote_file\fP] <dest | .>
.fam T
.fi
.SH DESCRIPTION
@@ -33,8 +33,8 @@
\fB-p\fP \fIport\fP
The \fIport\fP to listen on
.PP
-\fB-w\fP \fImaxworkers\fP
-The maximum number of clients that will become servers when necessary
+\fB-o\fP
+Prevents xget from determining the current user. xget will not preserve ownership information with this option enabled.
.PP
src
The file or directory to serve to all of the clients
@@ -48,9 +48,13 @@
\fB-p\fP \fIport\fP
The \fIport\fP the master server is listening on
.PP
-\fB-s\fP \fIservicetime\fP
-The maximum amount of time in seconds this client can act a server
+\fB-s\fP
+Disables permission and ownership preservation
.PP
+\fB-o\fP
+Prevents xget from determining the current user. This also disables
+permission and ownership preservation.
+.PP
\fB-n\fP netaddr
The ip address or hostname of the master server
.PP
Index: man1/xget.t2m
===================================================================
--- man1/xget.t2m (revision 695)
+++ man1/xget.t2m (working copy)
@@ -2,8 +2,8 @@
xget - Transfers files in a scalable way
SYNOPSIS
- xget [-D debuglevel] [-p port] [-w maxworkers] <src>
- xget [-D debuglevel] [-p port] [-s servicetime] <-n netaddr> <remote_file | .> [remote_file] <dest | .>
+ xget [-D debuglevel] [-p port] [-o] <src>
+ xget [-D debuglevel] [-p port] [-s] [-o] <-n netaddr> <remote_file | .> [remote_file] <dest | .>
DESCRIPTION
xget transfers files in a manner that is very scalable. It is inteaded to be used for
transfering boot images to all of the nodes in a cluster at once. However, it can be
@@ -28,8 +28,9 @@
-p port
The port to listen on
- -w maxworkers
- The maximum number of clients that will become servers when necessary
+ -o
+ Prevents xget from determining the current user. xget will not
+ preserve ownership information with this option enabled.
src
The file or directory to serve to all of the clients
@@ -41,9 +42,13 @@
-p port
The port the master server is listening on
- -s servicetime
- The maximum amount of time in seconds this client can act a server
+ -s
+ Disables permission and ownership preservation
+ -o
+ Prevents xget from determining the current user. This also
+ disables permission and ownership preservation.
+
-n netaddr
The ip address or hostname of the master server
Index: xget.c
===================================================================
--- xget.c (revision 695)
+++ xget.c (working copy)
@@ -47,6 +47,7 @@
Spfileops redir_ops = {
.read = redir_read,
+ .closefid = redir_closefid,
};
Spfileops avail_ops = {
@@ -79,14 +80,18 @@
int xget_ecode;
int numconnects = 0;
int numtransfers = 0;
-int maxworkers = 20;
+int maxlevels = 4;
+int maxconnections = 20;
int maxretries = 15; /* give at least 15 attempts by default */
int retrytime = 10; /* wait default of 20 seconds before trying again */
int singlefile = 0; /* Flag that indicates whether we are serving a single file */
+int changeperms = 1;
+int rootonly = 0;
char *path, *outname;
int ticksig;
u64 qpath = 1;
File *files;
+Client *clients;
time_t starttime;
time_t endtime;
time_t servicetime;
@@ -96,12 +101,13 @@
static int fullfilename(Spfile *file, char* fullname, int buflen);
static void netreadcb(Spcfd *fd, void *a);
+
static void
usage(char *name) {
- fprintf(stderr, "usage as the master server: %s [-D debuglevel] [-p port] %s",
- name, "[-w maxworkers] file|directory\n");
+ fprintf(stderr, "usage as the master server: %s [-D debuglevel] [-p port] [-o] %s",
+ name, " file|directory\n");
fprintf(stderr, "usage as a client : %s [-D debuglevel] [-p port] %s %s",
- name, "<-n netaddr> <src file | src dir | .> ",
+ name, "<-n netaddr> [-s] [-o] <src file | src dir | .> ",
"[src file | src dir] ... dest\n");
exit(1);
}
@@ -118,6 +124,8 @@
void removeworker(File *f, Worker *worker) {
Worker *cur;
+ Client *client;
+ Usedworker * uw;
if(!worker)
return;
@@ -132,15 +140,26 @@
if(!cur)
return; /* Not found in queue, already removed */
- debug(Dbgsrvfn, "worker %p is done\n", worker, worker->ip, worker->port);
+ for (client = clients; client != NULL; client = client->next) {
+ for (uw = client->workersused; uw != NULL; uw = uw->next) {
+ if (uw->worker == worker)
+ uw->worker = NULL;
+ }
+ }
+
+ debug(Dbgsrvfn, "worker %p is done\n", worker, worker->saddress);
if (worker == f->firstworker)
f->firstworker = worker->next;
if (worker == f->lastworker)
f->lastworker = worker->prev;
- if (worker == f->nextworker)
- f->nextworker = worker->next;
+ if (worker == f->nextworker) {
+ if (worker->next)
+ f->nextworker = worker->next;
+ else
+ f->nextworker = f->firstworker;
+ }
if (worker->next)
worker->next->prev = worker->prev;
@@ -148,7 +167,7 @@
worker->prev->next = worker->next;
f->numworkers--;
- free(worker->ip);
+ free(worker->saddress);
free(worker);
}
@@ -206,10 +225,29 @@
static void
sigalrm(int sig)
{
+ File *f;
+
alarm(1);
ticksig = 1;
- if (servicetime && time(NULL) > servicetime)
+ if (servicetime && time(NULL) > servicetime) {
+ for(f = files; f != NULL; f = f->next) {
+ // if (f->datafid)
+ // spc_close(f->datafid);
+ if (f->redirfid)
+ spc_close(f->redirfid);
+
+ if (f->availfid)
+ spc_close(f->availfid);
+ }
+
+ if (masterfs) {
+ spc_umount(masterfs);
+ masterfs = NULL;
+ }
+
+ free(path);
exit(0);
+ }
}
//Taken from xcpufs/ufs.c
@@ -294,12 +332,16 @@
if ((file_finalize(f, 1)) < 0)
goto error;
- if (f->datafd)
+ if (f->datafd) {
spcfd_remove(f->datafd);
+ f->datafd = NULL;
+ }
- f->datafd = NULL;
f->progress = time(NULL);
f->finished = 0;
+ if (offset == 0)
+ f->datalen = 0;
+
f->datafid = spc_open(masterfs, tname, Oread);
if (!f->datafid)
goto error;
@@ -322,9 +364,24 @@
char *ename;
int ecode;
- if (!f || !f->datasize)
+ if (!f)
return 0;
+ if (f->datafid) {
+ spc_close(f->datafid);
+ f->datafid = NULL;
+ }
+
+ if (f->fs && f->fs != masterfs) {
+ spc_umount(f->fs);
+ f->fs = masterfs;
+ }
+
+ if (!changeperms) {
+ f->finished = 2;
+ goto done;
+ }
+
if (!write) {
umode = np2umode(f->datafile->mode, NULL);
if ((chmod(f->lname, umode)) == -1) {
@@ -353,7 +410,8 @@
if (sp_haserror()) {
sp_rerror(&ename, &ecode);
debug(Dbgclntfn, "Could not change group ownership"
- " of file: %s. Error: %s\n", f->lname, ename);
+ " of file: %s to gid: %u. Error: %s\n", f->lname,
+ f->datafile->gid->gid, ename);
sp_werror(NULL, 0);
}
}
@@ -373,10 +431,11 @@
}
}
+done:
return 0;
}
-static void
+static int
tick(void)
{
int fdone, n;
@@ -386,9 +445,13 @@
fdone = 1;
for(f = files; f != NULL; f = f->next) {
// debug(Dbgfn, "tick\n");
- if (f->finished<0 && fileretry(f, 0)<0)
- return;
-
+ if (f->finished<0) {
+ debug(Dbgclntfn, "File: %s checksum did not match, retrying\n",
+ f->nname);
+ if (fileretry(f, 0) < 0)
+ return -1;
+ }
+
if (f->finished == 1)
file_finalize(f, 0);
@@ -396,22 +459,32 @@
continue;
fdone = 0;
- if ((time(NULL) - f->progress) > retrytime
- && fileretry(f, f->datalen) < 0)
- return;
+ if (time(NULL) - f->progress > retrytime) {
+ debug(Dbgclntfn, "File: %s timed out, retrying\n", f->nname);
+ if (fileretry(f, f->datalen) < 0)
+ return -1;
+ }
}
if (fdone) {
endtime = time(NULL);
debug(Dbgfn, "finished download for %d seconds\n",
(endtime - starttime));
-
- n = endtime - starttime;
- if (n < 5)
- n = 5;
- servicetime = endtime + n;
+ if (numconnects == 0)
+ servicetime = endtime;
+ else {
+ n = (endtime - starttime);
+ servicetime = endtime + n * 2;
+ }
}
}
+
+ else {
+ if (numconnects == 0)
+ servicetime = time(NULL);
+ }
+
+ return 0;
}
static void
@@ -679,7 +752,7 @@
for (w = f->firstworker; w != NULL; ) {
wt = w->next;
- free(w->ip);
+ free(w->saddress);
free(w);
w = wt;
}
@@ -789,7 +862,6 @@
f->finished = 0;
f->progress = time(NULL);
f->retries = 0;
-
f->dir = create_file(parent, nname, 0555 | Dmdir, (qp << 8), &dir_ops, usr, grp, f);
f->dir->mtime = f->dir->atime = mtime;
f->datafile = create_file(f->dir, "data", mode, (qp << 8) | Qdata, &data_ops, usr, grp, f);
@@ -825,6 +897,8 @@
f = NULL;
crc = 0;
ret = NULL;
+ usr = NULL;
+ grp = NULL;
if ((name = strrchr(filename, '/')))
name++;
else
@@ -836,8 +910,11 @@
}
npmode = umode2npmode(st.st_mode);
- usr = sp_unix_users->uid2user(sp_unix_users, st.st_uid);
- grp = sp_unix_users->gid2group(sp_unix_users, st.st_gid);
+ if (!rootonly) {
+ usr = sp_unix_users->uid2user(sp_unix_users, st.st_uid);
+ grp = sp_unix_users->gid2group(sp_unix_users, st.st_gid);
+ }
+
if (S_ISREG(st.st_mode)) {
if ((fd = open(filename, O_RDONLY)) == -1) {
sp_uerror(errno);
@@ -909,11 +986,12 @@
if (count < 0)
count = 0;
- if (f->datalen==f->datasize || count>4096) {
+ if (count > req->count)
+ count = req->count;
+
+ if (f->datalen == f->datasize || (count>4096 && req->offset + count <= f->datalen) ) {
/* if we haven't got the whole file and can send back
only small chunk, don't respond, wait for more */
- if (count > req->count)
- count = req->count;
buf = sp_malloc(count);
if ((fd = open(f->lname, O_RDONLY)) == -1) {
@@ -1044,14 +1122,15 @@
u32 npmode, Spuser *usr, Spgroup *grp)
{
int n, blen, crc;
- char *buf, *fname, *s;
- Spcfid *fid, *datafid, *crcfid, *availfid;
+ char *buf, *fname, *redirto;
+ Spcfid *datafid, *crcfid, *availfid, *redirfid;
Spcfsys *redirfs;
File *file;
datafid = NULL;
crcfid = NULL;
availfid = NULL;
+ redirfid = NULL;
redirfs = NULL;
blen = strlen(nname) + 16;
@@ -1060,39 +1139,7 @@
buf = sp_malloc(blen);
if (!buf)
return -1;
-
- /* check if we should go somewhere else for the file */
- sprintf(buf, "%s/redir", nname);
- fid = spc_open(masterfs, buf, Oread);
- if (!fid)
- goto error;
- n = spc_read(fid, (u8 *) buf, blen, 0);
- if (n < 0)
- goto error;
-
- buf[n] = '\0';
- spc_close(fid);
- fid = NULL;
-
- s = buf;
- if (strncmp(buf, "help ", 5) == 0)
- s = buf + 5;
-
- if (!strcmp(s, "me"))
- redirfs = masterfs;
- else {
- debug(Dbgclntfn, "redirected to %s\n", buf);
- redirfs = spc_netmount(s, user, port, NULL, NULL);
- if (!redirfs)
- redirfs = masterfs;
- }
-
- sprintf(buf, "%s/data", nname);
- datafid = spc_open(redirfs, buf, Oread);
- if (!datafid)
- goto error;
-
sprintf(buf, "%s/crc", nname);
crcfid = spc_open(masterfs, buf, Oread);
if (!crcfid)
@@ -1105,12 +1152,6 @@
buf[n] = '\0';
crc = strtoul(buf, NULL, 0);
spc_close(crcfid);
-
- sprintf(buf, "%s/avail", nname);
- availfid = spc_open(masterfs, buf, Owrite);
- if (!availfid)
- goto error;
-
fname = strrchr(nname, '/');
if (!fname)
fname = nname;
@@ -1122,23 +1163,54 @@
if (!file)
goto error;
+ /* check if we should go somewhere else for the file */
+ sprintf(buf, "%s/redir", nname);
+ redirfid = spc_open(masterfs, buf, Oread);
+ if (!redirfid)
+ goto error;
+
+ n = spc_read(redirfid, (u8 *) buf, blen, 0);
+ if (n < 0)
+ goto error;
+
+ buf[n] = '\0';
+ redirto = sp_malloc(strlen(buf) + 1);
+ strcpy(redirto, buf);
+ if (!strcmp(buf, "me")) {
+ redirfs = masterfs;
+ debug(Dbgclntfn, "Downloading file: %s from master\n", nname);
+ }
+ else {
+ debug(Dbgclntfn, "Redirected to %s for file: %s\n", buf, nname);
+ redirfs = spc_netmount(buf, user, port, NULL, NULL);
+ if (!redirfs)
+ redirfs = masterfs;
+ }
+
+ sprintf(buf, "%s/data", nname);
+ datafid = spc_open(redirfs, buf, Oread);
+ if (!datafid)
+ goto error;
+
file->fs = redirfs;
file->datafid = datafid;
- file->availfid = availfid;
+ file->redirfid = redirfid;
file->crc = crc;
+ file->datafd = spcfd_add(file->datafid, netreadcb, file, 0);
+ sprintf(buf, "%s/avail", nname);
+ availfid = spc_open(masterfs, buf, Owrite);
+ if (!availfid)
+ goto error;
- n = time(NULL) - starttime;
- if (n < 30)
- n = 30;
-
- snprintf(buf, blen, "%d %d", port, n);
- n = spc_write(file->availfid, (u8 *) buf, strlen(buf) + 1, 0);
+ snprintf(buf, blen, "%d %s", port, redirto);
+ n = spc_write(availfid, (u8 *) buf, strlen(buf) + 1, 0);
if (n < 0) {
goto error;
}
- file->datafd = spcfd_add(file->datafid, netreadcb, file, 0);
+ file->availfid = availfid;
free(buf);
+ free(redirto);
return 0;
error:
@@ -1151,9 +1223,11 @@
if (availfid)
spc_close(availfid);
- if (redirfs && redirfs != masterfs)
+ if (redirfs && redirfs != masterfs) {
spc_umount(redirfs);
-
+ file->fs = NULL;
+ }
+
free(buf);
return -1;
}
@@ -1162,13 +1236,15 @@
netdirread(Spfile *parent, char *lname, char *nname, u32 npmode, Spuser *usr,
Spgroup *grp)
{
- int ret, i, n;
+ int ret, i, n, r;
char *dname;
struct stat ustat;
Spfile *dir;
Spcfid *fid;
Spwstat *st;
-
+ char **fnames;
+ struct timeval tv;
+
fid = spc_open(masterfs, nname, Oread);
if (!fid)
return -1;
@@ -1212,18 +1288,43 @@
goto done;
}
+ gettimeofday(&tv, NULL);
+ srand(tv.tv_sec);
while ((n = spc_dirread(fid, &st)) > 0) {
+ fnames = (char **) sp_malloc(sizeof(char *) * n);
+ for (i=0; i < n; i++) {
+ fnames[i] = NULL;
+ }
+
for(i = 0; i < n; i++) {
if (dir == root && !strcmp(st[i].name, "log"))
continue;
- debug(Dbgclntfn, "Setting up file: %s\n", st[i].name);
- if (netread(dir, lname, nname, st[i].name) < 0) {
+
+ r = (int) (n * (rand() / (RAND_MAX + 1.0)));
+ while (fnames[r] != NULL) {
+ r =(int) (n * (rand() / (RAND_MAX + 1.0)));
+ }
+
+ fnames[r] = sp_malloc(strlen(st[i].name) + 1);
+ snprintf(fnames[r], strlen(st[i].name) + 1, "%s", st[i].name);
+ }
+
+ for (i=0; i < n; i++) {
+ if (fnames[i] == NULL)
+ continue;
+
+ debug(Dbgclntfn, "Setting up file: %s\n", fnames[i]);
+ if (netread(dir, lname, nname, fnames[i]) < 0) {
ret = -1;
goto done;
}
+
+ free(fnames[i]);
}
- free(st);
+
+ free(fnames);
+ free(st);
}
if (n < 0)
@@ -1291,16 +1392,21 @@
}
npmode = st->mode;
- usr = sp_unix_users->uname2user(sp_unix_users, st->uid);
- grp = sp_unix_users->gname2group(sp_unix_users, st->gid);
+ if (!rootonly) {
+ usr = sp_unix_users->uname2user(sp_unix_users, st->uid);
+ grp = sp_unix_users->gname2group(sp_unix_users, st->gid);
+ }
+
ret = netdirread(parent, lname, nname, npmode, usr, grp);
} else {
len = st->length;
mtime = st->mtime;
npmode = st->mode;
- usr = sp_unix_users->uname2user(sp_unix_users, st->uid);
- grp = sp_unix_users->gname2group(sp_unix_users, st->gid);
-
+ if (!rootonly) {
+ usr = sp_unix_users->uname2user(sp_unix_users, st->uid);
+ grp = sp_unix_users->gname2group(sp_unix_users, st->gid);
+ }
+
/* regular file, set it up and retry up to maxretries */
for (i=0; i < maxretries; i++) {
sp_werror(NULL, 0);
@@ -1428,15 +1534,18 @@
Spcfid *favail;
File *f, *f1;
struct stat st;
+ Spuserpool *upool;
+ files = NULL;
+ clients = NULL;
starttime = time(NULL);
masterfs = NULL;
favail = NULL;
servicetime = endtime = 0;
xget_ename = NULL;
xget_ecode = 0;
-
- while ((c = getopt(argc, argv, "D:p:n:w:r:")) != -1) {
+
+ while ((c = getopt(argc, argv, "D:p:n:w:r:so")) != -1) {
switch (c) {
case 'p':
port = strtol(optarg, &s, 10);
@@ -1451,24 +1560,38 @@
case 'n':
netaddress = optarg;
break;
- case 'w':
- maxworkers = strtol(optarg, &s, 10);
- if (*s != '\0')
- usage(argv[0]);
- break;
case 'r':
maxretries = strtol(optarg, &s, 10);
if (*s != '\0')
usage(argv[0]);
break;
+ case 's':
+ changeperms = 0;
+ break;
+ case 'o':
+ rootonly = 1;
+ changeperms = 0;
+ break;
default:
usage(argv[0]);
}
}
spc_chatty = debuglevel & Dbgclnt;
- user = sp_unix_users->uid2user(sp_unix_users, geteuid());
- group = user->dfltgroup;
+ if (rootonly) {
+ upool = sp_priv_userpool_create();
+ if (!upool)
+ goto error;
+
+ user = sp_priv_user_add(upool, "root", 0, NULL);
+ group = sp_priv_group_add(upool, "root", 0);
+ sp_priv_user_setdfltgroup(user, group);
+ } else {
+
+ user = sp_unix_users->uid2user(sp_unix_users, geteuid());
+ group = user->dfltgroup;
+ }
+
if (!user)
goto error;
if (!group)
@@ -1526,7 +1649,11 @@
srv->connclose = connclose;
srv->debuglevel = debuglevel & Dbgfs;
srv->dotu = 0;
- srv->upool = sp_unix_users;
+ if (rootonly)
+ srv->upool = upool;
+ else
+ srv->upool = sp_unix_users;
+
srv->flush = xflush;
sp_srv_start(srv);
debug(Dbgsrvfn, "listen on %d\n", port);
@@ -1604,25 +1731,15 @@
goto error;
if (ticksig) {
- tick();
+ if (tick() < 0)
+ goto error;
+
ticksig = 0;
}
sp_poll_once();
}
- for(f = files; f != NULL; f = f->next) {
- if (f->datafid)
- spc_close(f->datafid);
- if (f->availfid)
- spc_close(f->availfid);
- }
-
- if (masterfs)
- spc_umount(masterfs);
-
- masterfs = NULL;
- free(path);
return 0;
error:
@@ -1641,13 +1758,20 @@
spc_close(f->datafid);
if (f->availfid)
spc_close(f->availfid);
+ if (f->redirfid)
+ spc_close(f->redirfid);
+
+ if (f->fs && f->fs != masterfs)
+ spc_umount(f->fs);
+
f1 = f->next;
free(f);
f = f1;
}
-
+
if (masterfs)
spc_umount(masterfs);
+
return -1;
}
@@ -1668,10 +1792,12 @@
static int
redir_read(Spfilefid *fid, u64 offset, u32 count, u8 *data, Spreq *req)
{
- int n;
- char buf[128], help[32];
+ int n, m, i, j, q;
+ char buf[128];
File *f;
- Worker *w;
+ Worker *w, *p, *t;
+ Client *c;
+ Usedworker *uw;
f = fid->file->aux;
@@ -1679,48 +1805,157 @@
return 0;
memset(buf, 0, sizeof(buf));
-
- /* take this opportunity to clean up the workers */
- w = f->firstworker;
- while (w && (w->until < now())) {
- debug(Dbgsrvfn, "Removing worker in redir_read for File: "
- "%s - ip: %s, port: %d, until: %ld\n", f->lname, w->ip,
- w->port, w->until);
- removeworker(f, w);
- w = w->next;
+ m = 0;
+ for (c = clients; c != NULL; c = c->next) {
+ if (!strcmp(c->caddress, fid->fid->conn->address)) {
+ for (i = 1; i < maxlevels - 1; i++) {
+ if (c->clevels[m] > c->clevels[i])
+ m = i;
+ }
+
+ break;
+ }
}
- if (f->numworkers < maxworkers) {
- snprintf(help, sizeof(help), "help ");
- debug(Dbgsrvfn, "Asking for help in redir_read for File: %s\n", f->lname);
+ if (!c) {
+ c = sp_malloc(sizeof(Client));
+ if (!c)
+ return -1;
+
+ c->caddress = sp_malloc(strlen(fid->fid->conn->address) + 1);
+ if (!c->caddress) {
+ free(c);
+ return -1;
+ }
+
+ strcpy(c->caddress, fid->fid->conn->address);
+ c->clevels = sp_malloc(sizeof(int) * (maxlevels - 1));
+ if (!c->clevels) {
+ free(c->caddress);
+ free(c);
+ return -1;
+ }
+
+ memset(c->clevels, 0, sizeof(int) * (maxlevels - 1));
+ c->workersused = NULL;
+ c->prev = NULL;
+ c->next = clients;
+ if (clients)
+ clients->prev = c;
+ clients = c;
+ fid->aux = c;
}
- else
- help[0] = 0;
-
+
+ q = 1;
+ j = 0;
if (!f->nextworker) {
- snprintf(buf, sizeof(buf), "%sme", help);
- debug(Dbgsrvfn, "Asking for help and download from master for file: %s\n",
- f->lname);
+ snprintf(buf, sizeof(buf), "me");
+ c->clevels[0]++;
} else {
- snprintf(buf, sizeof(buf), "%s%s!%d", help, f->nextworker->ip,
- f->nextworker->port);
- debug(Dbgsrvfn, "Asking for help and download from a different server for "
- "file: %s, from: %s %d\n", f->lname, f->nextworker->ip, f->nextworker->port);
- if (f->nextworker == f->lastworker)
- f->nextworker = f->firstworker;
- else
- f->nextworker = f->nextworker->next;
+ p = f->nextworker;
+ t = NULL;
+ for (w = f->nextworker; w != NULL; ) {
+ if (w->slevel >= maxlevels) {
+ continue;
+ }
+
+ if (!t && w->slevel - 1 < m && w->conns < maxconnections)
+ t = w;
+ else {
+ if (t) {
+ if (t->conns > w->conns &&
+ m == w->slevel - 1)
+ t = w;
+ } else {
+ if (w->conns < maxconnections &&
+ m == w->slevel - 1)
+ t = w;
+ }
+
+ if (t && t->conns == 0)
+ break;
+ }
+
+ if (!w->next && p == f->firstworker)
+ break;
+ else if (!w->next)
+ w = f->firstworker;
+ else if (w->next == p)
+ break;
+ else
+ w = w->next;
+
+ continue;
+ }
+
+ if (t) {
+ snprintf(buf, sizeof(buf), "%s", t->saddress);
+ if (t == f->lastworker)
+ f->nextworker = f->firstworker;
+ else
+ f->nextworker = t->next;
+
+ c->clevels[t->slevel - 1]++;
+ j = ++t->conns;
+ q = t->slevel;
+ uw = sp_malloc(sizeof(Usedworker));
+ if (!uw)
+ return -1;
+
+ uw->worker = t;
+ uw->next = c->workersused;
+ c->workersused = uw;
+ }
+ else {
+ snprintf(buf, sizeof(buf), "me");
+ c->clevels[0]++;
+ }
}
-
+
n = strlen(buf);
if (count > n)
count = n;
memcpy(data, buf, count);
- debug(Dbgsrvfn, "Client %s redirected to %s\n", fid->fid->conn->address, buf);
+ debug(Dbgsrvfn, "Client %s redirected to %s with conns: %d and level: %d"
+ " for file: %s\n", fid->fid->conn->address, buf, j, q, f->nname);
+
return count;
}
+static void
+redir_closefid(Spfilefid *fid)
+{
+ Client *c;
+ Usedworker *uw, *uw2;
+
+ c = fid->aux;
+
+ if (!c)
+ return;
+
+ for (uw = c->workersused; uw != NULL; ) {
+ if (uw->worker && uw->worker->conns > 0)
+ uw->worker->conns--;
+
+ uw2 = uw->next;
+ free(uw);
+ uw = uw2;
+ }
+
+ if (c->next)
+ c->next->prev = c->prev;
+ if (c->prev)
+ c->prev->next = c->next;
+ if (c == clients)
+ clients = c->next;
+
+ debug(Dbgsrvfn, "Removing client with address: %s\n", c->caddress);
+ free(c->caddress);
+ free(c->clevels);
+ free(c);
+}
+
static int
data_read(Spfilefid *fid, u64 offset, u32 count, u8 *buf, Spreq *r)
{
@@ -1764,9 +1999,9 @@
static int
avail_write(Spfilefid *fid, u64 offset, u32 count, u8 *data, Spreq *req)
{
- int n, port, wtime;
- char *p, *s;
- Worker *worker, *dwc;
+ int n, port, slevel;
+ char *s, *p, *address;
+ Worker *worker, *wc;
File *f;
f = fid->file->aux;
@@ -1775,60 +2010,83 @@
s = strchr((const char *)data, ' ');
if (!s) {
- sp_werror("invalid data written to avail", EINVAL);
+ sp_werror("invalid format for avail file", EINVAL);
return -1;
}
+
*s++ = '\0';
-
port = strtoul((const char *)data, &p, 0);
if (*data == '\0' || *p != '\0') {
sp_werror("invalid port number", EINVAL);
return -1;
}
+
+ slevel = 0;
+ if (!strcmp(s, "me")) {
+ slevel = 1;
+ }
+ else {
+ address = sp_malloc(strlen(s) + 1);
+ strcpy(address, s);
+ for (wc = f->firstworker; wc != NULL; wc = wc->next) {
+ if (!strcmp(wc->saddress, address)) {
+ slevel = wc->slevel + 1;
+ break;
+ }
+ }
+
+ if (!slevel) {
+ debug(Dbgsrvfn, "Could not find worker with address: %s\n",
+ address);
+ free(address);
+ return -1;
+ }
- wtime = strtoul(s, &p, 0);
- if (*s == '\0' || *p != '\0') {
- sp_werror("invalid work time given number", EINVAL);
+ free(address);
+ if (slevel >= maxlevels) {
+ debug(Dbgsrvfn, "Tree level is too large, not adding client: %s"
+ " as a worker\n", req->conn->address);
+ return count;
+ }
+ }
+
+ p = strchr(req->conn->address, '!');
+ if (!p) {
+ sp_werror("Port number not given by client", EINVAL);
return -1;
}
- if (f->numworkers >= maxworkers) {
- /* Not an error condition */
- debug(Dbgsrvfn, "Not accepting any more workers\n", f->numworkers);
- return count;
+ n = p - req->conn->address;
+ address = sp_malloc(n + 7);
+ if (!address)
+ return -1;
+
+ strncpy(address, req->conn->address, n);
+ snprintf(address+n, 7, "!%d", port);
+ for (wc = f->firstworker; wc != NULL; wc = wc->next) {
+ if (!strcmp(wc->saddress, address)) {
+ debug(Dbgsrvfn, "Found a duplicate worker, Address: %s."
+ " Not adding to worker list.\n", req->conn->address);
+ free(address);
+ return -1;
+ }
}
-
+
worker = sp_malloc(sizeof(*worker));
- if (!worker)
+ if (!worker) {
+ free(address);
return -1;
+ }
- /* get the ip address of the client from the connection */
- p = strchr(req->conn->address, '!');
- if (!p)
- p = req->conn->address + strlen(req->conn->address);
-
- n = p - req->conn->address;
- worker->ip = sp_malloc(n + 1);
- memmove(worker->ip, req->conn->address, n);
- worker->ip[n] = '\0';
- worker->port = port;
+ worker->saddress = address;
+ //Check to make sure this isn't a duplicate worker
+ worker->slevel = slevel;
+ worker->conns = 0;
worker->next = NULL;
worker->prev = f->lastworker;
- worker->until = future(wtime);
- debug(Dbgsrvfn, "new worker %p: address %s port %d for: %d seconds\n", worker, worker->ip,
- worker->port, wtime);
+ debug(Dbgsrvfn, "new worker with address %s and with level: %d for file: %s\n",
+ worker->saddress, worker->slevel, f->nname);
- //Check to make sure this isn't a duplicate worker
- for (dwc = f->firstworker; dwc != NULL; dwc = dwc->next) {
- if (!strcmp(dwc->ip, worker->ip) &&
- dwc->port == worker->port) {
- debug(Dbgsrvfn, "Found a duplicate worker, IP: %s, Port: %d."
- " Not adding to worker list.\n", worker->ip, worker->port);
- free(worker);
- return(count);
- }
- }
-
if (f->lastworker)
f->lastworker->next = worker;
@@ -1841,6 +2099,9 @@
fid->aux = worker;
f->numworkers++;
+ debug(Dbgsrvfn, "Total number of workers: %d for file: %s\n",
+ f->numworkers, f->nname);
+
return count;
}
@@ -1849,7 +2110,7 @@
{
Worker *worker;
File *f;
-
+
f = fid->file->aux;
worker = fid->aux;
if (!f || !worker)
Index: xget.h
===================================================================
--- xget.h (revision 695)
+++ xget.h (working copy)
@@ -20,6 +20,8 @@
typedef struct File File;
typedef struct Worker Worker;
+typedef struct Usedworker Usedworker;
+typedef struct Client Client;
typedef struct Req Req;
struct File {
@@ -35,6 +37,7 @@
Spcfsys*fs;
Spcfid* datafid; /* used while reading the file */
Spcfid* availfid;
+ Spcfid* redirfid;
Spcfd* datafd;
int numworkers;
@@ -51,14 +54,26 @@
};
struct Worker {
- char* ip;
- int port;
- /* the worker is alive until this time. Time is in seconds. */
- time_t until;
+ char* saddress;
Worker* prev;
Worker* next;
+ int slevel;
+ int conns;
};
+struct Usedworker {
+ Worker *worker;
+ Usedworker *next;
+};
+
+struct Client {
+ char* caddress;
+ int* clevels;
+ Client* prev;
+ Client* next;
+ Usedworker* workersused;
+};
+
struct Req {
u64 offset;
u32 count;
@@ -73,6 +88,7 @@
static void fsinit(void);
static int data_read(Spfilefid *fid, u64 offset, u32 count, u8 *data, Spreq *req);
static int redir_read(Spfilefid *fid, u64 offset, u32 count, u8 *data, Spreq *req);
+static void redir_closefid(Spfilefid *fid);
static int avail_write(Spfilefid *fid, u64 offset, u32 count, u8 *data, Spreq *req);
static int avail_wstat(Spfile* file, Spstat* stat);
static void avail_closefid(Spfilefid *fid);