coren has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/58449


Change subject: New implementation of log2udp
......................................................................

New implementation of log2udp

- optional line numbering
- optional line prefix
- coalesces lines into packets with configurable timeout
- fixed footprint

Change-Id: I4f39d696503fd9399c360902ebf21a7ad39fe649
---
A Makefile
A log2udp2.cc
2 files changed, 280 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/log2udp2 refs/changes/49/58449/1

diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..9c2840a
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,5 @@
+# Build the log2udp2 forwarder.  CXX, CXXFLAGS and LDFLAGS may be
+# overridden on the make command line.
+CXXFLAGS ?= -O2 -Wall
+
+log2udp2:      log2udp2.cc
+	$(CXX) $(CXXFLAGS) $(LDFLAGS) -o $@ $<
+
+.PHONY: clean
+clean:
+	rm -f log2udp2
diff --git a/log2udp2.cc b/log2udp2.cc
new file mode 100644
index 0000000..fcd1b42
--- /dev/null
+++ b/log2udp2.cc
@@ -0,0 +1,275 @@
+#include <errno.h>
+#include <limits.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <getopt.h>
+#include <netdb.h>
+#include <poll.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+
+
+// Long options understood by getopt_long(); each maps onto the short
+// option letter of the same meaning.
+static option options[] = {
+      { "number",  no_argument,       NULL, 'n' },
+      { "prefix",  required_argument, NULL, 'p' },
+      { "timeout", required_argument, NULL, 't' },
+      { NULL,      0,                 NULL, 0   }
+};
+
+static char*   line_prefix = 0;     // -p: text emitted before each line, or none
+static bool    numbering = false;   // -n: emit a running line number
+static long    line_number = 0;     // number given to the next emitted line
+static int     timeout = -1;        // -t: coalescing wait in ms (-1: unbounded)
+
+// Print the command-line synopsis and option help to stderr.
+// exename: program name to show in the "Usage:" line (normally argv[0]).
+static void usage(const char* exename)
+{
+    fprintf(stderr,
+           "Usage: %s [OPTION]... HOST PORT\n"
+           "Send standard input to UDP port PORT at HOST\n\n"
+           "Mandatory arguments to long options are mandatory for short options too.\n"
+           "  -n, --number           number outgoing lines\n"
+           "  -p, --prefix=PREFIX    prepend \"PREFIX: \" to outgoing lines.\n"
+           "  -t, --timeout=MSEC     wait MSEC milliseconds at most when merging\n"
+           "                         consecutive lines in a packet (-1 to wait\n"
+           "                         indefinitely, which is the default)\n",
+           exename);
+}
+
+// Output side.
+//
+// outbuf stores the bytes of queued packets back to back.  It is used as a
+// ring, except that a packet is never split across its end: once 64k or
+// less remains after outbuf_head, the next packet starts again at offset 0.
+// The packet currently being assembled starts at outbuf_head and holds
+// outbuf_len bytes so far.
+//
+// pktbuf is a ring of (offset, length) entries, one per packet waiting to
+// be sent, running from pktbuf_tail (oldest) up to pktbuf_head (exclusive).
+// Its indices wrap with a bit mask, so pktbuf_size must be a power of two.
+//
+// NOTE(review): nothing advances outbuf_tail as packets are sent.
+
+static const size_t    outbuf_size = 1024*1024;
+static char            outbuf[outbuf_size];
+static int             outbuf_head = 0;
+static int             outbuf_tail = 0;
+static size_t          outbuf_len = 0;
+
+static const size_t    pktbuf_size = 1024;     // must be a power of two
+static struct {
+    off_t                      data;   // offset of the packet within outbuf
+    size_t                     len;    // packet length in bytes
+}                      pktbuf[pktbuf_size];
+static int             pktbuf_head = 0;
+static int             pktbuf_tail = 0;
+
+
+// Return true when one more addline() call can proceed without overflowing
+// the packet queue and without writing over the data of a packet that is
+// queued but not yet sent.
+static bool hasroom(void)
+{
+    // One addline() can queue two packets (the one in progress, then the
+    // one it has just filled), so insist on two free slots.  One slot always
+    // stays unused so that head==tail unambiguously means "queue empty".
+    // Note the parentheses: '==' binds tighter than '&'.
+    size_t free_slots = (pktbuf_tail - pktbuf_head - 1) & (pktbuf_size - 1);
+    if(free_slots < 2)
+       return false;
+
+    // Nothing waiting to be sent: only the packet being built occupies
+    // outbuf, and endpacket() always leaves 64k of space after outbuf_head.
+    if(pktbuf_head == pktbuf_tail)
+       return true;
+
+    // outbuf_tail is never advanced as packets are sent, so locate the
+    // oldest unsent byte through the packet at the tail of the queue.
+    size_t tail = pktbuf[pktbuf_tail].data;
+    size_t head = outbuf_head;
+    // Worst case, the next line closes the current packet and then writes
+    // a maximal (< 64k) packet right after it.
+    size_t need_end = head + outbuf_len + 65536;
+
+    // The writer has rolled over to the front and sits behind the unsent
+    // data (head == tail with a non-empty queue means outbuf is full).
+    if(head <= tail)
+       return need_end <= tail;
+
+    // Unsent data lies in [tail, head): fine as long as endpacket() will
+    // not have to roll over to offset 0 ...
+    if(need_end < outbuf_size)
+       return true;
+    // ... and if it does, the first 64k of outbuf must already be free.
+    return tail >= 65536;
+}
+
+// Seal the packet being assembled (no-op when it is empty): record its
+// offset and length in the next packet-queue slot, then start the next
+// packet right after it -- or back at offset 0 when 64k or less would
+// remain before the end of outbuf, so that a packet never wraps around.
+static void endpacket(void)
+{
+    if(!outbuf_len)
+       return;
+
+    int slot = pktbuf_head;
+    pktbuf_head = (slot + 1) & (pktbuf_size - 1);
+    pktbuf[slot].data = outbuf_head;
+    pktbuf[slot].len = outbuf_len;
+
+    size_t next = outbuf_head + outbuf_len;
+    outbuf_len = 0;
+    outbuf_head = (next + 65536 >= outbuf_size) ? 0 : next;
+}
+
+// Append one input line (without its '\n') to the packet being assembled
+// in outbuf, as "[number ][PREFIX: ]line\n".  Trailing '\r's are stripped
+// and lines that end up empty are skipped (and not numbered).  The current
+// packet is closed first if the line would push it past the coalescing
+// target, and closed afterwards once the remaining room is no larger than
+// a header.  Callers check hasroom() before calling.
+static void addline(const char* buf, size_t len)
+{
+    // Largest UDP payload that can be sent over IPv4 (65535 minus 20 bytes
+    // of IP header and 8 of UDP header).  Bigger datagrams make send() fail
+    // with EMSGSIZE, which the send loop treats as fatal.
+    static const size_t max_datagram = 65507;
+    // Preferred packet size when several lines are coalesced.
+    static const size_t coalesce_size = 1491;
+
+    // Room for "%11ld " with any long value (up to 21 bytes) plus the
+    // "%.44s: " prefix (at most 46 bytes).
+    char hdr[96];
+    size_t hdr_len = 0;
+
+    while(len && buf[len-1]=='\r')
+       --len;
+    if(!len)
+       return;
+
+    if(numbering) {
+       // Use the real length: numbers wider than 11 digits are not padded.
+       int n = snprintf(hdr, sizeof hdr, "%11ld ", line_number++);
+       if(n > 0)
+           hdr_len += n;
+    }
+    if(line_prefix) {
+       int n = snprintf(hdr+hdr_len, sizeof hdr - hdr_len, "%.44s: ", line_prefix);
+       if(n > 0)
+           hdr_len += n;
+    }
+
+    // Truncate so that header + line + '\n' fits in a single datagram.
+    if(hdr_len + len + 1 > max_datagram)
+       len = max_datagram - 1 - hdr_len;
+
+    if(outbuf_len && outbuf_len+hdr_len+len > coalesce_size)
+       endpacket();
+
+    char* pkt = outbuf + outbuf_head;
+
+    memcpy(pkt+outbuf_len, hdr, hdr_len);
+    outbuf_len += hdr_len;
+    memcpy(pkt+outbuf_len, buf, len);
+    outbuf_len += len;
+    pkt[outbuf_len++] = '\n';
+
+    if(outbuf_len + hdr_len >= coalesce_size)
+       endpacket();
+}
+
+
+static const size_t    insize = 1024*1024;
+static char            inbuf[insize];
+static size_t          inlen = 0;
+
+// Turn buffered input lines into packet data for as long as the output
+// queue has room, then drop the consumed bytes from the front of inbuf.
+//
+// Lines that do not fit yet, and an incomplete last line, stay buffered.
+// With `final` set (input exhausted), a last line lacking '\n' is sent too.
+// If inbuf is full and holds no '\n' at all, its content is sent as one
+// (truncated) line so that reading can continue instead of stalling.
+static void drain_input(bool final)
+{
+    size_t pos = 0;
+
+    while(pos < inlen && hasroom()) {
+       const char* line = inbuf + pos;
+       size_t avail = inlen - pos;
+       const char* nl = (const char*)memchr(line, '\n', avail);
+
+       if(nl) {
+           addline(line, nl - line);
+           pos += (nl - line) + 1;
+       } else if(final || avail == insize) {
+           addline(line, avail);
+           pos = inlen;
+       } else
+           break;
+    }
+
+    if(pos) {
+       memmove(inbuf, inbuf + pos, inlen - pos);
+       inlen -= pos;
+    }
+}
+
+// Read lines from stdin and send them, coalesced into packets, to the UDP
+// endpoint HOST PORT given on the command line.  Returns 0 once stdin is
+// exhausted and every packet has been sent, 1 on usage or resolution
+// errors, 2 on I/O errors.
+int main(int argc, char** argv, char** envp)
+{
+    bool       flushing = false;       // stdin is exhausted; only drain
+    pollfd     fds[2];
+
+    for(;;) {
+       int     o = getopt_long(argc, argv, "np:t:", options, 0);
+
+       if(o<0)
+           break;
+
+       switch(o) {
+         case 'n':
+           numbering = true;
+           break;
+         case 'p':
+           line_prefix = new char[strlen(optarg)+1];
+           strcpy(line_prefix, optarg);
+           break;
+         case 't': {
+           // strtol() rather than atoi(): garbage is rejected instead of
+           // being silently read as 0 (i.e. "do not wait at all").
+           char*   end;
+           errno = 0;
+           long    t = strtol(optarg, &end, 10);
+           if(end == optarg || *end != '\0' || errno == ERANGE || t < INT_MIN || t > INT_MAX) {
+               fprintf(stderr, "%s: invalid timeout: %s\n", argv[0], optarg);
+               return 1;
+           }
+           timeout = (int)t;
+           break;
+         }
+         case ':':
+         case '?':
+         default:
+           usage(argv[0]);
+           return 1;
+       }
+    }
+
+    if(optind != argc-2) {
+       usage(argv[0]);
+       return 1;
+    }
+
+    addrinfo   hints;
+    addrinfo*  addr;
+
+    memset(&hints, 0, sizeof hints);
+    hints.ai_socktype = SOCK_DGRAM;
+
+    if(int err = getaddrinfo(argv[optind], argv[optind+1], &hints, &addr)) {
+       fprintf(stderr, "%s: %s(%s): %s\n", argv[0], argv[optind], argv[optind+1], gai_strerror(err));
+       return 1;
+    }
+
+    // Use the first resolved address we can create a socket for and connect to.
+    int sock = -1;
+    int conn_errno = 0;
+
+    for(addrinfo* a=addr; a; a=a->ai_next) {
+       sock = socket(a->ai_family, a->ai_socktype, a->ai_protocol);
+       if(sock < 0) {
+           conn_errno = errno;
+           continue;
+       }
+       if(!connect(sock, a->ai_addr, a->ai_addrlen))
+           break;
+       conn_errno = errno;
+       close(sock);
+       sock = -1;
+    }
+
+    freeaddrinfo(addr);
+
+    if(sock < 0) {
+       errno = conn_errno;
+       perror("connect()");
+       return 1;
+    }
+
+    fds[0].fd = 0;
+    fds[1].fd = sock;
+
+    for(;;) {
+       int pollrv;
+
+       // In flushing mode only the socket is polled; make sure stale stdin
+       // results from an earlier round are never acted upon.
+       fds[0].revents = 0;
+       fds[1].revents = 0;
+
+       if(flushing) {
+           // Emit whatever input is still buffered (including a final line
+           // without '\n') as room becomes available, then close the packet.
+           drain_input(true);
+           endpacket();
+           if(pktbuf_tail == pktbuf_head && inlen == 0)
+               break;
+           fds[1].events = POLLOUT;
+           pollrv = poll(&fds[1], 1, -1);
+       } else {
+           fds[0].events = (inlen < insize) ? POLLIN : 0;
+           fds[1].events = (pktbuf_tail != pktbuf_head) ? POLLOUT : 0;
+           // Only bound the wait while a partially filled packet is pending.
+           pollrv = poll(fds, 2, outbuf_len? timeout: -1);
+       }
+
+       if(pollrv < 0) {
+           if(errno == EINTR)
+               continue;
+           perror("poll()");
+           return 2;
+       }
+
+       if(pollrv == 0)
+           endpacket();        // coalescing timeout expired
+
+       // Check POLLIN before POLLHUP: a pipe whose writer has gone away
+       // reports both while data is still left to read.
+       if(fds[0].revents & POLLIN) {
+           ssize_t readlen = read(fds[0].fd, inbuf+inlen, insize-inlen);
+           if(readlen > 0) {
+               inlen += readlen;
+               drain_input(false);
+           } else if(readlen == 0) {
+               flushing = true;        // end of input
+           } else if(errno != EINTR && errno != EAGAIN) {
+               perror("read()");
+               flushing = true;
+           }
+       } else if(fds[0].revents & (POLLERR|POLLHUP|POLLNVAL)) {
+           flushing = true;
+       }
+
+       if(fds[1].revents & (POLLERR|POLLHUP|POLLNVAL)) {
+           fprintf(stderr, "%s: error condition on socket\n", argv[0]);
+           return 2;
+       }
+       if(fds[1].revents & POLLOUT) {
+           while(pktbuf_head != pktbuf_tail) {
+               ssize_t sendlen = send(fds[1].fd, outbuf+pktbuf[pktbuf_tail].data,
+                                      pktbuf[pktbuf_tail].len, MSG_DONTWAIT|MSG_NOSIGNAL);
+
+               if(sendlen < 0) {
+                   if(errno==EAGAIN || errno==EWOULDBLOCK || errno==EINTR)
+                       break;
+                   perror("send()");
+                   return 2;
+               }
+               if(sendlen == 0)
+                   break;
+
+               pktbuf_tail = (pktbuf_tail+1) & (pktbuf_size-1);
+           }
+           // Queue space was freed: resume lines that were waiting for room.
+           if(!flushing && inlen)
+               drain_input(false);
+       }
+    }
+
+    return 0;
+}
+

-- 
To view, visit https://gerrit.wikimedia.org/r/58449
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I4f39d696503fd9399c360902ebf21a7ad39fe649
Gerrit-PatchSet: 1
Gerrit-Project: analytics/log2udp2
Gerrit-Branch: master
Gerrit-Owner: coren <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to