Ottomata has submitted this change and it was merged.

Change subject: Writing JSON statistics to log file rather than syslog or stderr
......................................................................


Writing JSON statistics to log file rather than syslog or stderr

syslog imposes a default limit of 2K on log lines.  Some
of these JSON formatted statistics lines can be larger than
that.  I am seeing lines over 3K.

Rather than try to use syslog for statistics reporting, we now
write directly to a log file.  The lines that are written there
are now fully valid JSON.  Each line may be parsed as its own
JSON object, without any extra non-JSON metadata prepended to it.
(syslog prepends facility, timestamp, etc.).

Log rotation is handled via reopening files on SIGHUP.

The logfile defaults to /var/cache/varnishkafka.stats.json, and
is configurable via the log.statistics.file property.

This also adds a 'time' entry for varnishkafka stats.

Change-Id: I501fe6b421325ae98e21f910a7ae367762075371
---
M config.c
M varnishkafka.c
M varnishkafka.conf.example
M varnishkafka.h
4 files changed, 118 insertions(+), 14 deletions(-)

Approvals:
  Ottomata: Verified; Looks good to me, approved
  Edenhill: Looks good to me, but someone else must approve



diff --git a/config.c b/config.c
index 6b10b41..d06edd5 100644
--- a/config.c
+++ b/config.c
@@ -161,7 +161,10 @@
                        conf.log_to &= ~VK_LOG_SYSLOG;
        } else if (!strcmp(name, "log.kafka.msg.error"))
                conf.log_kafka_msg_error = conf_tof(val);
-       else if (!strcmp(name, "log.statistics.interval"))
+       else if (!strcmp(name, "log.statistics.file")) {
+               free(conf.stats_file);
+               conf.stats_file = strdup(val);
+       } else if (!strcmp(name, "log.statistics.interval"))
                conf.stats_interval = atoi(val);
        else if (!strcmp(name, "log.rate.max"))
                conf.log_rate = atoi(val);
diff --git a/varnishkafka.c b/varnishkafka.c
index 32689eb..373931d 100644
--- a/varnishkafka.c
+++ b/varnishkafka.c
@@ -73,7 +73,6 @@
        [FMT_CONF_KEY]  = "Key"
 };
 
-
 /**
  * Logline cache
  */
@@ -87,6 +86,7 @@
 
 static int logline_cnt = 0; /* Current number of loglines in memory */
 
+static void logrotate(void);
 
 /**
  * Counters
@@ -102,8 +102,8 @@
 } cnt;
 
 static void print_stats (void) {
-       vk_log("\"stats\"", LOG_INFO,
-              "{ "
+       vk_log_stats("{ \"varnishkafka\": { "
+              "\"time\":%llu, "
               "\"tx\":%"PRIu64", "
               "\"txerr\":%"PRIu64", "
               "\"kafka_drerr\":%"PRIu64", "
@@ -112,7 +112,8 @@
               "\"scratch_tmpbufs\":%"PRIu64", "
               "\"lp_curr\":%i, "
               "\"seq\":%"PRIu64" "
-              "}",
+              "} }\n",
+              (unsigned long long)time(NULL),
               cnt.tx,
               cnt.txerr,
               cnt.kafka_drerr,
@@ -1357,7 +1358,7 @@
  */
 static int kafka_stats_cb (rd_kafka_t *rk, char *json, size_t json_len,
                            void *opaque) {
-       vk_log("\"kafkastats\"", LOG_INFO, "%.*s", (int)json_len, json);
+       vk_log_stats("{ \"kafka\": %s }\n", json);
        return 0;
 }
 
@@ -1729,10 +1730,14 @@
                rate_limiters_rollover(lp->t_last);
 
        /* Stats output */
-       if (unlikely(conf.stats_interval &&
-                    lp->t_last >= conf.t_last_stats + conf.stats_interval)) {
-               print_stats();
-               conf.t_last_stats = lp->t_last;
+       if (conf.stats_interval) {
+               if (unlikely(conf.need_logrotate)) {
+                       logrotate();
+               }
+               if (unlikely(lp->t_last >= conf.t_last_stats + conf.stats_interval)) {
+                       print_stats();
+                       conf.t_last_stats = lp->t_last;
+               }
        }
 
        return conf.pret;
@@ -1759,6 +1764,70 @@
 
        if (conf.log_to & VK_LOG_STDERR)
                fprintf(stderr, "%%%i %s: %s\n", level, facility, buf);
+}
+
+/**
+ * Appends a formatted string to the conf.stats_fp file.
+ * conf.stats_fp is configured using the log.statistics.file property.
+ */
+void vk_log_stats(const char *fmt, ...) {
+       va_list ap;
+
+       /* If stats_fp is 0, vk_log_stats() shouldn't have been called */
+       if (!conf.stats_fp)
+               return;
+
+       /* Check if stats_fp needs rotating.
+          This will usually already be taken care
+          of by the check in parse_tag, but we
+          do it here just in case we need to rotate
+          and someone else called vk_log_stats,
+          e.g. kafka_stats_cb.
+       */
+       if (unlikely(conf.need_logrotate)) {
+               logrotate();
+       }
+
+       va_start(ap, fmt);
+       vfprintf(conf.stats_fp, fmt, ap);
+       va_end(ap);
+
+       /* flush stats_fp to make sure valid JSON data
+          (e.g. full lines with closing object brackets)
+       is written to disk */
+       if (fflush(conf.stats_fp)) {
+               vk_log("STATS", LOG_ERR,
+                       "Failed to fflush log.statistics.file %s: %s",
+                       conf.stats_file, strerror(errno));
+       }
+}
+
+/**
+ * Closes and reopens any open logging file pointers.
+ * This must not be called from the SIGHUP handler itself;
+ * call it from the main execution loop instead.
+ */
+static void logrotate(void) {
+       fclose(conf.stats_fp);
+
+       if (!(conf.stats_fp = fopen(conf.stats_file, "a"))) {
+               vk_log("STATS", LOG_ERR,
+                       "Failed to reopen log.statistics.file %s after logrotate: %s",
+                       conf.stats_file, strerror(errno));
+       }
+
+       conf.need_logrotate = 0;
+}
+
+/**
+ * Hangup signal handler.
+ * Sets the global need_logrotate flag to 1 to indicate
+ * that any open file handles should be closed and reopened
+ * as soon as possible.
+ *
+ */
+static void sig_hup(int sig) {
+       conf.need_logrotate = 1;
 }
 
 
@@ -1827,6 +1896,7 @@
        conf.loglines_hmax  = 5;
        conf.scratch_size   = 4096;
        conf.stats_interval = 60;
+       conf.stats_file     = strdup("/var/cache/varnishkafka.stats.json");
        conf.log_kafka_msg_error = 1;
        conf.rk_conf = rd_kafka_conf_new();
        rd_kafka_conf_set(conf.rk_conf, "client.id", "varnishkafka", NULL, 0);
@@ -1894,11 +1964,21 @@
        /* Set up statistics gathering in librdkafka, if enabled. */
        if (conf.stats_interval) {
                char tmp[30];
+
+               if (!(conf.stats_fp = fopen(conf.stats_file, "a"))) {
+                       fprintf(stderr, "Failed to open statistics log file %s: %s\n",
+                               conf.stats_file, strerror(errno));
+                       exit(1);
+               }
+
                snprintf(tmp, sizeof(tmp), "%i", conf.stats_interval*1000);
                rd_kafka_conf_set_stats_cb(conf.rk_conf, kafka_stats_cb);
                rd_kafka_conf_set(conf.rk_conf, "statistics.interval.ms", tmp,
                                  NULL, 0);
-       }
+
+               /* Install SIGHUP handler for logrotating stats_fp. */
+               signal(SIGHUP, sig_hup);
+       }
 
 
        /* Termination signal handlers */
@@ -2015,6 +2095,13 @@
        loglines_term();
        print_stats();
 
+       /* if stats_fp is set (i.e. open), close it. */
+       if (conf.stats_fp) {
+               fclose(conf.stats_fp);
+               conf.stats_fp = NULL;
+       }
+       free(conf.stats_file);
+
        rate_limiters_rollover(time(NULL));
 
        VSM_Close(vd);
diff --git a/varnishkafka.conf.example b/varnishkafka.conf.example
index b79155c..6318018 100644
--- a/varnishkafka.conf.example
+++ b/varnishkafka.conf.example
@@ -176,14 +176,22 @@
 # Kafka: log message delivery failures (requires required.acks > 0)
 log.kafka.msg.error = true
 
+#
+# JSON Statistics
+#
+# Statistics are collected from varnishkafka itself as well as from librdkafka.
+# Each JSON object has a top-level key of either 'varnishkafka' or
+# 'kafka' to indicate which type of statistics the object contains.
+# Each line is a valid JSON object.
+#
 
 # Statistics output interval
-# Statistics is collected from varnishkafka itself as well as librdkafka
-# and output as a log message of level LOG_INFO containing a JSON object
-# prefixed with either "STATS: " or "KAFKASTATS: ".
 # Defaults to 60 seconds, use 0 to disable.
 #log.statistics.interval = 60
 
+# Statistics output file
+# Defaults to /var/cache/varnishkafka.stats.json
+#log.statistics.file = /var/cache/varnishkafka.stats.json
 
 
 # daemonize varnishkafka (boolean)
diff --git a/varnishkafka.h b/varnishkafka.h
index dd90806..da46a4e 100644
--- a/varnishkafka.h
+++ b/varnishkafka.h
@@ -191,7 +191,11 @@
                                      * truncating it. */
 
        int         stats_interval;  /* Statistics output interval */
+       char       *stats_file;      /* Statistics output log file */
+       FILE       *stats_fp;        /* Statistics file pointer    */
        time_t      t_last_stats;    /* Last stats output */
+
+       int         need_logrotate;  /* If this is 1, log files will be reopened */
 
        /* Kafka config */
        int         partition;
@@ -230,6 +234,8 @@
 
 #define _DBG(fmt...) vk_log("DEBUG", LOG_DEBUG, fmt)
 
+void vk_log_stats(const char *fmt, ...)
+       __attribute__((format (printf, 1, 2)));
 
 void out_kafka (struct fmt_conf *fconf, struct logline *lp,
                const char *buf, size_t len);

-- 
To view, visit https://gerrit.wikimedia.org/r/95473
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I501fe6b421325ae98e21f910a7ae367762075371
Gerrit-PatchSet: 7
Gerrit-Project: operations/software/varnish/varnishkafka
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
Gerrit-Reviewer: Edenhill <[email protected]>
Gerrit-Reviewer: Ottomata <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to