Mobrovac has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/328660 )

Change subject: RESTBase-Cassandra: Add the topk reporter
......................................................................

RESTBase-Cassandra: Add the topk reporter

We have a reporter script running on terbium that collects information
from Logstash on partition sizes in RB's Cassandra and sends out a mail
to servi...@wm.org every 7 days. So far, this has been running under
Eric's user there. This patch properly puppetises the script and sets up
the environment for it to run every Sunday.

Besides the Cassandra reporter modules, this patch also adds base::crond,
a simple define abstracting out cron job configuration files. The define
is pretty flexible in terms of how one can specify the time when to
run, as well as the command's environment and stream redirection.
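
As a rough illustration of the time and redirection semantics described
above (not part of the patch), here is a minimal Python sketch. The
function names normalize_time, cron_times and redirection are
hypothetical; the actual logic lives in the crond.erb template, and this
sketch only mirrors the behaviour documented for the define.

```python
# Sketch only: mirrors the documented behaviour of base::crond's `time` and
# `redirect` parameters. All names here are hypothetical.
FIELDS = ("mm", "hh", "day", "mon", "dow")


def normalize_time(time):
    """Turn a single time definition into a crontab time string."""
    if isinstance(time, dict):
        # hash: fields are read in order, missing ones default to '*'
        return " ".join(str(time.get(f, "*")) for f in FIELDS)
    if isinstance(time, (list, tuple)):
        # array: positional fields, padded with '*' up to 5 elements
        padded = list(time[:5]) + ["*"] * (5 - len(time))
        return " ".join(str(v) for v in padded)
    # string (or number): taken as-is
    return str(time).strip()


def cron_times(time):
    """A top-level list is a list of definitions; anything else is one."""
    definitions = time if isinstance(time, list) else [time]
    return [normalize_time(t) for t in definitions]


def redirection(redirect=("/dev/null",)):
    """Build the stdout/stderr redirection suffix for the cron line."""
    targets = [redirect] if isinstance(redirect, str) else list(redirect[:2])
    if not targets:
        targets = ["/dev/null"]
    if len(targets) == 1:
        # a single destination receives both streams
        targets.append("&1")
    return "1>{} 2>{}".format(*targets)


print(cron_times(["@reboot", {"mm": 30, "hh": 20}]))
print(cron_times([[30, 20]]))
print(redirection(["/var/log/my-cron.out", "/var/log/my-cron.err"]))
```

Note that a bare [30, 20] is read as two separate time definitions,
which is why a single positional definition has to be wrapped as
[[30, 20]].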

Bug: T147366
Change-Id: I81eff560e424587c6ab4370bdb0cd60940c6302c
---
M manifests/site.pp
A modules/base/manifests/crond.pp
A modules/base/templates/crond.erb
A modules/cassandra/files/reporter/topk.py
A modules/cassandra/manifests/reporter/init.pp
A modules/cassandra/manifests/reporter/topk.pp
A modules/cassandra/templates/reporter/topk.erb
A modules/restbase/manifests/cass_report.pp
8 files changed, 732 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/puppet refs/changes/60/328660/1

diff --git a/manifests/site.pp b/manifests/site.pp
index cdfddde..b50bc16 100644
--- a/manifests/site.pp
+++ b/manifests/site.pp
@@ -2776,6 +2776,7 @@
     role(mariadb::maintenance, mediawiki::maintenance, openldap::management)
     include role::noc::site
     include ldap::role::client::labs
+    include restbase::cass_report
     include base::firewall
 
     interface::add_ip6_mapped { 'main':
diff --git a/modules/base/manifests/crond.pp b/modules/base/manifests/crond.pp
new file mode 100644
index 0000000..84b2b8e
--- /dev/null
+++ b/modules/base/manifests/crond.pp
@@ -0,0 +1,71 @@
+# == Define: base::crond
+#
+# The base::crond define is used to create cron configuration files. A
+# configuration file is stored in /etc/cron.d/ and is compiled from the
+# parameters given to the define. It allows one to set the command, multiple
+# execution times, the user to execute as, the environment variables to set and
+# where to redirect stdout and stderr.
+#
+# === Parameters
+#
+# [*title*]
+#   Required. Defines the name of the cron job to add. It is used as the name of
+#   the file under /etc/cron.d/
+#
+# [*command*]
+#   Required. The command to execute.
+#
+# [*time*]
+#   Required. The cron time definition(s) of when to execute the command. If an
+#   array is given, it will be interpreted as the list of time definitions, and
+#   a line in the cron configuration file will be added for each such cron time.
+#   The parameter (or each element of the array) can be either a string, array
+#   or hash. Strings are taken as being in the crontab time format and will be
+#   put in the file as-is. In the case of a hash, the following fields can
+#   exist: 'mm', 'hh', 'day', 'mon', 'dow'. They are read in the presented
+#   order. If any of them is missing, '*' is assumed. Finally, arrays are taken
+#   to be ordered times for each segment, i.e. the array ought to have 5
+#   elements; the missing ones are filled with '*'. For example, to indicate
+#   that the command should run at 20:30 every day as well as during reboot, one
+#   can use: ['@reboot', { mm => 30, hh => 20 }]. Alternatively, the second
+#   element could have been written as '30 20 * * *' or [30, 20]. Note, however,
+#   that this implies that if you want to run the command only at 20:30 and use
+#   an array, you have to specify [[30, 20]].
+#
+# [*user*]
+#   The user under which the command should run. Default: 'root'
+#
+# [*environment*]
+#   The environment variables to set. It can be a string (containing one or more
+#   NAME=VALUE pairs separated by new-line characters) or a hash (having
+#   variable names as keys and values as values). Note that $PATH is set by the
+#   define to /sbin:/usr/sbin:/bin:/usr/bin:/usr/local/sbin:/usr/local/bin . If
+#   you need to override it, set it in this parameter. Default: {}
+#
+# [*redirect*]
+#   Tells cron where to redirect the output from standard output and error. It can
+#   be given as a string or as an array. In the former case, or if the array has
+#   only one element, both streams will be redirected to the same destination. If
+#   you want to redirect each stream separately, specify it as a two-element
+#   array in the [stdout, stderr] format. For example,
+#   ['/var/log/my-cron.out', '/var/log/my-cron.err'] would send standard output
+#   to the first and standard error to the second element of the array. The
+#   default behaviour is to send everything to /dev/null. Default: ['/dev/null']
+#
+define base::crond(
+    $command,
+    $time,
+    $user        = 'root',
+    $environment = {},
+    $redirect    = ['/dev/null'],
+) {
+
+    file { "/etc/cron.d/${title}":
+        ensure  => present,
+        content => template('base/crond.erb'),
+        owner   => 'root',
+        group   => 'root',
+        mode    => '0644'
+    }
+
+}
diff --git a/modules/base/templates/crond.erb b/modules/base/templates/crond.erb
new file mode 100644
index 0000000..1a83e5a
--- /dev/null
+++ b/modules/base/templates/crond.erb
@@ -0,0 +1,50 @@
+<%-
+  # compile the environment
+  env = if @environment.kind_of?(Hash)
+    @environment.map do |key, value|
+      "#{key}=#{value}"
+    end.join("\n")
+  else
+    @environment.strip
+  end
+  # figure out the redirects
+  redir = if @redirect.kind_of?(String)
+    [@redirect.strip, '&1']
+  elsif @redirect.kind_of?(Array)
+    unless @redirect.empty?
+      @redirect[0, 2].size == 2 ? @redirect[0, 2] : [@redirect[0], '&1']
+    else
+      ['/dev/null', '&1']
+    end
+  else
+    ['/dev/null', '&1']
+  end
+  redir.each_index { |i| redir[i] = "#{i + 1}>#{redir[i]}" }
+  redir.push(redir.shift) if '1>&2' == redir[0]
+  # handle the supplied times
+  times = @time.kind_of?(Array) ? @time : [@time]
+  times.map! do |time|
+    if [String, Numeric, Symbol].any? { |klass| time.kind_of?(klass) }
+      [time.to_s.strip]
+    elsif time.kind_of?(Array)
+      (0...5).map { |i| time[i] || '*' }
+    elsif time.kind_of?(Hash)
+      ['mm', 'hh', 'day', 'mon', 'dow'].map { |f| time.has_key?(f) && time[f] || '*' }
+    else
+      ['*'] * 5
+    end.join(' ')
+  end
+-%>
+# This file is managed by Puppet
+# Do not edit manually
+
+# Cron tab for <%= @title %>
+
+# Environment
+PATH=/sbin:/usr/sbin:/bin:/usr/bin:/usr/local/sbin:/usr/local/bin
+<%= env %>
+
+# Schedule
+<%- times.each do |time| -%>
+<%= time %>    <%= @user %>    <%= @command.strip %>    <%= redir.join(' ') %>
+<%- end -%>
diff --git a/modules/cassandra/files/reporter/topk.py b/modules/cassandra/files/reporter/topk.py
new file mode 100755
index 0000000..c4f45e4
--- /dev/null
+++ b/modules/cassandra/files/reporter/topk.py
@@ -0,0 +1,465 @@
+#!/usr/bin/env python
+
+# -*- coding: utf-8 -*-
+
+# Copyright 2016 Eric Evans <eev...@wikimedia.org>, Wikimedia Foundation
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+"""
+Cassandra topk wide partition reports
+"""
+
+
+import argparse
+import csv
+import datetime
+import getpass
+import json
+import operator
+import re
+import StringIO
+import socket
+import sys
+import time
+import logging
+
+from email.mime.text      import MIMEText
+from email.mime.base      import MIMEBase
+from email.mime.multipart import MIMEMultipart
+from subprocess           import Popen, PIPE
+
+try:
+    import jsonschema
+except ImportError:
+    print >>sys.stderr, "Missing jsonschema module (Hint: apt-get install python-jsonschema)"
+    sys.exit(1)
+
+try:
+    import requests
+except ImportError:
+    print >>sys.stderr, "Missing requests module (Hint: apt-get install python-requests)"
+    sys.exit(1)
+
+try:
+    from jinja2 import Template
+except ImportError:
+    print >>sys.stderr, "Missing jinja2 module (Hint: apt-get install python-jinja2)"
+    sys.exit(1)
+
+
+class ElasticSearch(object):
+    """Query elasticsearch."""
+    page_size = 200
+
+    def __init__(self, host, port=9200):
+        self.host = host
+        self.port = port
+
+    def __url(self, index):
+        return "http://{0.host}:{0.port}/{index}/_search".format(self, index=index)
+
+    def search(self, index, query):
+        """
+        Executes an elasticsearch query of the given index; Returns a generator
+        of objects from the hits array of the response.
+        """
+        if not isinstance(query, dict):
+            raise RuntimeError("invalid argument; query must be a dictionary")
+        query["from"] = 0
+        query["size"] = ElasticSearch.page_size
+        res = Page(requests.get(self.__url(index), data=json.dumps(query)))
+        for hit in res.hits:
+            yield hit
+        for _ in range((res.total / ElasticSearch.page_size) + 1):
+            query["from"] += len(res.hits)
+            res = Page(requests.get(self.__url(index), data=json.dumps(query)))
+            for hit in res.hits:
+                yield hit
+
+class Page(object):
+    """
+    Encapsulates a page of data as returned from elasticsearch.
+    """
+    response_schema = {
+        "type": "object",
+        "properties": {
+            "timed_out": {
+                "type": "boolean"
+            },
+            "hits": {
+                "type": "object",
+                "properties": {
+                    "hits": {
+                        "type": "array"
+                    }
+                },
+                "required": ["hits"]
+            }
+        },
+        "required": ["timed_out", "hits"]
+    }
+
+    def __init__(self, response):
+        json_obj = Page.validate_response(response)
+        self._total = json_obj["hits"]["total"]
+        self._hits = json_obj["hits"]["hits"]
+
+    @property
+    def total(self):
+        """
+        Total number of results returned by query.
+        """
+        return self._total
+
+    @property
+    def hits(self):
+        """
+        The list of results (hits) in this page.
+        """
+        return self._hits
+
+    @classmethod
+    def validate_response(cls, response):
+        """
+        Validates the return status and JSON payload of a response returned by
+        the requests module.
+        """
+        if not hasattr(response, "status_code"):
+            raise Exception("invalid response object")
+        if response.status_code != 200:
+            raise Exception("elasticsearch returned status {0.status_code}: {1}".format(response, response.json()))
+        json_obj = response.json()
+        jsonschema.validate(json_obj, Page.response_schema)
+        return json_obj
+
+def search_query(cluster="eqiad"):
+    """
+    Returns the JSON-based elasticsearch query for Cassandra large partition
+    warnings.
+    """
+    return {
+        "query": {
+            "bool": {
+                "must": [
+                    {"match": {"logger_name": "org.apache.cassandra.io.sstable.format.big.BigTableWriter"}},
+                    {"match": {"cluster": cluster}},
+                    {"match_phrase": {"message": "Writing large partition"}}
+                ]
+            }
+        },
+        "_source": ["message"]
+    }
+
+def index_names(days=7):
+    """
+    Returns a generator of logstash index names for the past N days.
+    """
+    name = lambda dt: dt.strftime("logstash-%Y.%m.%d")
+    now = datetime.datetime.now()
+    yield name(now)
+    for i in range(days):
+        yield name(now - datetime.timedelta(days=i+1))
+
+class EmailMessage(object):
+    """
+    An email report of the topk wide partition results.
+    """
+    text = """
+    <html>
+      <head>
+        <style>
+          th, td {
+            text-align: left;
+            padding: 0 1em 0 1em;
+          }
+          table {
+            border-collapse: collapse;
+          }
+          table, th, td {
+            border: 1px solid black;
+          }
+          p.notice {
+            color: #777;
+            font-size: 90%;
+          }
+        </style>
+      </head>
+      <body>
+        <p>
+          Respected Humans,
+        </p>
+        <p>
+          Attached is a report of the top {{results|length}} widest partitions for the Cassandra cluster named
+          <i>{{cluster_name}}</i>.
+        </p>
+
+        <table>
+          <tr>
+            <th>Size</th>
+            <th>Partition</th>
+          </tr>
+        {% for result in results %}
+          <tr>
+            <td>{{result[1]}}</td>
+            <td>{{result[0]}}</td>
+          </tr>
+        {% endfor %}
+        </table>
+
+        <h2>About this report:</h2>
+        <p>
+          Cassandra logs a warning whenever background compaction encounters a partition larger than the value of
+          <a href="http://cassandra.apache.org/doc/latest/configuration/cassandra_config_file.html#compaction-large-partition-warning-threshold-mb">
+            <code>compaction_large_partition_warning_threshold_mb</code>
+          </a>.
+          This report is periodically generated from an Elasticsearch query of these log messages, and emailed to
+          {{email_address}}.
+        </p>
+
+        <table>
+          <tr>
+            <th>Execution host</th>
+            <td>{{execution_host}}</td>
+          </tr>
+          <tr>
+            <th>Logstash host</th>
+            <td>{{logstash_host}}</td>
+          </tr>
+          <tr>
+            <th>Execution time</th>
+            <td>{{execution_time}} secs</td>
+          </tr>
+          <tr>
+            <th>Query results</th>
+            <td>{{total_query_results}}</td>
+          </tr>
+          <tr>
+            <th>Unique partitions</th>
+            <td>{{unique_query_results}}</td>
+          </tr>
+          <tr>
+            <th>Cassandra cluster name</th>
+            <td>{{cluster_name}}</td>
+          </tr>
+          <tr>
+            <th>Source code</th>
+            <td>https://github.com/eevans/services-adhoc-reports</td>
+          </tr>
+        </table>
+      </body>
+    </html>
+    """
+
+    def __init__(self, **kwargs):
+        def __check_required_kwarg(kwarg, default=None):
+            value = kwargs.get(kwarg, default)
+            if not value:
+                raise Exception("missing keyword argument: {}".format(kwarg))
+            return value
+
+        self.message = MIMEMultipart("alternative")
+        self.message["Subject"] = __check_required_kwarg("subject")
+        self.message["To"] = __check_required_kwarg("email_address")
+        self.message["From"] = __check_required_kwarg("message_from")
+
+        template_vars = {}
+        template_vars["cluster_name"] = __check_required_kwarg("cluster_name")
+        template_vars["logstash_host"] = __check_required_kwarg("logstash_host")
+        template_vars["email_address"] = self.message["To"]
+        template_vars["results"] = __check_required_kwarg("results")
+        template_vars["execution_host"] = __check_required_kwarg("execution_host")
+        template_vars["execution_time"] = __check_required_kwarg("execution_time")
+        template_vars["total_query_results"] = __check_required_kwarg("total_query_results")
+        template_vars["unique_query_results"] = __check_required_kwarg("unique_query_results")
+
+        if not isinstance(template_vars["results"], list):
+            raise Exception("invalid argument for keyword results (not a list)")
+
+        self.template = Template(EmailMessage.text)
+        self.message.attach(MIMEText(self.template.render(template_vars).encode("utf-8"), "html"))
+
+    def send(self):
+        """
+        Deliver this report.
+        """
+        proc = Popen(["/usr/sbin/exim4", "-i", self.message["To"]], stdout=PIPE, stdin=PIPE, stderr=PIPE)
+        proc.communicate(input=self.message.as_string())
+
+    def attach(self, message):
+        """
+        Add an attachment to this report.
+        """
+        if not isinstance(message, MIMEBase):
+            raise Exception("invalid message")
+        self.message.attach(message)
+
+def write_csv(stream, results):
+    """
+    Writes the results in CSV format to the specified stream
+    """
+    csv_writer = csv.writer(stream)
+    for res in results:
+        csv_writer.writerow([res[0].encode("utf-8"), res[1]])
+
+def csv_attachement(results):
+    """
+    Return an email attachment of a csv file containing results.
+    """
+    csv_fp = StringIO.StringIO()
+    write_csv(csv_fp, results)
+    csv_msg = MIMEText(csv_fp.getvalue(), "csv")
+    csv_msg.add_header("Content-Disposition", "attachment", filename="report.csv")
+    return csv_msg
+
+def local_hostname():
+    """
+    Returns the fully-qualified hostname of this current machine.
+    """
+    return socket.gethostbyaddr(socket.gethostname())[0]
+
+def subject(days):
+    """
+    Returns a formatted email subject line.
+    """
+    now = datetime.datetime.now()
+    date_to = now.strftime("%Y-%m-%d")
+    date_from = (now - datetime.timedelta(days=days)).strftime("%Y-%m-%d")
+    return "Cassandra wide partition report: period of {} to {}".format(date_from, date_to)
+
+def strfsize(num, suffix='B'):
+    """
+    Formats a size as a human-readable string.
+    """
+    for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
+        if abs(num) < 1024.0:
+            return "%.1f%s%s" % (num, unit, suffix)
+        num /= 1024.0
+    return "%.1f%s%s" % (num, 'Yi', suffix)
+
+def pretty_print(values):
+    """
+    Writes the report as a pretty-formatted list on stdout
+    """
+    header = ['Size', 'Partition', 'Size (in bytes)']
+    size = [len(x) for x in header]
+    results = []
+    # format the values and find out the largest element in each column
+    for val in values:
+        tup = (strfsize(val[1]), val[0], str(val[1]))
+        for idx, slen in enumerate(size):
+            if len(tup[idx]) > slen:
+                size[idx] = len(tup[idx])
+        results.append(tup)
+    results.insert(0, header)
+    # adjust the width of each cell
+    results = [[res[idx].ljust(s) for idx, s in enumerate(size)] for res in results]
+    # print it out
+    for res in results:
+        sys.stdout.write('%s  %s  %s\n' % (res[0], res[1], res[2]))
+
+def send_email(values, args, count, exec_start_time):
+    """
+    Create and send the report in an e-mail
+    """
+    logging.warning("Sending email report...")
+    # Craft an html email message report.
+    email = EmailMessage(
+        results=[(i[0], strfsize(i[1])) for i in values[:args.top]],
+        subject=subject(args.last_days),
+        email_address=args.email,
+        message_from="{}@{}".format(getpass.getuser(), local_hostname()),
+        cluster_name=args.cluster,
+        logstash_host=args.logstash_host,
+        execution_host=local_hostname(),
+        execution_time=time.time()-exec_start_time,
+        total_query_results=count,
+        unique_query_results=len(values)
+    )
+    email.attach(csv_attachement(values[:args.top]))    # Attach a csv file.
+    email.send()                                        # Deliver.
+    logging.warning("Done.")
+
+def parse_arguments():
+    """
+    Parse command-line arguments.
+    """
+    parser = argparse.ArgumentParser(
+        description="Generate an email report of the widest Cassandra partitions.")
+    parser.add_argument("-e", "--email", metavar="ADDRESS", default='', help="Email the report to the specified address")
+    parser.add_argument('-s', '--csv', action='store_true', help='Output a comma-delimited report to stdout')
+    parser.add_argument(
+        "-H", "--logstash-host",
+        metavar="HOST",
+        default="logstash1001.eqiad.wmnet",
+        help="Logstash hostname or address to search")
+    parser.add_argument("-p", "--logstash-port", metavar="PORT", type=int, default=9200, help="Logstash port number")
+    parser.add_argument("-k", "--top", metavar="N", type=int, default=50, help="Number of results to report")
+    parser.add_argument("-c", "--cluster", metavar="NAME", default="eqiad", help="Cassandra cluster name")
+    parser.add_argument(
+        "-d", "--last-days", metavar="DAYS", default=7, type=int, help="Past number of days to report on")
+    return parser.parse_args()
+
+def main():
+    """
+    Queries elasticsearch for matching records from all applicable daily indices.
+    Generates an email report of the topk matching results and delivers it to
+    the requested email address.
+    """
+    exec_start_time = time.time()
+    args = parse_arguments()
+    count = 0
+    results = {}
+    log_message_re = re.compile(r"Writing large partition (?P<partition>.+) \((?P<bytes>[\d]+) bytes\)")
+    logstash = ElasticSearch(args.logstash_host, args.logstash_port)
+
+    # Query each daily logstash index, for the last N days.
+    for index in index_names(args.last_days):
+        logging.warning("Querying elasticsearch index: %s" % index)
+        try:
+            # Collate the returned results.
+            for hit in logstash.search(index, search_query(args.cluster)):
+                count += 1
+                message = hit["_source"]["message"]
+                match = log_message_re.match(message)
+                if not match:
+                    logging.error("Result did not match regex (\"{}\")".format(message))
+                    continue
+                partition = match.group("partition")
+                size = int(match.group("bytes"))
+                if results.get(partition, 0) < size:
+                    results[partition] = size
+        # TODO: An exception is thrown when we page past `index.max_result_window` number of results
+        # (an elasticsearch setting; 10000 in our environment).  The solution seems to be to use the
+        # scroll API instead (https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html).
+        except Exception, err:
+            logging.error("Query of index {} failed: {}".format(index, err))
+
+    # Sort results by partition size, descending.
+    values = sorted(results.items(), key=operator.itemgetter(1), reverse=True)
+
+    # decide on the action to perform
+    if len(args.email):
+        # send the e-mail
+        send_email(values, args, count, exec_start_time)
+    elif args.csv:
+        # print the CSV on stdout
+        write_csv(sys.stdout, values[:args.top])
+    else:
+        # pretty-print the results
+        pretty_print(values[:args.top])
+
+if __name__ == "__main__":
+    logging.basicConfig(stream=sys.stderr, level=logging.WARNING)
+    main()
diff --git a/modules/cassandra/manifests/reporter/init.pp b/modules/cassandra/manifests/reporter/init.pp
new file mode 100644
index 0000000..dc2c679
--- /dev/null
+++ b/modules/cassandra/manifests/reporter/init.pp
@@ -0,0 +1,35 @@
+# == Class: cassandra::reporter
+#
+# Class used for the configuration of reporters and setting up the log
+# directory.
+#
+# === Parameters
+#
+# [*logstash_es_host*]
+#   The ElasticSearch server hosting the logs.
+#   Default: 'logstash1001.eqiad.wmnet'
+#
+# [*logstash_es_port*]
+#   The port ES is listening on. Default: 9200
+#
+# [*log_dir*]
+#   The log directory where to write the output of the reporting scripts.
+#   The directory is created if it does not exist.
+#   Default: '/var/log/cassandra-reports'
+#
+class cassandra::reporter(
+    $logstash_es_host = 'logstash1001.eqiad.wmnet',
+    $logstash_es_port = 9200,
+    $log_dir          = '/var/log/cassandra-reports',
+) {
+
+    if !defined(File[$log_dir]) {
+        file { $log_dir:
+            ensure => directory,
+            owner  => 'root',
+            group  => 'root',
+            mode   => '0755',
+        }
+    }
+
+}
diff --git a/modules/cassandra/manifests/reporter/topk.pp b/modules/cassandra/manifests/reporter/topk.pp
new file mode 100644
index 0000000..ccb9c16
--- /dev/null
+++ b/modules/cassandra/manifests/reporter/topk.pp
@@ -0,0 +1,91 @@
+# == Define: cassandra::reporter::topk
+#
+# Sets up the topk Cassandra reporter, which reports the topk partitions
+# measured by size for the Cassandra cluster given as the title of the define.
+#
+# === Parameters
+#
+# [*title*]
+#   Required. The name of the Cassandra cluster for which to report.
+#
+# [*email*]
+#   The e-mail address to send the report to. Default: 'servi...@wikimedia.org'
+#
+# [*no_results*]
+#   The number of results to display on the report. Default: 50
+#
+# [*no_days*]
+#   The interval, in days, between two reports. Apart from a number, one can
+#   also use the special aliases 'daily' (every day) and 'weekly' (every
+#   Sunday). The reports are always generated at 23:50. If you just want to
+#   install the script that generates the report (without setting up cron), set
+#   this parameter to 0 or undef. Default: 'weekly'
+#
+define cassandra::reporter::topk(
+    $email            = 'servi...@wikimedia.org',
+    $no_results       = 50,
+    $no_days          = 'weekly',
+) {
+
+    unless $title and size($title) > 0 {
+        fail('Name of the Cassandra cluster (given as title) is required!')
+    }
+
+    # figure out the cron job time definition
+    if !defined($no_days) or $no_days == 0 {
+        $cron_time = undef
+    } elsif $no_days == 'daily' {
+        $cron_time = '50 23 * * *'
+        $last_days = 1
+    } elsif $no_days == 'weekly' {
+        $cron_time = '50 23 * * 0'
+        $last_days = 7
+    } else {
+        $cron_time = "50 23 */${no_days} * *"
+        $last_days = $no_days
+    }
+
+    # install the script
+    $bin = '/usr/local/bin/cassandra-reporter-topk'
+    if !defined(File[$bin]) {
+        file { $bin:
+            ensure => present,
+            owner  => 'root',
+            group  => 'root',
+            mode   => '0755',
+            source => 'puppet:///modules/cassandra/reporter/topk.py',
+        }
+    }
+
+    # set up the cron job only if a time definition has been set
+    if defined($cron_time) {
+        # load the configuration and ensure the log dir exists
+        require ::cassandra::reporter
+        $logstash_es_host = $::cassandra::reporter::logstash_es_host
+        $logstash_es_port = $::cassandra::reporter::logstash_es_port
+        $log_file = "${::cassandra::reporter::log_dir}/${title}.topk"
+        # make sure the log files are readable
+        file { "${log_file}.out":
+           ensure  => present,
+           owner   => 'root',
+           group   => 'root',
+           mode    => '0644',
+           replace => false,
+        }
+        file { "${log_file}.err":
+           ensure  => present,
+           owner   => 'root',
+           group   => 'root',
+           mode    => '0644',
+           replace => false,
+        }
+        # install the cron job
+        base::crond { "cassandra-reporter-topk-${title}":
+            command  => template('cassandra/reporter/topk.erb'),
+            time     => $cron_time,
+            redirect => ["${log_file}.out", "${log_file}.err"],
+            require  => File[$bin, "${log_file}.out", "${log_file}.err"],
+        }
+    }
+
+}
diff --git a/modules/cassandra/templates/reporter/topk.erb b/modules/cassandra/templates/reporter/topk.erb
new file mode 100644
index 0000000..989e8ac
--- /dev/null
+++ b/modules/cassandra/templates/reporter/topk.erb
@@ -0,0 +1 @@
+<%= @bin %> --email <%= @email %> --cluster <%= @title %> --top <%= @no_results %> --last-days <%= @last_days %> --logstash-host <%= @logstash_es_host %> --logstash-port <%= @logstash_es_port %>
diff --git a/modules/restbase/manifests/cass_report.pp b/modules/restbase/manifests/cass_report.pp
new file mode 100644
index 0000000..7a82b76
--- /dev/null
+++ b/modules/restbase/manifests/cass_report.pp
@@ -0,0 +1,18 @@
+# == Class: restbase::cass_report
+#
+# Sets up the host to be a reporter for RESTBase's Cassandra. So far, it only
+# sets up the topk-partition reporter.
+#
+# === Parameters
+#
+# [*cluster_name*]
+#   The logical name of RESTBase's Cassandra cluster. Default: 'eqiad'
+#
+class restbase::cass_report(
+    $cluster_name = 'eqiad',
+) {
+
+    # set up the topk reporter weekly, sending the report by email to
+    # servi...@wikimedia.org
+    cassandra::reporter::topk { $cluster_name: }
+}

-- 
To view, visit https://gerrit.wikimedia.org/r/328660
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I81eff560e424587c6ab4370bdb0cd60940c6302c
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Mobrovac <mobro...@wikimedia.org>
