Update of /cvsroot/monetdb/sql/src/backends/monet5
In directory 23jxhf1.ch3.sourceforge.com:/tmp/cvs-serv23243/src/backends/monet5
Modified Files:
merovingian.mx
Log Message:
propagated changes of Sunday Dec 28 2008 - Monday Jan 05 2009
from the Nov2008 branch to the development trunk
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
2008/12/28 - mr-meltdown: src/backends/monet5/merovingian.mx,1.69.2.2
Attempt to fix a part of bug #2405952 by switching from non-blocking IO to
select(). We still poll the hell out of the OS, though, but mserver5 is
above merovingian most of the time now.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
2008/12/28 - mr-meltdown: src/backends/monet5/merovingian.mx,1.69.2.3
Bring down CPU load from 0.04% to 0.00% in idle state
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
2008/12/28 - mr-meltdown: src/backends/monet5/merovingian.mx,1.69.2.4
Cut down on the number of time() syscalls.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
2008/12/28 - mr-meltdown: src/backends/monet5/merovingian.mx,1.69.2.5
Convert the command listener loop to block in select() instead of using
non-blocking IO plus sleep.
Now we are equal in CPU load to mserver5 in idle mode.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
2008/12/28 - mr-meltdown: src/backends/monet5/merovingian.mx,1.69.2.6
started with debugging some weird effects, ended up rewriting large
parts of the logging facility :(
- properly handle SIGHUP so we don't silently quit
- can't really use select/poll when we use stdio streams, so refrain
from using stdio streams in favour of select/poll on plain file
descriptors, which eventually gives us a better idle performance
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Index: merovingian.mx
===================================================================
RCS file: /cvsroot/monetdb/sql/src/backends/monet5/merovingian.mx,v
retrieving revision 1.70
retrieving revision 1.71
diff -u -d -r1.70 -r1.71
--- merovingian.mx 17 Nov 2008 16:31:46 -0000 1.70
+++ merovingian.mx 5 Jan 2009 12:21:45 -0000 1.71
@@ -98,8 +98,8 @@
static str _merovingian_conffile = NULL;
typedef struct _dpair {
- FILE* fout; /* where to read stdout messages from */
- FILE* ferr; /* where to read stderr messages from */
+ int out; /* where to read stdout messages from */
+ int err; /* where to read stderr messages from */
pid_t pid; /* this process' id */
char* dbname; /* the database that this server serves */
pthread_t tid; /* thread id used when terminating this server */
@@ -119,30 +119,43 @@
static int _keepLogging = 1;
static int _timeInterval = 0;
static void
-logListener(void *p)
+logListener(void *x)
{
dpair d = topdp;
dpair w;
FILE *fout, *ferr;
char buf[8096];
- int hadData;
+ char *p, *q;
char mytime[20];
time_t now, lastout, lasterr;
struct tm *tmp;
- int equalouterr;
+ char equalouterr;
+ struct timeval tv;
+ fd_set readfds;
+ int nfds;
+ size_t len;
+ char writeident;
- (void)p;
+ (void)x;
/* the first entry in the list of d is where our output should go to */
- fout = d->fout;
- ferr = d->ferr;
+ fout = fdopen(d->out, "a");
+ if (d->out == d->err) {
+ ferr = fout;
+ equalouterr = 1;
+ } else {
+ ferr = fdopen(d->err, "a");
+ equalouterr = 0;
+ }
- if (d->dbname == NULL || fout == ferr) {
+ /* if we print to the console, avoid double TME messages */
+ if (d->dbname == NULL) {
equalouterr = 1;
} else {
equalouterr = 0;
}
+
/* skip the first entry, we don't care about it in the normal loop */
d = d->next;
@@ -158,47 +171,97 @@
lastout = 0;
lasterr = 0;
do {
+ /* wait max 1 second, tradeoff between performance and being
+ * able to catch up new logger streams */
+ tv.tv_sec = 1;
+ tv.tv_usec = 0;
+ FD_ZERO(&readfds);
+ nfds = 0;
w = d;
- hadData = 0;
while (w != NULL) {
- if (fgets(buf, 8096, w->fout) != NULL) {
+ FD_SET(w->out, &readfds);
+ if (nfds < w->out)
+ nfds = w->out;
+ FD_SET(w->err, &readfds);
+ if (nfds < w->err)
+ nfds = w->err;
+ w = w->next;
+ }
+ if (select(nfds + 1, &readfds, NULL, NULL, &tv) <= 0)
+ continue;
+ w = d;
+ while (w != NULL) {
+ if (FD_ISSET(w->out, &readfds) != 0) {
PRINT_TIME(fout, lastout);
- fprintf(fout, "MSG %s[" LLFMT "]: %s",
- w->dbname, (long long
int)w->pid, buf);
- hadData = 1;
+ writeident = 1;
+ do {
+ if ((len = read(w->out, buf, 8095)) <=
0)
+ break;
+ buf[len] = '\0';
+ q = buf;
+ while ((p = strchr(q, '\n')) != NULL) {
+ if (writeident == 1)
+ fprintf(fout, "MSG %s["
LLFMT "]: ",
+
w->dbname, (long long int)w->pid);
+ *p = '\0';
+ fprintf(fout, "%s\n", q);
+ q = p + 1;
+ writeident = 1;
+ }
+ if ((size_t)(q - buf) < len) {
+ if (writeident == 1)
+ fprintf(fout, "MSG %s["
LLFMT "]: ",
+
w->dbname, (long long int)w->pid);
+ writeident = 0;
+ fprintf(fout, "%s", q);
+ }
+ } while (len == 8095);
}
- if (fgets(buf, 8096, w->ferr) != NULL) {
+ if (w->err != w->out && FD_ISSET(w->err, &readfds) !=
0) {
if (equalouterr == 1) {
PRINT_TIME(ferr, lastout);
} else {
PRINT_TIME(ferr, lasterr);
}
- fprintf(ferr, "ERR %s[" LLFMT "]: %s",
- w->dbname, (long long
int)w->pid, buf);
- hadData = 1;
+ writeident = 1;
+ do {
+ if ((len = read(w->err, buf, 8095)) <=
0)
+ break;
+ buf[len] = '\0';
+ q = buf;
+ while ((p = strchr(q, '\n')) != NULL) {
+ if (writeident == 1)
+ fprintf(ferr, "ERR %s["
LLFMT "]: ",
+
w->dbname, (long long int)w->pid);
+ *p = '\0';
+ fprintf(ferr, "%s\n", q);
+ q = p + 1;
+ writeident = 1;
+ }
+ if ((size_t)(q - buf) < len) {
+ if (writeident == 1)
+ fprintf(ferr, "ERR %s["
LLFMT "]: ",
+
w->dbname, (long long int)w->pid);
+ writeident = 0;
+ fprintf(ferr, "%s", q);
+ }
+ } while (len == 8095);
}
w = w->next;
}
+
fflush(fout);
if (equalouterr == 0)
fflush(ferr);
- /* see if we need to break the loop */
- if (_keepLogging == 0) {
- if (hadData == 0) {
- break;
- } else {
- continue; /* don't wait if someone's
waiting for us */
- }
- }
- /* wait a tenth of a second */
- MT_sleep_ms(10);
- } while (1);
+ } while (_keepLogging != 0);
/* make sure we emit the current timestamp before we quit */
lastout = 0;
PRINT_TIME(fout, lastout);
+ fflush(fout);
if (equalouterr == 0) {
- lasterr = 0;
+ lasterr = 0;
PRINT_TIME(ferr, lasterr);
+ fflush(ferr);
}
}
@@ -537,11 +600,9 @@
dp = dp->next;
pdp = dp;
dp = dp->next = GDKmalloc(sizeof(struct _dpair));
- dp->fout = fdopen(pfdo[0], "r");
- fcntl(pfdo[0], F_SETFL, O_NONBLOCK);
+ dp->out = pfdo[0];
close(pfdo[1]);
- dp->ferr = fdopen(pfde[0], "r");
- fcntl(pfde[0], F_SETFL, O_NONBLOCK);
+ dp->err = pfde[0];
close(pfde[1]);
dp->next = NULL;
dp->pid = pid;
@@ -1155,7 +1216,6 @@
{
str msg;
int retval;
- struct timeval tv;
fd_set fds;
int msgsock;
err e;
@@ -1165,11 +1225,7 @@
FD_ZERO(&fds);
FD_SET(sock, &fds);
- /* Wait up to 0.5 seconds. */
- tv.tv_sec = 0;
- tv.tv_usec = 500;
-
- retval = select(sock + 1, &fds, NULL, NULL, &tv);
+ retval = select(sock + 1, &fds, NULL, NULL, NULL);
if (retval == 0) {
/* nothing interesting has happened */
continue;
@@ -1222,7 +1278,6 @@
sabdb *stats;
int pos = 0;
int retval;
- struct timeval tv;
fd_set fds;
int msgsock;
size_t len;
@@ -1236,11 +1291,7 @@
FD_ZERO(&fds);
FD_SET(sock, &fds);
- /* Wait up to 0.5 seconds. */
- tv.tv_sec = 0;
- tv.tv_usec = 500;
-
- retval = select(sock + 1, &fds, NULL, NULL, &tv);
+ retval = select(sock + 1, &fds, NULL, NULL, NULL);
if (retval == 0) {
/* nothing interesting has happened */
continue;
@@ -1403,10 +1454,13 @@
socklen_t addrlen;
struct sockaddr_storage peer_addr;
socklen_t peer_addr_len;
+ fd_set fds;
+ struct timeval tv;
int bcs;
struct sockaddr_in brdcst;
int c;
time_t deadline = 0;
+ time_t now = 0;
int forceannc = 0;
sabdb *orig;
sabdb *stats;
@@ -1458,11 +1512,12 @@
/* main loop */
while (_keepListening == 1) {
+ now = time(NULL);
/* do a round of announcements, we're ahead of the ttl because
* when we announce, we add 60 seconds to avoid a "gap" */
- if (forceannc == 1 || deadline <= time(NULL)) {
+ if (forceannc == 1 || deadline <= now) {
/* set new deadline */
- deadline = time(NULL) + discoveryttl;
+ deadline = now + discoveryttl;
forceannc = 0;
/* list all known databases */
@@ -1500,7 +1555,7 @@
prv = NULL;
rdb = _merovingian_remotedbs;
while (rdb != NULL) {
- if (rdb->ttl <= time(NULL)) {
+ if (rdb->ttl <= now) {
/* expired, let's remove */
if (prv == NULL) {
_merovingian_remotedbs = rdb->next;
@@ -1519,11 +1574,21 @@
}
peer_addr_len = sizeof(struct sockaddr_storage);
- nread = recvfrom(sock, buf, 512, MSG_DONTWAIT,
+ FD_ZERO(&fds);
+ FD_SET(sock, &fds);
+ /* Wait up to 5 seconds. */
+ tv.tv_sec = 5;
+ tv.tv_usec = 0;
+ nread = select(sock + 1, &fds, NULL, NULL, &tv);
+ if (nread == 0) {
+ /* nothing interesting has happened */
+ buf[0] = '\0';
+ continue;
+ }
+ nread = recvfrom(sock, buf, 512, 0,
(struct sockaddr *)&peer_addr, &peer_addr_len);
if (nread == -1) {
buf[0] = '\0';
- MT_sleep_ms(1000); /* don't spin */
continue; /* ignore failed request */
}
@@ -1742,8 +1807,8 @@
* logger might access it otherwise after the free) */
q->next = p->next;
/* close the descriptors */
- fclose(p->fout);
- fclose(p->ferr);
+ close(p->out);
+ close(p->err);
if (si->si_code == CLD_EXITED) {
merlog("database '%s' (%d) has exited with exit
status %d",
p->dbname, p->pid,
si->si_status);
@@ -2011,11 +2076,11 @@
if (msglog == NULL) {
/* stdout, save it */
argp = dup(1);
- topdp->fout = fdopen(argp, "w");
+ topdp->out = argp;
} else {
/* write to the given file */
- topdp->fout = fopen(msglog, "a");
- if (topdp->fout == NULL) {
+ topdp->out = open(msglog, O_WRONLY | O_APPEND);
+ if (topdp->out == -1) {
fprintf(stderr, "unable to open '%s': %s\n",
msglog, strerror(errno));
MERO_EXIT(1);
@@ -2028,14 +2093,14 @@
if (errlog == NULL) {
/* stderr, save it */
argp = dup(2);
- topdp->ferr = fdopen(argp, "w");
+ topdp->err = argp;
} else {
/* write to the given file */
if (strcmp(msglog, errlog) == 0) {
- topdp->ferr = topdp->fout;
+ topdp->err = topdp->out;
} else {
- topdp->ferr = fopen(errlog, "a");
- if (topdp->ferr == NULL) {
+ topdp->err = open(errlog, O_WRONLY | O_APPEND);
+ if (topdp->err == -1) {
fprintf(stderr, "unable to open '%s': %s\n",
errlog, strerror(errno));
MERO_EXIT(1);
@@ -2049,7 +2114,7 @@
d = topdp->next = alloca(sizeof(struct _dpair));
- /* make sure we will be able to write the our pid */
+ /* make sure we will be able to write our pid */
if ((pidfile = fopen(pidfilename, "w")) == NULL) {
fprintf(stderr, "unable to open '%s' for writing: %s\n",
pidfilename, strerror(errno));
@@ -2065,8 +2130,7 @@
MERO_EXIT(1);
return(1);
}
- d->fout = fdopen(pfd[0], "r");
- fcntl(pfd[0], F_SETFL, O_NONBLOCK);
+ d->out = pfd[0];
dup2(pfd[1], 1);
close(pfd[1]);
@@ -2079,8 +2143,7 @@
}
/* before it is too late, save original stderr */
oerr = fdopen(dup(2), "w");
- d->ferr = fdopen(pfd[0], "r");
- fcntl(pfd[0], F_SETFL, O_NONBLOCK);
+ d->err = pfd[0];
dup2(pfd[1], 2);
close(pfd[1]);
@@ -2122,6 +2185,11 @@
MERO_EXIT(1);
return(1);
}
+ if (sigaction(SIGHUP, &sa, NULL) == -1) {
+ fprintf(oerr, "%s: unable to create signal handlers\n",
argv[0]);
+ MERO_EXIT(1);
+ return(1);
+ }
sa.sa_flags = SA_SIGINFO;
sigemptyset(&sa.sa_mask);
@@ -2187,8 +2255,10 @@
/* wait for the control runner and discovery thread to have
* finished announcing it's going down */
+ close(unsock);
if (ctid != 0)
pthread_join(ctid, NULL);
+ close(usock);
if (dtid != 0)
pthread_join(dtid, NULL);
}
@@ -2204,8 +2274,6 @@
fprintf(stderr, "%s\n", e);
}
- merlog("Merovingian %s stopping ...", MEROV_VERSION);
-
/* we don't need merovingian itself */
d = d->next;
@@ -2242,20 +2310,22 @@
}
}
+ merlog("Merovingian %s stopped", MEROV_VERSION);
+
_keepLogging = 0;
if ((argp = pthread_join(tid, NULL)) != 0) {
- fprintf(stderr, "failed to wait for logging thread: %s\n",
strerror(argp));
+ fprintf(oerr, "failed to wait for logging thread: %s\n",
strerror(argp));
}
- fclose(topdp->fout);
- if (topdp->fout != topdp->ferr)
- fclose(topdp->ferr);
+ close(topdp->out);
+ if (topdp->out != topdp->err)
+ close(topdp->err);
/* clean up dbpair structs */
while (d != NULL) {
topdp = d->next;
- fclose(d->fout);
- fclose(d->ferr);
+ close(d->out);
+ close(d->err);
if (d->dbname != NULL)
GDKfree(d->dbname);
GDKfree(d);
------------------------------------------------------------------------------
_______________________________________________
Monetdb-sql-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-sql-checkins