Ottomata has uploaded a new change for review. https://gerrit.wikimedia.org/r/230173
Change subject: [WIP] Add format.topic configuration ...................................................................... [WIP] Add format.topic configuration This allows for production to dynamic topics similar to the way that dynamic kafka keys are supported. Change-Id: I903494cd9e7dfd39fac622a05d6a07f62b365b8b --- M config.c M varnishkafka.c M varnishkafka.conf.example M varnishkafka.h 4 files changed, 141 insertions(+), 78 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/operations/software/varnish/varnishkafka refs/changes/73/230173/1 diff --git a/config.c b/config.c index d06edd5..5c38787 100644 --- a/config.c +++ b/config.c @@ -5,24 +5,24 @@ * Copyright (c) 2013 Magnus Edenhill <[email protected]> * * All rights reserved. - * + * * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * + * modification, are permitted provided that the following conditions are met: + * * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. + * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * + * and/or other materials provided with the distribution. + * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. 
@@ -58,7 +58,7 @@ static int conf_tof (const char *val) { char *end; int i; - + i = strtoul(val, &end, 0); if (end > val) /* treat as integer value */ return !!i; @@ -110,7 +110,7 @@ return 0; else if (res != RD_KAFKA_CONF_UNKNOWN) return -1; - + /* Unknown configs: fall thru */ name -= strlen("kafka."); } @@ -121,17 +121,21 @@ name); return -1; } - + /* varnishkafka configuration options */ if (!strcmp(name, "kafka.topic")) conf.topic = strdup(val); + else if (!strcmp(name, "format.topic")) { + conf.format[FMT_CONF_TOPIC] = strdup(val); + conf.fconf[FMT_CONF_TOPIC].encoding = VK_ENC_STRING; + } else if (!strcmp(name, "kafka.partition")) conf.partition = atoi(val); else if (!strcmp(name, "format")) conf.format[FMT_CONF_MAIN] = strdup(val); else if (!strcmp(name, "format.type")) { if ((conf.fconf[FMT_CONF_MAIN].encoding = - encoding_parse(val)) == -1) { + encoding_parse(val)) == -1) { snprintf(errstr, errstr_size, "Unknown format.type value \"%s\"", val); return -1; @@ -186,7 +190,7 @@ outfunc = out_null; else { snprintf(errstr, errstr_size, - "Unknown outputter \"%s\": " + "Unknown outputter \"%s\": " "try \"stdout\" or \"kafka\"", val); return -1; } @@ -231,6 +235,11 @@ return -1; } + if (conf.topic && conf.format[FMT_CONF_TOPIC]) { + snprintf(errstr, errstr_size, + "Cannot set both kafka.topic and format.topic\n"); + return -1; + } return 0; } @@ -297,7 +306,7 @@ /* trim "name"=.. */ if (!trim(&s, t)) { - fprintf(stderr, + fprintf(stderr, "%s:%i: warning: empty left-hand-side\n", path, line); continue; diff --git a/varnishkafka.c b/varnishkafka.c index 532bcb2..2b788e7 100644 --- a/varnishkafka.c +++ b/varnishkafka.c @@ -5,24 +5,24 @@ * Copyright (c) 2013 Magnus Edenhill <[email protected]> * * All rights reserved. - * + * * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * + * modification, are permitted provided that the following conditions are met: + * * 1. 
Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. + * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * + * and/or other materials provided with the distribution. + * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. 
@@ -60,8 +60,8 @@ /* Kafka handle */ static rd_kafka_t *rk; -/* Kafka topic */ -static rd_kafka_topic_t *rkt; +static rd_kafka_topic_t *rkt_default; + /* Varnish shared memory handle*/ struct VSM_data *vd; @@ -70,8 +70,9 @@ static const char *fmt_conf_names[] = { - [FMT_CONF_MAIN] = "Main", - [FMT_CONF_KEY] = "Key" + [FMT_CONF_MAIN] = "Main", + [FMT_CONF_KEY] = "Key", + [FMT_CONF_TOPIC] = "Topic" }; /** @@ -300,7 +301,7 @@ return 0; } - + static inline void match_assign0 (const struct tag *tag, struct logline *lp, @@ -334,7 +335,7 @@ /** - * Allocate persistent memory space ('len' bytes) in + * Allocate persistent memory space ('len' bytes) in * logline 'lp's scratch buffer. */ static inline char *scratch_alloc (const struct tag *tag, struct logline *lp, @@ -576,7 +577,7 @@ char map[256] = {}; while (*match) map[(int)*(match++)] = 1; - + while (s < end) { if (map[(int)*s]) return (char *)s; @@ -585,7 +586,7 @@ return NULL; } - + /** * Splits 'ptr' (with length 'len') by delimiter 'delim' and assigns @@ -649,7 +650,7 @@ match_assign(tag, lp, s, slen); return 0; -} +} static int parse_U (const struct tag *tag, struct logline *lp, const char *ptr, int len) { @@ -842,7 +843,7 @@ out = realloc(out, outsize); assert(out); } - + memcpy(out+of, to, tolen); of += tolen; s--; @@ -888,7 +889,7 @@ const char *var; /* Special handling for non-name-value vars such as * %{Varnish:handling}x. fmtvar is "Varnish:handling" */ - const char *fmtvar; + const char *fmtvar; /* Column to extract: * 0 for entire string, else 1, 2, .. */ int col; @@ -900,7 +901,7 @@ /* Optional tag->flags */ int tag_flags; } f[4+1]; /* increase size when necessary (max used size + 1) */ - + /* Default string if no matching tag was found or all * parsers failed, defaults to "-". 
*/ const char *def; @@ -930,7 +931,7 @@ { VSL_S_BACKEND, SLT_BackendOpen, parser: parse_BackendOpen } } }, - ['i'] = { { + ['i'] = { { { VSL_S_CLIENT, SLT_RxHeader }, } }, ['l'] = { { @@ -944,7 +945,7 @@ { VSL_S_CLIENT, SLT_RxURL, parser: parse_q }, { VSL_S_BACKEND, SLT_TxURL, parser: parse_q }, }, def: "" }, - ['o'] = { { + ['o'] = { { { VSL_S_CLIENT, SLT_TxHeader }, } }, ['s'] = { { @@ -971,7 +972,7 @@ var: "authorization", parser: parse_auth_user }, } }, - ['x'] = { { + ['x'] = { { { VSL_S_CLIENT, SLT_ReqEnd, fmtvar: "Varnish:time_firstbyte", col: 5 }, { VSL_S_CLIENT, SLT_ReqEnd, @@ -1143,7 +1144,7 @@ } while ((q = q2)); } else - varlen = (int)(b-a); + varlen = (int)(b-a); s = b+1; } @@ -1183,7 +1184,7 @@ if (map[(int)*s].f[i].tag == 0) continue; - /* mapping has fmtvar specified, make sure it + /* mapping has fmtvar specified, make sure it * matches the format's variable. */ if (map[(int)*s].f[i].fmtvar) { const char *iswc; @@ -1347,6 +1348,8 @@ void out_kafka (struct fmt_conf *fconf, struct logline *lp, const char *buf, size_t len) { + rd_kafka_topic_t *rkt; + /* If 'buf' is the key we simply store it for later use * when the message is produced. */ if (fconf->fid == FMT_CONF_KEY) { @@ -1357,15 +1360,45 @@ return; } + /* If 'buf' is the topic we simply store it for later use + * when the message is produced. */ + else if (fconf->fid == FMT_CONF_TOPIC) { + assert(!lp->topic); + lp->topic = malloc(len); + lp->topic_len = len; + memcpy(lp->topic, buf, len); + return; + } + + if (!rkt_default) { + // We need a topic if we get here. + assert(lp->topic); + _DBG("Producing to topic '%s", lp->topic); + + /* Create Kafka topic handle. + * rdkafka caches the topic here, so if this topic + * has already been seen, most likely it will not + * have to initialize a new rd_kafka_topic_t. 
*/ + if (!(rkt = rd_kafka_topic_new(rk, lp->topic, NULL))) { + vk_log("KAFKANEW", LOG_ERR, + "Invalid topic or configuration: %s: %s", + lp->topic, strerror(errno)); + exit(1); + } + } + // Else we'll just use the default topic set by kafka.topic. + else + rkt = rkt_default; + if (rd_kafka_produce(rkt, conf.partition, RD_KAFKA_MSG_F_COPY, (void *)buf, len, lp->key, lp->key_len, NULL) == -1) { cnt.txerr++; if (!rate_limit(RL_KAFKA_PRODUCE_ERR)) vk_log("PRODUCE", LOG_WARNING, - "Failed to produce Kafka message " - "(seq %"PRIu64"): %s (%i messages in outq)", - lp->seq, strerror(errno), rd_kafka_outq_len(rk)); + "Failed to produce Kafka message " + "(seq %"PRIu64"): %s (%i messages in outq)", + lp->seq, strerror(errno), rd_kafka_outq_len(rk)); } rd_kafka_poll(rk, 0); @@ -1594,11 +1627,19 @@ lp->tmpbuf = tmpbuf->next; free(tmpbuf); } - + if (lp->key) { free(lp->key); lp->key = NULL; lp->key_len = 0; + } + + if (lp->topic) { + // TODO: THIS SHOULD NOT BE NEEDED??? + memset(lp->topic, 0x00, lp->topic_len); + free(lp->topic); + lp->topic = NULL; + lp->topic_len = 0; } lp->seq = 0; @@ -1709,11 +1750,11 @@ if (tag->var && !(tag->flags & TAG_F_NOVARMATCH)) { const char *t; - + /* Variable match ("Varname: value") */ if (!(t = strnchr(ptr, len, ':'))) continue; - + if (tag->varlen != (int)(t-ptr) || strncasecmp(ptr, tag->var, tag->varlen)) continue; @@ -1744,7 +1785,7 @@ if (tag->parser) { /* Pass value to parser which will assign it. */ tag->parser(tag, lp, ptr2, len2); - + } else { /* Fallback to verbatim field. 
*/ match_assign(tag, lp, ptr2, len2); @@ -1799,7 +1840,7 @@ logline_reset(lp); return conf.pret; } - + /* Log line is complete: render & output */ render_match(lp, ++conf.sequence_number); @@ -1991,7 +2032,7 @@ for (i = 0 ; i < FMT_CONF_NUM ; i++) conf.fconf[i].fid = i; - + conf.format[FMT_CONF_MAIN] = "%l %n %t %{Varnish:time_firstbyte}x %h " "%{Varnish:handling}x/%s %b %m http://%{Host}i%U%q - - " "%{Referer}i %{X-Forwarded-For}i %{User-agent}i"; @@ -2031,9 +2072,6 @@ /* Read config file */ if (conf_file_read(conf_file_path) == -1) exit(1); - - if (!conf.topic) - usage(argv[0]); /* Always include client communication (-c) */ VSL_Arg(vd, 'c', NULL); @@ -2136,14 +2174,18 @@ rd_kafka_set_log_level(rk, conf.log_level); - /* Create Kafka topic handle */ - if (!(rkt = rd_kafka_topic_new(rk, conf.topic, + + /* If kafka.topic is given, then it will be used as the + * topic for all messages. */ + + if (conf.topic && !(rkt_default = rd_kafka_topic_new(rk, conf.topic, conf.topic_conf))) { vk_log("KAFKANEW", LOG_ERR, "Invalid topic or configuration: %s: %s", conf.topic, strerror(errno)); exit(1); } + } /* Main dispatcher loop depending on outputter */ diff --git a/varnishkafka.conf.example b/varnishkafka.conf.example index 1fd9fab..e880bb0 100644 --- a/varnishkafka.conf.example +++ b/varnishkafka.conf.example @@ -99,6 +99,12 @@ format.key.type = string format.key = %{%s}t +# Optional topic formatting. +# 'output = kafka': The rendered 'format.topic' will be used as the +# Kafka topic each message is produced to. +# 'output = string': Print string to stdout. +# This only supports string format type. No json. +# format.topic = %{Host?default-topic}i # Where to output varnish log lines: @@ -256,7 +262,8 @@ # Topic configuration # -# Topic to produce messages to +# Topic to produce messages to. +# Must not be set together with format.topic; setting both is a configuration error. 
kafka.topic = varnish # Partition (-1: random, else one of the available partitions) diff --git a/varnishkafka.h b/varnishkafka.h index da46a4e..411576c 100644 --- a/varnishkafka.h +++ b/varnishkafka.h @@ -5,24 +5,24 @@ * Copyright (c) 2013 Magnus Edenhill <[email protected]> * * All rights reserved. - * + * * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * + * modification, are permitted provided that the following conditions are met: + * * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. + * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation - * and/or other materials provided with the distribution. - * + * and/or other materials provided with the distribution. + * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. @@ -59,7 +59,8 @@ /* Format configurations */ #define FMT_CONF_MAIN 0 /* Main format */ #define FMT_CONF_KEY 1 /* Kafka key format */ -#define FMT_CONF_NUM 2 +#define FMT_CONF_TOPIC 2 /* Kafka topic format */ +#define FMT_CONF_NUM 3 /** @@ -98,6 +99,10 @@ char *key; size_t key_len; + /* Rendered FMT_CONF_TOPIC for use in _MAIN output func */ + char *topic; + size_t topic_len; + /* Auxillery buffers (if scratch pad is not sufficient) */ struct tmpbuf *tmpbuf; -- To view, visit https://gerrit.wikimedia.org/r/230173 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I903494cd9e7dfd39fac622a05d6a07f62b365b8b Gerrit-PatchSet: 1 Gerrit-Project: operations/software/varnish/varnishkafka Gerrit-Branch: master Gerrit-Owner: Ottomata <[email protected]> _______________________________________________ MediaWiki-commits mailing list [email protected] https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
