Update of /cvsroot/monetdb/sql/src/backends/monet5
In directory 23jxhf1.ch3.sourceforge.com:/tmp/cvs-serv23243/src/backends/monet5

Modified Files:
        merovingian.mx 
Log Message:
propagated changes of Sunday Dec 28 2008 - Monday Jan 05 2009
from the Nov2008 branch to the development trunk

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
2008/12/28 - mr-meltdown: src/backends/monet5/merovingian.mx,1.69.2.2
Attempt to fix a part of bug #2405952 by switching to select instead of
non-blocking IO.  We still poll the hell out of the OS, but mserver5 is
above merovingian most of the time now.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
2008/12/28 - mr-meltdown: src/backends/monet5/merovingian.mx,1.69.2.3
Bring down CPU load from 0.04% to 0.00% in idle state
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
2008/12/28 - mr-meltdown: src/backends/monet5/merovingian.mx,1.69.2.4
cut down on time syscalls
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
2008/12/28 - mr-meltdown: src/backends/monet5/merovingian.mx,1.69.2.5
Convert the command listener loop to block in select instead of using
non-blocking IO + sleep.  Now we are equal in CPU load to mserver5 in idle mode.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
2008/12/28 - mr-meltdown: src/backends/monet5/merovingian.mx,1.69.2.6
started with debugging some weird effects, ended up rewriting large
parts of the logging facility :(

- properly handle SIGHUP so we don't silently quit
- select/poll can't really be used together with stdio streams, so stop
  using stdio streams in favour of select/poll, which eventually gives
  us a better idle performance
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


Index: merovingian.mx
===================================================================
RCS file: /cvsroot/monetdb/sql/src/backends/monet5/merovingian.mx,v
retrieving revision 1.70
retrieving revision 1.71
diff -u -d -r1.70 -r1.71
--- merovingian.mx      17 Nov 2008 16:31:46 -0000      1.70
+++ merovingian.mx      5 Jan 2009 12:21:45 -0000       1.71
@@ -98,8 +98,8 @@
 static str _merovingian_conffile = NULL;
 
 typedef struct _dpair {
-       FILE* fout;       /* where to read stdout messages from */
-       FILE* ferr;       /* where to read stderr messages from */
+       int out;          /* where to read stdout messages from */
+       int err;          /* where to read stderr messages from */
        pid_t pid;        /* this process' id */
        char* dbname;     /* the database that this server serves */
        pthread_t tid;    /* thread id used when terminating this server */
@@ -119,30 +119,43 @@
 static int _keepLogging = 1;
 static int _timeInterval = 0;
 static void
-logListener(void *p)
+logListener(void *x)
 {
        dpair d = topdp;
        dpair w;
        FILE *fout, *ferr;
        char buf[8096];
-       int hadData;
+       char *p, *q;
        char mytime[20];
        time_t now, lastout, lasterr;
        struct tm *tmp;
-       int equalouterr;
+       char equalouterr;
+       struct timeval tv;
+       fd_set readfds;
+       int nfds;
+       size_t len;
+       char writeident;
 
-       (void)p;
+       (void)x;
 
        /* the first entry in the list of d is where our output should go to */
-       fout = d->fout;
-       ferr = d->ferr;
+       fout = fdopen(d->out, "a");
+       if (d->out == d->err) {
+               ferr = fout;
+               equalouterr = 1;
+       } else {
+               ferr = fdopen(d->err, "a");
+               equalouterr = 0;
+       }
 
-       if (d->dbname == NULL || fout == ferr) {
+       /* if we print to the console, avoid double TME messages */
+       if (d->dbname == NULL) {
                equalouterr = 1;
        } else {
                equalouterr = 0;
        }
 
+
        /* skip the first entry, we don't care about it in the normal loop */
        d = d->next;
 
@@ -158,47 +171,97 @@
        lastout = 0;
        lasterr = 0;
        do {
+               /* wait max 1 second, tradeoff between performance and being
+                * able to catch up new logger streams */
+               tv.tv_sec = 1;
+               tv.tv_usec = 0;
+               FD_ZERO(&readfds);
+               nfds = 0;
                w = d;
-               hadData = 0;
                while (w != NULL) {
-                       if (fgets(buf, 8096, w->fout) != NULL) {
+                       FD_SET(w->out, &readfds);
+                       if (nfds < w->out)
+                               nfds = w->out;
+                       FD_SET(w->err, &readfds);
+                       if (nfds < w->err)
+                               nfds = w->err;
+                       w = w->next;
+               }
+               if (select(nfds + 1, &readfds, NULL, NULL, &tv) <= 0)
+                       continue;
+               w = d;
+               while (w != NULL) {
+                       if (FD_ISSET(w->out, &readfds) != 0) {
                                PRINT_TIME(fout, lastout);
-                               fprintf(fout, "MSG %s[" LLFMT "]: %s",
-                                               w->dbname, (long long 
int)w->pid, buf);
-                               hadData = 1;
+                               writeident = 1;
+                               do {
+                                       if ((len = read(w->out, buf, 8095)) <= 
0)
+                                               break;
+                                       buf[len] = '\0';
+                                       q = buf;
+                                       while ((p = strchr(q, '\n')) != NULL) {
+                                               if (writeident == 1)
+                                                       fprintf(fout, "MSG %s[" 
LLFMT "]: ",
+                                                                       
w->dbname, (long long int)w->pid);
+                                               *p = '\0';
+                                               fprintf(fout, "%s\n", q);
+                                               q = p + 1;
+                                               writeident = 1;
+                                       }
+                                       if ((size_t)(q - buf) < len) {
+                                               if (writeident == 1)
+                                                       fprintf(fout, "MSG %s[" 
LLFMT "]: ",
+                                                                       
w->dbname, (long long int)w->pid);
+                                               writeident = 0;
+                                               fprintf(fout, "%s", q);
+                                       }
+                               } while (len == 8095);
                        }
-                       if (fgets(buf, 8096, w->ferr) != NULL) {
+                       if (w->err != w->out && FD_ISSET(w->err, &readfds) != 
0) {
                                if (equalouterr == 1) {
                                        PRINT_TIME(ferr, lastout);
                                } else {
                                        PRINT_TIME(ferr, lasterr);
                                }
-                               fprintf(ferr, "ERR %s[" LLFMT "]: %s",
-                                               w->dbname, (long long 
int)w->pid, buf);
-                               hadData = 1;
+                               writeident = 1;
+                               do {
+                                       if ((len = read(w->err, buf, 8095)) <= 
0)
+                                               break;
+                                       buf[len] = '\0';
+                                       q = buf;
+                                       while ((p = strchr(q, '\n')) != NULL) {
+                                               if (writeident == 1)
+                                                       fprintf(ferr, "ERR %s[" 
LLFMT "]: ",
+                                                                       
w->dbname, (long long int)w->pid);
+                                               *p = '\0';
+                                               fprintf(ferr, "%s\n", q);
+                                               q = p + 1;
+                                               writeident = 1;
+                                       }
+                                       if ((size_t)(q - buf) < len) {
+                                               if (writeident == 1)
+                                                       fprintf(ferr, "ERR %s[" 
LLFMT "]: ",
+                                                                       
w->dbname, (long long int)w->pid);
+                                               writeident = 0;
+                                               fprintf(ferr, "%s", q);
+                                       }
+                               } while (len == 8095);
                        }
                        w = w->next;
                }
+
                fflush(fout);
                if (equalouterr == 0)
                        fflush(ferr);
-               /* see if we need to break the loop */
-               if (_keepLogging == 0) {
-                       if (hadData == 0) {
-                               break;
-                       } else {
-                               continue;       /* don't wait if someone's 
waiting for us */
-                       }
-               }
-               /* wait a tenth of a second */
-               MT_sleep_ms(10);
-       } while (1);
+       } while (_keepLogging != 0);
        /* make sure we emit the current timestamp before we quit */
        lastout = 0;
        PRINT_TIME(fout, lastout);
+       fflush(fout);
        if (equalouterr == 0) {
-               lasterr =  0;
+               lasterr = 0;
                PRINT_TIME(ferr, lasterr);
+               fflush(ferr);
        }
 }
 
@@ -537,11 +600,9 @@
                        dp = dp->next;
                pdp = dp;
                dp = dp->next = GDKmalloc(sizeof(struct _dpair));
-               dp->fout = fdopen(pfdo[0], "r");
-               fcntl(pfdo[0], F_SETFL, O_NONBLOCK);
+               dp->out = pfdo[0];
                close(pfdo[1]);
-               dp->ferr = fdopen(pfde[0], "r");
-               fcntl(pfde[0], F_SETFL, O_NONBLOCK);
+               dp->err = pfde[0];
                close(pfde[1]);
                dp->next = NULL;
                dp->pid = pid;
@@ -1155,7 +1216,6 @@
 {
        str msg;
        int retval;
-       struct timeval tv;
        fd_set fds;
        int msgsock;
        err e;
@@ -1165,11 +1225,7 @@
                FD_ZERO(&fds);
                FD_SET(sock, &fds);
 
-               /* Wait up to 0.5 seconds. */
-               tv.tv_sec = 0;
-               tv.tv_usec = 500;
-
-               retval = select(sock + 1, &fds, NULL, NULL, &tv);
+               retval = select(sock + 1, &fds, NULL, NULL, NULL);
                if (retval == 0) {
                        /* nothing interesting has happened */
                        continue;
@@ -1222,7 +1278,6 @@
        sabdb *stats;
        int pos = 0;
        int retval;
-       struct timeval tv;
        fd_set fds;
        int msgsock;
        size_t len;
@@ -1236,11 +1291,7 @@
                FD_ZERO(&fds);
                FD_SET(sock, &fds);
 
-               /* Wait up to 0.5 seconds. */
-               tv.tv_sec = 0;
-               tv.tv_usec = 500;
-
-               retval = select(sock + 1, &fds, NULL, NULL, &tv);
+               retval = select(sock + 1, &fds, NULL, NULL, NULL);
                if (retval == 0) {
                        /* nothing interesting has happened */
                        continue;
@@ -1403,10 +1454,13 @@
        socklen_t addrlen;
        struct sockaddr_storage peer_addr;
        socklen_t peer_addr_len;
+       fd_set fds;
+       struct timeval tv;
        int bcs;
        struct sockaddr_in brdcst;
        int c;
        time_t deadline = 0;
+       time_t now = 0;
        int forceannc = 0;
        sabdb *orig;
        sabdb *stats;
@@ -1458,11 +1512,12 @@
 
        /* main loop */
        while (_keepListening == 1) {
+               now = time(NULL);
                /* do a round of announcements, we're ahead of the ttl because
                 * when we announce, we add 60 seconds to avoid a "gap" */
-               if (forceannc == 1 || deadline <= time(NULL)) {
+               if (forceannc == 1 || deadline <= now) {
                        /* set new deadline */
-                       deadline = time(NULL) + discoveryttl;
+                       deadline = now + discoveryttl;
                        forceannc = 0;
 
                        /* list all known databases */
@@ -1500,7 +1555,7 @@
                prv = NULL;
                rdb = _merovingian_remotedbs;
                while (rdb != NULL) {
-                       if (rdb->ttl <= time(NULL)) {
+                       if (rdb->ttl <= now) {
                                /* expired, let's remove */
                                if (prv == NULL) {
                                        _merovingian_remotedbs = rdb->next;
@@ -1519,11 +1574,21 @@
                }
 
                peer_addr_len = sizeof(struct sockaddr_storage);
-               nread = recvfrom(sock, buf, 512, MSG_DONTWAIT,
+               FD_ZERO(&fds);
+               FD_SET(sock, &fds);
+               /* Wait up to 5 seconds. */
+               tv.tv_sec = 5;
+               tv.tv_usec = 0;
+               nread = select(sock + 1, &fds, NULL, NULL, &tv);
+               if (nread == 0) {
+                       /* nothing interesting has happened */
+                       buf[0] = '\0';
+                       continue;
+               }
+               nread = recvfrom(sock, buf, 512, 0,
                                (struct sockaddr *)&peer_addr, &peer_addr_len);
                if (nread == -1) {
                        buf[0] = '\0';
-                       MT_sleep_ms(1000); /* don't spin */
                        continue; /* ignore failed request */
                }
 
@@ -1742,8 +1807,8 @@
                         * logger might access it otherwise after the free) */
                        q->next = p->next;
                        /* close the descriptors */
-                       fclose(p->fout);
-                       fclose(p->ferr);
+                       close(p->out);
+                       close(p->err);
                        if (si->si_code == CLD_EXITED) {
                                merlog("database '%s' (%d) has exited with exit 
status %d",
                                                p->dbname, p->pid, 
si->si_status);
@@ -2011,11 +2076,11 @@
        if (msglog == NULL) {
                /* stdout, save it */
                argp = dup(1);
-               topdp->fout = fdopen(argp, "w");
+               topdp->out = argp;
        } else {
                /* write to the given file */
-               topdp->fout = fopen(msglog, "a");
-               if (topdp->fout == NULL) {
+               topdp->out = open(msglog, O_WRONLY | O_APPEND);
+               if (topdp->out == -1) {
                        fprintf(stderr, "unable to open '%s': %s\n",
                                        msglog, strerror(errno));
                        MERO_EXIT(1);
@@ -2028,14 +2093,14 @@
        if (errlog == NULL) {
                /* stderr, save it */
                argp = dup(2);
-               topdp->ferr = fdopen(argp, "w");
+               topdp->err = argp;
        } else {
                /* write to the given file */
                if (strcmp(msglog, errlog) == 0) {
-                       topdp->ferr = topdp->fout;
+                       topdp->err = topdp->out;
                } else {
-                       topdp->ferr = fopen(errlog, "a");
-                       if (topdp->ferr == NULL) {
+                       topdp->err = open(errlog, O_WRONLY | O_APPEND);
+                       if (topdp->err == -1) {
                                fprintf(stderr, "unable to open '%s': %s\n",
                                                errlog, strerror(errno));
                                MERO_EXIT(1);
@@ -2049,7 +2114,7 @@
 
        d = topdp->next = alloca(sizeof(struct _dpair));
 
-       /* make sure we will be able to write the our pid */
+       /* make sure we will be able to write our pid */
        if ((pidfile = fopen(pidfilename, "w")) == NULL) {
                fprintf(stderr, "unable to open '%s' for writing: %s\n",
                                pidfilename, strerror(errno));
@@ -2065,8 +2130,7 @@
                MERO_EXIT(1);
                return(1);
        }
-       d->fout = fdopen(pfd[0], "r");
-       fcntl(pfd[0], F_SETFL, O_NONBLOCK);
+       d->out = pfd[0];
        dup2(pfd[1], 1);
        close(pfd[1]);
 
@@ -2079,8 +2143,7 @@
        }
        /* before it is too late, save original stderr */
        oerr = fdopen(dup(2), "w");
-       d->ferr = fdopen(pfd[0], "r");
-       fcntl(pfd[0], F_SETFL, O_NONBLOCK);
+       d->err = pfd[0];
        dup2(pfd[1], 2);
        close(pfd[1]);
 
@@ -2122,6 +2185,11 @@
                MERO_EXIT(1);
                return(1);
        }
+       if (sigaction(SIGHUP, &sa, NULL) == -1) {
+               fprintf(oerr, "%s: unable to create signal handlers\n", 
argv[0]);
+               MERO_EXIT(1);
+               return(1);
+       }
 
        sa.sa_flags = SA_SIGINFO;
        sigemptyset(&sa.sa_mask);
@@ -2187,8 +2255,10 @@
 
                /* wait for the control runner and discovery thread to have
                 * finished announcing it's going down */
+               close(unsock);
                if (ctid != 0)
                        pthread_join(ctid, NULL);
+               close(usock);
                if (dtid != 0)
                        pthread_join(dtid, NULL);
        }
@@ -2204,8 +2274,6 @@
                fprintf(stderr, "%s\n", e);
        }
 
-       merlog("Merovingian %s stopping ...", MEROV_VERSION);
-
        /* we don't need merovingian itself */
        d = d->next;
 
@@ -2242,20 +2310,22 @@
                }
        }
 
+       merlog("Merovingian %s stopped", MEROV_VERSION);
+
        _keepLogging = 0;
        if ((argp = pthread_join(tid, NULL)) != 0) {
-               fprintf(stderr, "failed to wait for logging thread: %s\n", 
strerror(argp));
+               fprintf(oerr, "failed to wait for logging thread: %s\n", 
strerror(argp));
        }
 
-       fclose(topdp->fout);
-       if (topdp->fout != topdp->ferr)
-               fclose(topdp->ferr);
+       close(topdp->out);
+       if (topdp->out != topdp->err)
+               close(topdp->err);
 
        /* clean up dbpair structs */
        while (d != NULL) {
                topdp = d->next;
-               fclose(d->fout);
-               fclose(d->ferr);
+               close(d->out);
+               close(d->err);
                if (d->dbname != NULL)
                        GDKfree(d->dbname);
                GDKfree(d);


------------------------------------------------------------------------------
_______________________________________________
Monetdb-sql-checkins mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/monetdb-sql-checkins

Reply via email to