Ottomata has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/230173

Change subject: [WIP] Add format.topic configuration
......................................................................

[WIP] Add format.topic configuration

This allows producing to dynamic topics, rendered per message from a
format string, similar to the way that dynamic Kafka keys are supported.

Change-Id: I903494cd9e7dfd39fac622a05d6a07f62b365b8b
---
M config.c
M varnishkafka.c
M varnishkafka.conf.example
M varnishkafka.h
4 files changed, 141 insertions(+), 78 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/software/varnish/varnishkafka refs/changes/73/230173/1

diff --git a/config.c b/config.c
index d06edd5..5c38787 100644
--- a/config.c
+++ b/config.c
@@ -5,24 +5,24 @@
  * Copyright (c) 2013 Magnus Edenhill <[email protected]>
  *
  * All rights reserved.
- * 
+ *
  * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
+ * modification, are permitted provided that the following conditions are met:
+ *
  * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
+ *    this list of conditions and the following disclaimer.
  * 2. Redistributions in binary form must reproduce the above copyright notice,
  *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
+ *    and/or other materials provided with the distribution.
+ *
  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  * POSSIBILITY OF SUCH DAMAGE.
@@ -58,7 +58,7 @@
 static int conf_tof (const char *val) {
        char *end;
        int i;
-       
+
        i = strtoul(val, &end, 0);
        if (end > val) /* treat as integer value */
                return !!i;
@@ -110,7 +110,7 @@
                        return 0;
                else if (res != RD_KAFKA_CONF_UNKNOWN)
                        return -1;
-               
+
                /* Unknown configs: fall thru */
                name -= strlen("kafka.");
        }
@@ -121,17 +121,21 @@
                         name);
                return -1;
        }
-                        
+
        /* varnishkafka configuration options */
        if (!strcmp(name, "kafka.topic"))
                conf.topic = strdup(val);
+       else if (!strcmp(name, "format.topic")) {
+               conf.format[FMT_CONF_TOPIC] = strdup(val);
+               conf.fconf[FMT_CONF_TOPIC].encoding = VK_ENC_STRING;
+       }
        else if (!strcmp(name, "kafka.partition"))
                conf.partition = atoi(val);
        else if (!strcmp(name, "format"))
                conf.format[FMT_CONF_MAIN] = strdup(val);
        else if (!strcmp(name, "format.type")) {
                if ((conf.fconf[FMT_CONF_MAIN].encoding =
-                    encoding_parse(val)) == -1) {
+                       encoding_parse(val)) == -1) {
                        snprintf(errstr, errstr_size,
                                 "Unknown format.type value \"%s\"", val);
                        return -1;
@@ -186,7 +190,7 @@
                        outfunc = out_null;
                else {
                        snprintf(errstr, errstr_size,
-                                "Unknown outputter \"%s\": " 
+                                "Unknown outputter \"%s\": "
                                 "try \"stdout\" or \"kafka\"", val);
                        return -1;
                }
@@ -231,6 +235,11 @@
                return -1;
        }
 
+       if (conf.topic && conf.format[FMT_CONF_TOPIC]) {
+               snprintf(errstr, errstr_size,
+                        "Cannot set both kafka.topic and format.topic\n");
+               return -1;
+       }
 
        return 0;
 }
@@ -297,7 +306,7 @@
 
                /* trim "name"=.. */
                if (!trim(&s, t)) {
-                       fprintf(stderr, 
+                       fprintf(stderr,
                                "%s:%i: warning: empty left-hand-side\n",
                                path, line);
                        continue;
diff --git a/varnishkafka.c b/varnishkafka.c
index 532bcb2..2b788e7 100644
--- a/varnishkafka.c
+++ b/varnishkafka.c
@@ -5,24 +5,24 @@
  * Copyright (c) 2013 Magnus Edenhill <[email protected]>
  *
  * All rights reserved.
- * 
+ *
  * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
+ * modification, are permitted provided that the following conditions are met:
+ *
  * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
+ *    this list of conditions and the following disclaimer.
  * 2. Redistributions in binary form must reproduce the above copyright notice,
  *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
+ *    and/or other materials provided with the distribution.
+ *
  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  * POSSIBILITY OF SUCH DAMAGE.
@@ -60,8 +60,8 @@
 
 /* Kafka handle */
 static rd_kafka_t *rk;
-/* Kafka topic */
-static rd_kafka_topic_t *rkt;
+static rd_kafka_topic_t *rkt_default;
+
 
 /* Varnish shared memory handle*/
 struct VSM_data *vd;
@@ -70,8 +70,9 @@
 
 
 static const char *fmt_conf_names[] = {
-       [FMT_CONF_MAIN] = "Main",
-       [FMT_CONF_KEY]  = "Key"
+       [FMT_CONF_MAIN]   = "Main",
+       [FMT_CONF_KEY]    = "Key",
+       [FMT_CONF_TOPIC]  = "Topic"
 };
 
 /**
@@ -300,7 +301,7 @@
 
        return 0;
 }
-                    
+
 
 
 static inline void match_assign0 (const struct tag *tag, struct logline *lp,
@@ -334,7 +335,7 @@
 
 
 /**
- * Allocate persistent memory space ('len' bytes) in 
+ * Allocate persistent memory space ('len' bytes) in
  * logline 'lp's scratch buffer.
  */
 static inline char *scratch_alloc (const struct tag *tag, struct logline *lp,
@@ -576,7 +577,7 @@
        char map[256] = {};
        while (*match)
                map[(int)*(match++)] = 1;
-       
+
        while (s < end) {
                if (map[(int)*s])
                        return (char *)s;
@@ -585,7 +586,7 @@
 
        return NULL;
 }
-       
+
 
 /**
  * Splits 'ptr' (with length 'len') by delimiter 'delim' and assigns
@@ -649,7 +650,7 @@
        match_assign(tag, lp, s, slen);
 
        return 0;
-}      
+}
 
 static int parse_U (const struct tag *tag, struct logline *lp,
                    const char *ptr, int len) {
@@ -842,7 +843,7 @@
                                out = realloc(out, outsize);
                                assert(out);
                        }
-                       
+
                        memcpy(out+of, to, tolen);
                        of += tolen;
                        s--;
@@ -888,7 +889,7 @@
                        const char *var;
                        /* Special handling for non-name-value vars such as
                         * %{Varnish:handling}x. fmtvar is "Varnish:handling" */
-                       const char *fmtvar; 
+                       const char *fmtvar;
                        /* Column to extract:
                         * 0 for entire string, else 1, 2, .. */
                        int col;
@@ -900,7 +901,7 @@
                        /* Optional tag->flags */
                        int tag_flags;
                } f[4+1]; /* increase size when necessary (max used size + 1) */
-               
+
                /* Default string if no matching tag was found or all
                 * parsers failed, defaults to "-". */
                const char *def;
@@ -930,7 +931,7 @@
                                { VSL_S_BACKEND, SLT_BackendOpen,
                                  parser: parse_BackendOpen }
                        } },
-               ['i'] = { { 
+               ['i'] = { {
                                { VSL_S_CLIENT, SLT_RxHeader },
                        } },
                ['l'] = { {
@@ -944,7 +945,7 @@
                                { VSL_S_CLIENT, SLT_RxURL, parser: parse_q },
                                { VSL_S_BACKEND, SLT_TxURL, parser: parse_q },
                        },  def: "" },
-               ['o'] = { { 
+               ['o'] = { {
                                { VSL_S_CLIENT, SLT_TxHeader },
                        } },
                ['s'] = { {
@@ -971,7 +972,7 @@
                                  var: "authorization",
                                  parser: parse_auth_user },
                        } },
-               ['x'] = { { 
+               ['x'] = { {
                                { VSL_S_CLIENT, SLT_ReqEnd,
                                  fmtvar: "Varnish:time_firstbyte", col: 5 },
                                { VSL_S_CLIENT, SLT_ReqEnd,
@@ -1143,7 +1144,7 @@
                                } while ((q = q2));
 
                        } else
-                               varlen = (int)(b-a);                    
+                               varlen = (int)(b-a);
 
                        s = b+1;
                }
@@ -1183,7 +1184,7 @@
                        if (map[(int)*s].f[i].tag == 0)
                                continue;
 
-                       /* mapping has fmtvar specified, make sure it 
+                       /* mapping has fmtvar specified, make sure it
                         * matches the format's variable. */
                        if (map[(int)*s].f[i].fmtvar) {
                                const char *iswc;
@@ -1347,6 +1348,8 @@
 void out_kafka (struct fmt_conf *fconf, struct logline *lp,
                const char *buf, size_t len) {
 
+       rd_kafka_topic_t *rkt;
+
        /* If 'buf' is the key we simply store it for later use
         * when the message is produced. */
        if (fconf->fid == FMT_CONF_KEY) {
@@ -1357,15 +1360,45 @@
                return;
        }
 
+       /* If 'buf' is the topic we simply store it for later use
+        * when the message is produced. */
+       else if (fconf->fid == FMT_CONF_TOPIC) {
+               assert(!lp->topic);
+               lp->topic = malloc(len);
+               lp->topic_len = len;
+               memcpy(lp->topic, buf, len);
+               return;
+       }
+
+       if (!rkt_default) {
+               // We need a topic if we get here.
+               assert(lp->topic);
+               _DBG("Producing to topic '%s", lp->topic);
+
+               /* Create Kafka topic handle.
+                * rdkafka caches the topic here, so if this topic
+                * has already been seen, most likely it will not
+                * have to initialize a new rd_kafka_topic_t. */
+               if (!(rkt = rd_kafka_topic_new(rk, lp->topic, NULL))) {
+                       vk_log("KAFKANEW", LOG_ERR,
+                               "Invalid topic or configuration: %s: %s",
+                               lp->topic, strerror(errno));
+                       exit(1);
+               }
+       }
+       // Else we'll just use the default topic set by kafka.topic.
+       else
+               rkt = rkt_default;
+
        if (rd_kafka_produce(rkt, conf.partition, RD_KAFKA_MSG_F_COPY,
                             (void *)buf, len,
                             lp->key, lp->key_len, NULL) == -1) {
                cnt.txerr++;
                if (!rate_limit(RL_KAFKA_PRODUCE_ERR))
                        vk_log("PRODUCE", LOG_WARNING,
-                              "Failed to produce Kafka message "
-                              "(seq %"PRIu64"): %s (%i messages in outq)",
-                              lp->seq, strerror(errno), rd_kafka_outq_len(rk));
+                               "Failed to produce Kafka message "
+                               "(seq %"PRIu64"): %s (%i messages in outq)",
+                               lp->seq, strerror(errno), rd_kafka_outq_len(rk));
        }
 
        rd_kafka_poll(rk, 0);
@@ -1594,11 +1627,19 @@
                lp->tmpbuf = tmpbuf->next;
                free(tmpbuf);
        }
-       
+
        if (lp->key) {
                free(lp->key);
                lp->key = NULL;
                lp->key_len = 0;
+       }
+
+       if (lp->topic) {
+               // TODO: THIS SHOULD NOT BE NEEDED???
+               memset(lp->topic, 0x00, lp->topic_len);
+               free(lp->topic);
+               lp->topic = NULL;
+               lp->topic_len = 0;
        }
 
        lp->seq       = 0;
@@ -1709,11 +1750,11 @@
 
                if (tag->var && !(tag->flags & TAG_F_NOVARMATCH)) {
                        const char *t;
-                       
+
                        /* Variable match ("Varname: value") */
                        if (!(t = strnchr(ptr, len, ':')))
                                continue;
-                       
+
                        if (tag->varlen != (int)(t-ptr) ||
                            strncasecmp(ptr, tag->var, tag->varlen))
                                continue;
@@ -1744,7 +1785,7 @@
                if (tag->parser) {
                        /* Pass value to parser which will assign it. */
                        tag->parser(tag, lp, ptr2, len2);
-                       
+
                } else {
                        /* Fallback to verbatim field. */
                        match_assign(tag, lp, ptr2, len2);
@@ -1799,7 +1840,7 @@
                logline_reset(lp);
                return conf.pret;
        }
-       
+
        /* Log line is complete: render & output */
        render_match(lp, ++conf.sequence_number);
 
@@ -1991,7 +2032,7 @@
 
        for (i = 0 ; i < FMT_CONF_NUM ; i++)
                conf.fconf[i].fid = i;
-               
+
        conf.format[FMT_CONF_MAIN] = "%l %n %t %{Varnish:time_firstbyte}x %h "
                "%{Varnish:handling}x/%s %b %m http://%{Host}i%U%q - - "
                "%{Referer}i %{X-Forwarded-For}i %{User-agent}i";
@@ -2031,9 +2072,6 @@
        /* Read config file */
        if (conf_file_read(conf_file_path) == -1)
                exit(1);
-
-       if (!conf.topic)
-               usage(argv[0]);
 
        /* Always include client communication (-c) */
        VSL_Arg(vd, 'c', NULL);
@@ -2136,14 +2174,18 @@
 
                rd_kafka_set_log_level(rk, conf.log_level);
 
-               /* Create Kafka topic handle */
-               if (!(rkt = rd_kafka_topic_new(rk, conf.topic,
+
+               /* If kafka.topic is given, then it will be used as the
+                * topic for all messages. */
+
+               if (conf.topic && !(rkt_default = rd_kafka_topic_new(rk, conf.topic,
                                               conf.topic_conf))) {
                        vk_log("KAFKANEW", LOG_ERR,
                               "Invalid topic or configuration: %s: %s",
                               conf.topic, strerror(errno));
                        exit(1);
                }
+
        }
 
        /* Main dispatcher loop depending on outputter */
diff --git a/varnishkafka.conf.example b/varnishkafka.conf.example
index 1fd9fab..e880bb0 100644
--- a/varnishkafka.conf.example
+++ b/varnishkafka.conf.example
@@ -99,6 +99,12 @@
 format.key.type = string
 format.key = %{%s}t
 
+# Optional topic formatting.
+#   'output = kafka':  The rendered 'format.topic' will be used as the
+#                      Kafka topic the message is produced to.
+#   'output = string': Print string to stdout.
+# This only supports string format type.  No json.
+# format.topic = %{Host?default-topic}i
 
 
 # Where to output varnish log lines:
@@ -256,7 +262,8 @@
 # Topic configuration
 #
 
-# Topic to produce messages to
+# Topic to produce messages to.
+# Cannot be set together with format.topic (configuring both is an error).
 kafka.topic = varnish
 
 # Partition (-1: random, else one of the available partitions)
diff --git a/varnishkafka.h b/varnishkafka.h
index da46a4e..411576c 100644
--- a/varnishkafka.h
+++ b/varnishkafka.h
@@ -5,24 +5,24 @@
  * Copyright (c) 2013 Magnus Edenhill <[email protected]>
  *
  * All rights reserved.
- * 
+ *
  * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are met: 
- * 
+ * modification, are permitted provided that the following conditions are met:
+ *
  * 1. Redistributions of source code must retain the above copyright notice,
- *    this list of conditions and the following disclaimer. 
+ *    this list of conditions and the following disclaimer.
  * 2. Redistributions in binary form must reproduce the above copyright notice,
  *    this list of conditions and the following disclaimer in the documentation
- *    and/or other materials provided with the distribution. 
- * 
+ *    and/or other materials provided with the distribution.
+ *
  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 
- * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 
- * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 
- * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 
- * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  * POSSIBILITY OF SUCH DAMAGE.
@@ -59,7 +59,8 @@
 /* Format configurations */
 #define FMT_CONF_MAIN    0  /* Main format */
 #define FMT_CONF_KEY     1  /* Kafka key format */
-#define FMT_CONF_NUM     2
+#define FMT_CONF_TOPIC   2  /* Kafka topic format */
+#define FMT_CONF_NUM     3
 
 
 /**
@@ -98,6 +99,10 @@
        char    *key;
        size_t   key_len;
 
+       /* Rendered FMT_CONF_TOPIC for use in _MAIN output func */
+       char    *topic;
+       size_t   topic_len;
+
        /* Auxillery buffers (if scratch pad is not sufficient) */
        struct tmpbuf *tmpbuf;
 

-- 
To view, visit https://gerrit.wikimedia.org/r/230173
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I903494cd9e7dfd39fac622a05d6a07f62b365b8b
Gerrit-PatchSet: 1
Gerrit-Project: operations/software/varnish/varnishkafka
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to