Patch is attached and also at:

http://noc.wikimedia.org/~tstarling/patches/squid-2.6.5-udplog.patch

The pipe logging component is loosely based on the one by Andrey Chichak:

http://www.squid-cache.org/mail-archive/squid-dev/200509/0020.html

with some stylistic changes and an important functional change: EAGAIN
on write is ignored. We tried to put this pipe logging patch into
production on Wikimedia, piping into a fast UDP sender program. It
worked fine at low request rates, but once a certain request rate
threshold was reached (probably ~1.5k req/s), EAGAIN errors were
randomly encountered, probably due to the 4KB pipe buffer filling during
the course of a single time slice. This could be solved by buffering or
even by blocking, but I decided to output a debug message instead and
drop the log line. I thought the best solution for high request rates
was to move the UDP sender into squid itself. Other high request-rate
users should be advised to do the same.

The UDP component is my own work, although I did review the one by Mark
Dierolf:

http://www.squid-cache.org/mail-archive/squid-dev/200506/0019.html

One problem with that was that it used a blocking UDP socket -- UDP
sends can block if you send data fast enough. My code uses a
non-blocking socket generated by comm_open(). The socket send buffer is
generally much larger than the pipe buffer, so EAGAIN errors are much
less likely. I'm a fan of code reuse, so I refactored the address
parsing code from parse_sockaddr_in_list() in cache_cf.c into a new
parse_sockaddr() function in tools.c.

Notation for UDP logging in the configuration file may be either
udp:host:port or udp://host:port. The former would be favoured by my
reading of RFC 2718 section 2.1.2, the latter is traditional. For example:

access_log udp://log.example.com:5831

For pipe logging, it is:

access_log |some_program

as for Chichak's patch.

The buffering system is somewhat provisional and ad-hoc. The existing
log buffering feature was apparently rendered inoperable by the
logfileFlush() call in access_log.c, and I've commented it out pending
clarification. When using UDP logging, a buffer of 1450 bytes is used
unconditionally. Thus, multiple log lines are merged into each UDP
packet, where possible. This should ultimately be configurable.

I also added the log format code "%sn", which outputs a sequence number,
incremented with each log line. This is useful for tracking packet loss
when UDP is in use, and may even have some value as a security feature.

This isn't in production at Wikimedia yet, but it soon will be. I
thought I'd post this here first and see if anyone had any helpful comments.

-- Tim Starling
diff -ru squid-2.6.5/src/access_log.c squid-2.6.5-pipelog/src/access_log.c
--- squid-2.6.5/src/access_log.c        2006-09-28 21:55:37.000000000 +0000
+++ squid-2.6.5-pipelog/src/access_log.c        2007-01-20 03:15:21.000000000 
+0000
@@ -341,6 +341,8 @@
 
     LFT_EXT_LOG,
 
+    LFT_SEQUENCE_NUMBER,
+
     LFT_PERCENT                        /* special string cases for escaped 
chars */
 } logformat_bcode_t;
 
@@ -443,6 +445,8 @@
 /*{ "<sb", LFT_REPLY_SIZE_BODY }, */
 /*{ "<sB", LFT_REPLY_SIZE_BODY_NO_TE }, */
 
+    {"sn", LFT_SEQUENCE_NUMBER},
+
     {"%", LFT_PERCENT},
 
     {NULL, LFT_NONE}           /* this must be last */
@@ -457,7 +461,9 @@
     static MemBuf mb = MemBufNULL;
     char tmp[1024];
     String sb = StringNull;
+    static long int sequence_number = 0;
 
+    sequence_number++;
     memBufReset(&mb);
 
     lf = log->logFormat;
@@ -678,6 +684,10 @@
            quote = 1;
            break;
 
+       case LFT_SEQUENCE_NUMBER:
+           outint = sequence_number;
+           doint = 1;
+           break;
        case LFT_PERCENT:
            out = "%";
            break;
@@ -1151,7 +1161,7 @@
            fatalf("Unknown log format %d\n", log->type);
            break;
        }
-       logfileFlush(log->logfile);
+       /*logfileFlush(log->logfile);*/
        if (!checklist)
            break;
     }
diff -ru squid-2.6.5/src/cache_cf.c squid-2.6.5-pipelog/src/cache_cf.c
--- squid-2.6.5/src/cache_cf.c  2006-09-30 21:01:08.000000000 +0000
+++ squid-2.6.5-pipelog/src/cache_cf.c  2007-01-20 01:18:26.000000000 +0000
@@ -2602,42 +2602,16 @@
     return PEER_SIBLING;
 }
 
+
 #if USE_WCCPv2
 static void
 parse_sockaddr_in_list(sockaddr_in_list ** head)
 {
     char *token;
-    char *t;
-    char *host;
-    char *tmp;
-    const struct hostent *hp;
-    unsigned short port = 0;
     sockaddr_in_list *s;
     while ((token = strtok(NULL, w_space))) {
-       host = NULL;
-       port = 0;
-       if ((t = strchr(token, ':'))) {
-           /* host:port */
-           host = token;
-           *t = '\0';
-           port = xatos(t + 1);
-           if (0 == port)
-               self_destruct();
-       } else if ((port = strtol(token, &tmp, 10)), !*tmp) {
-           /* port */
-       } else {
-           host = token;
-           port = 0;
-       }
        s = xcalloc(1, sizeof(*s));
-       s->s.sin_port = htons(port);
-       if (NULL == host)
-           s->s.sin_addr = any_addr;
-       else if (1 == safe_inet_addr(host, &s->s.sin_addr))
-           (void) 0;
-       else if ((hp = gethostbyname(host)))    /* dont use ipcache */
-           s->s.sin_addr = inaddrFromHostent(hp);
-       else
+       if (!parse_sockaddr(token, &s->s))
            self_destruct();
        while (*head)
            head = &(*head)->next;
diff -ru squid-2.6.5/src/logfile.c squid-2.6.5-pipelog/src/logfile.c
--- squid-2.6.5/src/logfile.c   2007-01-20 00:07:19.000000000 +0000
+++ squid-2.6.5-pipelog/src/logfile.c   2007-01-20 02:58:19.000000000 +0000
@@ -35,6 +35,8 @@
 #include "squid.h"
 
 static void logfileWriteWrapper(Logfile * lf, const void *buf, size_t len);
+static int logfileOpenPipe(Logfile * lf);
+static int logfileOpenUDP(Logfile * lf);
 
 #if HAVE_SYSLOG
 
@@ -120,10 +122,16 @@
 {
     Logfile *lf = xcalloc(1, sizeof(*lf));
     xstrncpy(lf->path, path, MAXPATHLEN);
+    if (bufsz > 0) {
+       lf->bufsz = bufsz;
+    }
+    if (fatal_flag)
+       lf->flags.fatal = 1;
 #if HAVE_SYSLOG
     if (strcmp(path, "syslog") == 0 || strncmp(path, "syslog:", 7) == 0) {
        lf->flags.syslog = 1;
        lf->fd = -1;
+       lf->bufsz = 0;
        if (path[6] != '\0') {
            const char *priority = path + 7;
            char *facility = (char *) strchr(priority, '|');
@@ -137,7 +145,20 @@
            lf->syslog_priority |= LOG_INFO;
     } else
 #endif
-    {
+    if (strncmp(path, "udp:", 4) == 0) {
+       lf->flags.udp = 1;
+       /* Open a UDP socket, may change lf->bufsize */
+       if (!logfileOpenUDP(lf)) {
+           safe_free(lf);
+           return NULL;
+       }
+    } else if (path[0] == '|') {
+       lf->flags.pipe = 1;
+       if (!logfileOpenPipe(lf)) {
+           safe_free(lf);
+           return NULL;
+       }
+    } else {
        int fd = file_open(path, O_WRONLY | O_CREAT | O_TEXT);
        if (DISK_ERROR == fd) {
            if (ENOENT == errno && fatal_flag) {
@@ -156,13 +177,10 @@
            }
        }
        lf->fd = fd;
-       if (bufsz > 0) {
-           lf->buf = xmalloc(bufsz);
-           lf->bufsz = bufsz;
-       }
     }
-    if (fatal_flag)
-       lf->flags.fatal = 1;
+    if (lf->bufsz > 0) {
+       lf->buf = xmalloc(lf->bufsz);
+    }
     return lf;
 }
 
@@ -186,9 +204,17 @@
     int i;
     char from[MAXPATHLEN];
     char to[MAXPATHLEN];
-    assert(lf->path);
-    if (lf->flags.syslog)
+
+    if (lf->flags.syslog || lf->flags.udp)
        return;
+
+    if (lf->flags.pipe) {
+       file_close(lf->fd);
+       logfileOpenPipe(lf);
+       return;
+    }
+
+    assert(lf->path);
 #ifdef S_ISREG
     if (stat(lf->path, &sb) == 0)
        if (S_ISREG(sb.st_mode) == 0)
@@ -298,7 +324,100 @@
     fd_bytes(lf->fd, s, FD_WRITE);
     if (s == len)
        return;
+    if (errno == EAGAIN) {
+       /* This happens sometimes at high request rates when pipe logging 
+        * is enabled. The pipe buffer is very small.
+        */
+       debug(50, 1)("Log line lost due to full transmit buffer\n");
+       return;
+    }
+    if (lf->flags.udp) {
+       /*
+        * Ignore network errors
+        */
+       return;
+    }
+
     if (!lf->flags.fatal)
        return;
     fatalf("logfileWrite: %s: %s\n", lf->path, xstrerror());
 }
+
+/*
+ * Open a pipe for logging
+ */
+static int
+logfileOpenPipe(Logfile * lf)
+{
+    const char *args[2];
+    args[0] = "(logger)";
+    args[1] = NULL;
+    if ( ipcCreate(IPC_FIFO,&(lf->path[1]),args,"logger",NULL,&(lf->fd),NULL) 
<= 0 ) {
+       if (lf->flags.fatal) {
+           fatalf("Cannot start log listener %s: %s\n", lf->path, xstrerror());
+       } else {
+           debug(50,1)("Cannot start log listener %s: %s\n", lf->path, 
xstrerror());
+           return FALSE;
+       }
+    }
+    return TRUE;
+}
+
+/**
+ * Open a UDP socket for logging
+ * May change lf->bufsize
+ */
+static int
+logfileOpenUDP(Logfile * lf) 
+{
+    struct sockaddr_in addr;
+    char *strAddr;
+    if (strncmp(lf->path + 4, "//", 2) == 0) {
+       strAddr = xstrdup(lf->path + 6);
+    } else {
+       strAddr = xstrdup(lf->path + 4);
+    }
+    if (!parse_sockaddr(strAddr, &addr)) {
+       if (lf->flags.fatal) {
+           fatalf("Invalid UDP logging address '%s'\n", lf->path);
+       } else {
+           debug(50,1)("Invalid UDP logging address '%s'\n", lf->path);
+           safe_free(strAddr);
+           return FALSE;
+       }
+    }
+    safe_free(strAddr);
+    
+    lf->fd = comm_open(SOCK_DGRAM,
+       IPPROTO_UDP,
+       no_addr,
+       0,
+       COMM_NONBLOCKING,
+       "UDP log socket");
+    if (lf->fd < 0) {
+       if (lf->flags.fatal) {
+           fatalf("Unable to open UDP socket for logging\n");
+       } else {
+           debug(50,1)("Unable to open UDP socket for logging\n");
+           return FALSE;
+       }
+    }
+    if (comm_connect_addr(lf->fd, &addr)) {
+       if (lf->flags.fatal) {
+           fatalf("Unable to connect to %s for UDP log: %s\n", lf->path, 
xstrerror());
+       } else {
+           debug(50,1)("Unable to connect to %s for UDP log: %s\n", lf->path, 
xstrerror());
+           return FALSE;
+       }
+    }
+
+    /* Set the buffer size roughly equal to the MTU
+     * This will merge log lines into as few packets as possible
+     * 
+     * TODO: make this configurable, some users may want it to be zero
+     */
+    lf->bufsz = 1450;
+    return TRUE;
+}
+
+
diff -ru squid-2.6.5/src/protos.h squid-2.6.5-pipelog/src/protos.h
--- squid-2.6.5/src/protos.h    2006-10-23 11:22:21.000000000 +0000
+++ squid-2.6.5-pipelog/src/protos.h    2007-01-20 01:17:03.000000000 +0000
@@ -1198,6 +1198,7 @@
 void setUmask(mode_t mask);
 int xusleep(unsigned int usec);
 void keepCapabilities(void);
+extern int parse_sockaddr(char *s, struct sockaddr_in *addr);
 
 #if USE_HTCP
 extern void htcpInit(void);
diff -ru squid-2.6.5/src/structs.h squid-2.6.5-pipelog/src/structs.h
--- squid-2.6.5/src/structs.h   2007-01-20 00:07:19.000000000 +0000
+++ squid-2.6.5-pipelog/src/structs.h   2007-01-18 22:00:00.000000000 +0000
@@ -2439,6 +2439,8 @@
     struct {
        unsigned int fatal;
        unsigned int syslog;
+       unsigned int pipe;
+       unsigned int udp;
     } flags;
     int syslog_priority;
 };
diff -ru squid-2.6.5/src/tools.c squid-2.6.5-pipelog/src/tools.c
--- squid-2.6.5/src/tools.c     2006-11-01 20:58:52.000000000 +0000
+++ squid-2.6.5-pipelog/src/tools.c     2007-01-20 03:04:16.000000000 +0000
@@ -1346,3 +1346,43 @@
     }
 #endif
 }
+
+/**
+ * Parse a socket address (host:port), fill the given sockaddr_in structure
+ * Returns FALSE on failure, TRUE on success
+ * Destroys s
+ */
+int parse_sockaddr(char *s, struct sockaddr_in *addr)
+{
+    char *host, *tmp, *colon;
+    unsigned short port = 0;
+    const struct hostent *hp;
+
+    host = NULL;
+    port = 0;
+    if ((colon = strchr(s, ':'))) {
+       /* host:port */
+       host = s;
+       *colon = '\0';
+       port = xatos(colon + 1);
+       if (0 == port)
+           return FALSE;
+    } else if ((port = strtol(s, &tmp, 10)), !*tmp) {
+       /* port */
+    } else {
+       host = s;
+       port = 0;
+    }
+    addr->sin_port = htons(port);
+    if (NULL == host)
+       addr->sin_addr = any_addr;
+    else if (1 == safe_inet_addr(host, &addr->sin_addr))
+       (void) 0;
+    else if ((hp = gethostbyname(host)))       /* dont use ipcache */
+       addr->sin_addr = inaddrFromHostent(hp);
+    else
+       return FALSE;
+    addr->sin_family = AF_INET;
+    return TRUE;
+}
+

Reply via email to