coren has uploaded a new change for review.
https://gerrit.wikimedia.org/r/58449
Change subject: New implementation of log2udp
......................................................................
New implementation of log2udp
- optional line numbering
- optional line prefix
- coalesces lines into packets with configurable timeout
- fixed footprint
Change-Id: I4f39d696503fd9399c360902ebf21a7ad39fe649
---
A Makefile
A log2udp2.cc
2 files changed, 280 insertions(+), 0 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/log2udp2
refs/changes/49/58449/1
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..9c2840a
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,5 @@
+
+
+log2udp2: log2udp2.cc
+	g++ -o $@ $<
+
diff --git a/log2udp2.cc b/log2udp2.cc
new file mode 100644
index 0000000..fcd1b42
--- /dev/null
+++ b/log2udp2.cc
@@ -0,0 +1,275 @@
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <stdio.h>
+#include <poll.h>
+#include <getopt.h>
+#include <netdb.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+
+
+// Long-option table passed to getopt_long() in main(). Each entry returns the
+// same character as its short option; the all-zero entry ends the table.
+static option options[] = {
+ { "number", no_argument, 0, 'n' },
+ { "prefix", required_argument, 0, 'p' },
+ { "timeout", required_argument, 0, 't' },
+ { 0, 0, 0, 0 }
+};
+
+// Text placed (followed by ": ") in front of each outgoing line; null when
+// no --prefix was given. addline() uses at most its first 44 characters.
+static char* line_prefix = 0;
+// True when --number was given: each outgoing line starts with its number.
+static bool numbering = false;
+// Number that the next numbered line will receive.
+static long line_number = 0;
+// Timeout handed to poll() while a packet is under construction, as set by
+// --timeout; -1 means wait indefinitely.
+static int timeout = -1;
+
+// Print the command-line synopsis and the option summary to stderr.
+//
+// exename: program name substituted into the "Usage:" line (main() passes
+//          argv[0]).
+static void usage(const char* exename)
+{
+    // Each literal is kept on a single source line: a string literal cannot
+    // span a raw line break.
+    fprintf(stderr,
+        "Usage: %s [OPTION]... HOST PORT\n"
+        "Send standard input to UDP port PORT at HOST\n\n"
+        "Mandatory arguments to long options are mandatory for short options too.\n"
+        " -n, --number number outgoing lines\n"
+        " -p, --prefix=PREFIX prepend \"PREFIX: \" to outgoing lines.\n"
+        " -t, --timeout=MSEC wait MSEC milliseconds at most when merging\n"
+        " consecutive lines in a packet (-1 to wait\n"
+        " indefinitely, which is the default)\n",
+        exename);
+}
+
+// Our output buffer: we keep a quasi-circular data queue outbuf_size long
+// (quasi insofar as we roll over when we reach 64k from the end to keep packet
+// data contiguous), and a circular packet queue pktbuf_size long. Both need
+// to be power-of-two in size.
+
+// The packet currently being built starts at outbuf_head and is outbuf_len bytes long.
+
+// Capacity in bytes of the outgoing data buffer.
+static const size_t outbuf_size = 1024*1024;
+// Storage for the payload of queued packets and of the packet being built.
+static char outbuf[outbuf_size];
+// Offset that hasroom() compares with outbuf_head to decide how much of
+// outbuf is free.
+static int outbuf_tail = 0;
+// Offset in outbuf at which the packet under construction starts.
+static int outbuf_head = 0;
+// Number of bytes accumulated so far in the packet under construction.
+static size_t outbuf_len = 0;
+
+// Number of slots in the queue of finished packets.
+static const size_t pktbuf_size = 1024;
+// Queue of finished packets waiting to be sent.
+static struct {
+ // Offset of the packet's first byte within outbuf.
+ off_t data;
+ // Length of the packet in bytes.
+ size_t len;
+} pktbuf[pktbuf_size];
+// Slot of the next packet to send.
+static int pktbuf_tail = 0;
+// Slot in which endpacket() stores the next finished packet.
+static int pktbuf_head = 0;
+
+
+// Report whether one more line may be handed to addline().
+//
+// Returns false when the packet queue is too full, or when fewer than 64 KiB
+// of contiguous space is available in outbuf at the current write position.
+static bool hasroom(void)
+{
+    // `==` binds tighter than `&`, so the masked index must be parenthesised,
+    // and the queue is full when advancing the head would reach the tail.
+    // addline() can close two packets for a single line, so two slots are
+    // kept free rather than one.
+    const size_t mask = pktbuf_size-1;
+    if(((pktbuf_head+1) & mask) == (size_t)pktbuf_tail
+    || ((pktbuf_head+2) & mask) == (size_t)pktbuf_tail)
+        return false;
+
+    // Equal offsets mean "empty" only when no packet is queued; otherwise the
+    // head has caught up with data that is still waiting to be sent.
+    if(outbuf_head == outbuf_tail)
+        return pktbuf_head == pktbuf_tail;
+
+    if(outbuf_head < outbuf_tail)
+        return outbuf_tail-outbuf_head >= 65536;
+    if(outbuf_head+65536 < outbuf_size)
+        return true;
+    return outbuf_tail >= 65536;
+}
+
+// Close the packet under construction and append it to the packet queue.
+// Does nothing when no bytes have been accumulated.
+static void endpacket(void)
+{
+    if(outbuf_len == 0)
+        return;
+
+    // Record where the finished packet lives, then advance the queue head.
+    const int slot = pktbuf_head;
+    pktbuf[slot].data = outbuf_head;
+    pktbuf[slot].len = outbuf_len;
+    pktbuf_head = (slot+1) & (pktbuf_size-1);
+
+    // The next packet starts right after this one, unless that would leave
+    // less than 64 KiB before the end of outbuf; then restart at offset 0 so
+    // packet data always stays contiguous.
+    const size_t next_start = outbuf_head + outbuf_len;
+    outbuf_len = 0;
+    outbuf_head = (next_start+65536 >= outbuf_size) ? 0 : (int)next_start;
+}
+
+// Append one input line to the packet under construction.
+//
+// buf, len: the line's bytes, without the terminating newline. Trailing
+//           carriage returns are stripped, and a line that is empty after
+//           stripping is ignored (it does not consume a line number).
+//
+// The line is preceded by the optional number and prefix header and followed
+// by '\n'. Header plus line are capped at 65534 bytes; longer lines are
+// truncated. The current packet is closed first when the new line would not
+// fit, and closed afterwards when it has grown large enough.
+static void addline(const char* buf, size_t len)
+{
+    char hdr[64];
+
+    while(len && buf[len-1]=='\r')
+        --len;
+    if(!len)
+        return;
+
+    size_t hdr_len = 0;
+
+    if(numbering) {
+        // Use the length snprintf reports: "%11ld " is 12 characters only
+        // while the number fits in 11 digits.
+        int n = snprintf(hdr, sizeof hdr, "%11ld ", line_number++);
+        if(n > 0)
+            hdr_len += n;
+    }
+    if(line_prefix) {
+        int n = snprintf(hdr+hdr_len, sizeof hdr - hdr_len, "%.44s: ", line_prefix);
+        if(n > 0)
+            hdr_len += n;
+    }
+    // snprintf returns the untruncated length; never copy more than hdr holds.
+    if(hdr_len > sizeof hdr - 1)
+        hdr_len = sizeof hdr - 1;
+
+    if(len+hdr_len > 65534)
+        len = 65534-hdr_len;
+
+    // NOTE(review): 1491 looks like an MTU-derived target packet size -- confirm.
+    if(outbuf_len && (outbuf_len+hdr_len+len) > 1491)
+        endpacket();
+
+    // Must be computed after endpacket(), which may move outbuf_head.
+    char* cp_ptr = outbuf+outbuf_head;
+
+    if(hdr_len) {
+        memcpy(cp_ptr+outbuf_len, hdr, hdr_len);
+        outbuf_len += hdr_len;
+    }
+    memcpy(cp_ptr+outbuf_len, buf, len);
+    outbuf_len += len;
+    cp_ptr[outbuf_len++] = '\n';
+
+    if(outbuf_len >= 1491-hdr_len)
+        endpacket();
+}
+
+
+// Capacity in bytes of the input buffer.
+static const size_t insize = 1024*1024;
+// Bytes read from standard input that have not yet been split into lines.
+static char inbuf[insize];
+// Number of valid bytes at the start of inbuf.
+static size_t inlen = 0;
+
+// Hand the lines buffered in inbuf to addline().
+//
+// Complete lines are consumed from the front of inbuf for as long as
+// hasroom() allows; whatever is not consumed, including a trailing partial
+// line, is moved to the start of inbuf and kept for the next call. A trailing
+// partial line is emitted as a line of its own when `final` is set (no more
+// input will arrive) or when inbuf is completely full without holding a
+// newline, since waiting for more input could then never make progress.
+static void drain_input(bool final)
+{
+    size_t inptr = 0;
+
+    while(inptr < inlen && hasroom()) {
+        size_t lend = inptr;
+        while(lend < inlen && inbuf[lend] != '\n')
+            lend++;
+
+        if(lend >= inlen) {
+            const bool stuck = (inptr == 0 && inlen == insize);
+            if(final || stuck) {
+                addline(inbuf+inptr, inlen-inptr);
+                inptr = inlen;
+            }
+            break;
+        }
+
+        addline(inbuf+inptr, lend-inptr);
+        inptr = lend+1;
+    }
+
+    // Drop only what was consumed; the remainder must survive.
+    if(inptr) {
+        memmove(inbuf, inbuf+inptr, inlen-inptr);
+        inlen -= inptr;
+    }
+}
+
+// Entry point: parse the options, connect a UDP socket to HOST PORT, then
+// relay standard input to it line by line until the input ends and every
+// queued packet has been sent.
+//
+// Returns 0 on success, 1 on usage, name-resolution or connection errors and
+// 2 on I/O errors while relaying.
+int main(int argc, char** argv, char** envp)
+{
+    bool flushing = false;  // true once stdin is exhausted: only draining remains
+    pollfd fds[2];          // [0] standard input, [1] the UDP socket
+
+    for(;;) {
+        int o = getopt_long(argc, argv, "np:t:", options, 0);
+
+        if(o<0)
+            break;
+
+        switch(o) {
+            case 'n':
+                numbering = true;
+                break;
+            case 'p':
+                line_prefix = new char[strlen(optarg)+1];
+                strcpy(line_prefix, optarg);
+                break;
+            case 't':
+                timeout = atoi(optarg);
+                break;
+            case ':':
+            case '?':
+                usage(argv[0]);
+                return 1;
+        }
+    }
+
+    if(optind != argc-2) {
+        usage(argv[0]);
+        return 1;
+    }
+
+    int sock = -1;
+
+    // Set the field by name: the member order of addrinfo is not the same on
+    // every platform, so a positional initializer is fragile.
+    addrinfo hints;
+    memset(&hints, 0, sizeof hints);
+    hints.ai_socktype = SOCK_DGRAM;
+    addrinfo *addr;
+
+    if(int err = getaddrinfo(argv[optind], argv[optind+1], &hints, &addr)) {
+        fprintf(stderr, "%s: %s(%s): %s\n", argv[0], argv[optind], argv[optind+1], gai_strerror(err));
+        return 1;
+    }
+
+    // Try each resolved address in turn and keep the first that connects.
+    for(addrinfo* a=addr; a; a=a->ai_next) {
+        if((sock = socket(a->ai_family, a->ai_socktype, a->ai_protocol)) >= 0) {
+            if(!connect(sock, a->ai_addr, a->ai_addrlen)) {
+                break;
+            }
+            // close() may overwrite errno; keep connect()'s value for perror().
+            int saved_errno = errno;
+            close(sock);
+            errno = saved_errno;
+        }
+        if(!a->ai_next) {
+            perror("connect()");
+            freeaddrinfo(addr);
+            return 1;
+        }
+    }
+
+    freeaddrinfo(addr);
+
+    fds[0].fd = 0;
+    fds[1].fd = sock;
+
+    for(;;) {
+        int pollrv = 0;
+
+        // Queue whatever is already buffered. This runs on every pass, not
+        // only after a read, so lines held back by a full queue are picked up
+        // again once packets have been sent.
+        drain_input(flushing);
+
+        // poll() only writes revents for the descriptors it is given; clear
+        // both so no stale state from an earlier pass is acted on.
+        fds[0].revents = 0;
+        fds[1].revents = 0;
+
+        if(flushing) {
+            endpacket();
+            if(pktbuf_tail == pktbuf_head && inlen == 0)
+                break;
+            fds[1].events = POLLOUT;
+            pollrv = poll(&fds[1], 1, -1);
+        } else {
+            fds[0].events = 0;
+            fds[1].events = 0;
+
+            if(inlen < insize)
+                fds[0].events = POLLIN;
+            if(pktbuf_tail != pktbuf_head)
+                fds[1].events = POLLOUT;
+            pollrv = poll(fds, 2, outbuf_len? timeout: -1);
+        }
+
+        if(pollrv < 0) {
+            if(errno == EINTR)
+                continue;
+            perror("poll()");
+            return 2;
+        }
+
+        // Timeout: stop waiting for more lines to merge into this packet.
+        if(pollrv == 0) {
+            endpacket();
+        }
+
+        // Readable data takes precedence over hang-up: a closed pipe can
+        // report both while unread data is still pending.
+        if(fds[0].revents & POLLIN) {
+            ssize_t readlen = read(fds[0].fd, inbuf+inlen, insize-inlen);
+            if(readlen > 0) {
+                inlen += readlen;
+            } else if(readlen == 0) {
+                flushing = true;
+            } else if(errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK) {
+                perror("read()");
+                return 2;
+            }
+        } else if(fds[0].events && (fds[0].revents & (POLLERR|POLLHUP|POLLNVAL))) {
+            // Only trusted when POLLIN was requested and not reported;
+            // otherwise unread input may still be pending.
+            flushing = true;
+        }
+
+        if(fds[1].revents & (POLLERR|POLLHUP|POLLNVAL)) {
+            fprintf(stderr, "%s: error condition on socket\n", argv[0]);
+            return 2;
+        } else if(fds[1].revents & POLLOUT) {
+            while(pktbuf_head != pktbuf_tail) {
+                ssize_t sendlen = send(fds[1].fd, outbuf+pktbuf[pktbuf_tail].data, pktbuf[pktbuf_tail].len, MSG_DONTWAIT|MSG_NOSIGNAL);
+
+                if(sendlen < 0) {
+                    if(errno==EAGAIN || errno==EWOULDBLOCK)
+                        break;
+                    perror("send()");
+                    return 2;
+                }
+                if(sendlen == 0)
+                    break;
+
+                pktbuf_tail = (pktbuf_tail+1) & (pktbuf_size-1);
+
+                // Release the sent packet's data: the tail follows the oldest
+                // packet still queued, or joins the head when none is left.
+                if(pktbuf_tail == pktbuf_head)
+                    outbuf_tail = outbuf_head;
+                else
+                    outbuf_tail = (int)pktbuf[pktbuf_tail].data;
+            }
+        }
+    }
+
+    close(sock);
+    return 0;
+}
+
--
To view, visit https://gerrit.wikimedia.org/r/58449
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I4f39d696503fd9399c360902ebf21a7ad39fe649
Gerrit-PatchSet: 1
Gerrit-Project: analytics/log2udp2
Gerrit-Branch: master
Gerrit-Owner: coren <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits