Jcrespo has uploaded a new change for review. (
https://gerrit.wikimedia.org/r/369397 )
Change subject: mariadb: Add new python3 script to check the health of a server
......................................................................
mariadb: Add new python3 script to check the health of a server
Aside from porting the checks from the old perl script, this one
adds certain WMF-only extras, like heartbeat monitoring, plus
extra monitoring parameters:
* Connection testing
* Version
* Uptime
* Read_only variable checking
* Lag
* Optional Replication monitoring, including errors
* Queries per second
* Connection latency
* Query latency
* Number of clients connected
* Process monitoring, for localhost
* Timeout handling in case of network or server problems
* Non-blocking by default (doesn't depend on SHOW SLAVE STATUS)
* Monitoring should take only a few milliseconds from localhost,
  except when QPS is calculated (that needs a second sample).
The script is not in its best shape: for simplicity, it was created by
merging the WIP WMFMariaDB administration framework and the check_health
script into a single file. Still, I think it is already
better than the perl script that is currently in place.
Bug: T171928
Change-Id: Id79f39cb149e2e14de4a42774a4defe2de97d919
---
A modules/mariadb/files/check_mariadb.py
M modules/mariadb/manifests/config.pp
2 files changed, 575 insertions(+), 0 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/operations/puppet
refs/changes/97/369397/1
diff --git a/modules/mariadb/files/check_mariadb.py
b/modules/mariadb/files/check_mariadb.py
new file mode 100755
index 0000000..8df954d
--- /dev/null
+++ b/modules/mariadb/files/check_mariadb.py
@@ -0,0 +1,567 @@
+#!/usr/bin/env python3
+
+import configparser
+import csv
+import os
+import glob
+import ipaddress
+# requires python3-pymysql
+import pymysql
+import re
+import socket
+import pprint
+import argparse
+import json
+import math
+import sys
+import time
+from datetime import datetime
+import subprocess
+
class WMFMariaDB:
    """
    Wrapper class to connect to MariaDB instances within the Wikimedia
    Foundation cluster. It simplifies all authentication methods by providing
    a unique, clean way to run queries on the databases.

    Connection failures do not raise: the instance is left with
    ``connection = None`` and the error is kept in ``last_error`` as
    ``[errno, message]``.
    """

    connection = None
    host = None
    port = None
    database = None
    __last_error = None
    __debug = False

    @staticmethod
    def get_credentials(host, port, database):
        """
        Given a database instance, return the authentication method as the
        tuple (user, password, socket, ssl, charset).

        - 'localhost': socket authentication as the current OS user ('root'
          when running as uid 0). Port 3306 uses the [client] socket from
          /etc/my.cnf; any other port uses
          /run/mysqld/mysqld.s<last digit of port>.sock.
        - hosts not starting with 'labsdb': user/password from the [client]
          section of /root/.my.cnf, with TLS using the Puppet internal CA.
        - 'labsdb*' hosts: user/password from the [labsdb] section of
          /root/.my.cnf; TLS except on labsdb1001 and labsdb1003.

        database is accepted for interface symmetry but currently unused.
        """
        if host == 'localhost':
            # Connect to localhost using plugin (socket) authentication.
            config = configparser.ConfigParser(interpolation=None,
                                               allow_no_value=True)
            config.read('/etc/my.cnf')
            if os.getuid() == 0:
                user = 'root'
            else:
                # os.getlogin() raises OSError when the process has no
                # controlling terminal, and reports the login user rather
                # than the uid socket authentication checks; look the uid
                # up in the password database instead.
                import pwd
                user = pwd.getpwuid(os.getuid()).pw_name
            if port == 3306:
                mysql_sock = config['client']['socket']
            else:
                mysql_sock = '/run/mysqld/mysqld.s' + str(port)[-1:] + '.sock'
            ssl = None
            password = None
            charset = None
        elif not host.startswith('labsdb'):
            # Connect to a production remote host, use ssl.
            config = configparser.ConfigParser(interpolation=None,
                                               allow_no_value=True)
            config.read('/root/.my.cnf')
            user = config['client']['user']
            password = config['client']['password']
            ssl = {'ca': '/etc/ssl/certs/Puppet_Internal_CA.pem'}
            mysql_sock = None
            charset = None
        else:
            # Connect to a labs remote host, use ssl where supported.
            config = configparser.ConfigParser(interpolation=None)
            config.read('/root/.my.cnf')
            user = config['labsdb']['user']
            password = config['labsdb']['password']
            if host.startswith('labsdb1001') or host.startswith('labsdb1003'):
                ssl = None
            else:
                ssl = {'ca': '/etc/ssl/certs/Puppet_Internal_CA.pem'}
            mysql_sock = None
            charset = None

        return (user, password, mysql_sock, ssl, charset)

    @property
    def debug(self):
        """Whether execution trace messages are printed."""
        return self.__debug

    @debug.setter
    def debug(self, debug):
        """Set debug mode; any value is coerced to a bool."""
        self.__debug = bool(debug)

    @property
    def last_error(self):
        """The last error as [errno, message], or None if none occurred."""
        return self.__last_error

    @staticmethod
    def resolve(host):
        """
        Return the fully qualified domain name for a database hostname.

        Normally this returns the hostname itself. When the datacenter and
        network parts have been omitted, it completes them as a best effort:
        4-digit host numbers starting with 1/2/3/4 map to eqiad/codfw/esams/
        ulsfo, and anything else gets the local host's domain.
        IPv4/IPv6 addresses, 'localhost' and dotted names are returned as is.
        """
        try:
            ipaddress.ip_address(host)
            return host
        except ValueError:
            pass
        if '.' not in host and host != 'localhost':
            domain = ''
            if re.match('^[a-z]+1[0-9][0-9][0-9]$', host) is not None:
                domain = '.eqiad.wmnet'
            elif re.match('^[a-z]+2[0-9][0-9][0-9]$', host) is not None:
                domain = '.codfw.wmnet'
            elif re.match('^[a-z]+3[0-9][0-9][0-9]$', host) is not None:
                domain = '.esams.wmnet'
            elif re.match('^[a-z]+4[0-9][0-9][0-9]$', host) is not None:
                domain = '.ulsfo.wmnet'
            else:
                localhost_fqdn = socket.getfqdn()
                if '.' in localhost_fqdn and len(localhost_fqdn) > 1:
                    domain = localhost_fqdn[localhost_fqdn.index('.'):]
            host = host + domain
        return host

    def __init__(self, host, port=3306, database=None, debug=False,
                 connect_timeout=10.0):
        """
        Try to connect to a mysql server instance. On failure, connection is
        set to None and the error is stored in last_error.
        """
        self.debug = debug
        host = WMFMariaDB.resolve(host)
        # mysql_sock (not 'socket') so the socket module is not shadowed.
        (user, password, mysql_sock, ssl, charset) = \
            WMFMariaDB.get_credentials(host, port, database)

        if self.debug:
            if host == 'localhost':
                address = '{}[socket={}]'.format(host, mysql_sock)
            else:
                address = '{}:{}'.format(host, port)
            print('Connecting to {}/{}'.format(address, database))
        try:
            self.connection = pymysql.connect(
                host=host, port=port, user=user, password=password,
                db=database, charset='utf8mb4', unix_socket=mysql_sock,
                ssl=ssl, connect_timeout=connect_timeout)
        except (pymysql.err.OperationalError, pymysql.err.InternalError,
                FileNotFoundError) as e:
            self.connection = None
            self.__last_error = [e.args[0], e.args[1]]
            if self.debug:
                print('ERROR {}: {}'.format(e.args[0], e.args[1]))
        self.host = host
        self.port = int(port)
        self.database = database
        self.connect_timeout = connect_timeout

    def change_database(self, database):
        """
        Change the current database without disconnecting and reconnecting.
        On failure the error is stored in last_error and database is kept.
        """
        if self.connection is None:
            print('ERROR: There is no connection active; could not change db')
            return
        try:
            self.connection.select_db(database)
        except (pymysql.err.OperationalError, pymysql.err.InternalError) as e:
            self.__last_error = [e.args[0], e.args[1]]
            if self.debug:
                print('ERROR {}: {}'.format(e.args[0], e.args[1]))
            return
        self.database = database
        if self.debug:
            print('Changed database to \'{}\''.format(self.database))

    def execute(self, command, dryrun=False):
        """
        Send a single query to the connected server instance.

        Returns a dict with 'query', 'host', 'port', 'database' and 'success'.
        On success it also has 'numrows', 'rows' (the fetched rows, or None
        when the query returned none) and 'fields' (column names or None).
        On failure it has 'errno' and 'errmsg' instead. With dryrun=True the
        command is only printed and a trivial SELECT runs in its place.
        """
        result = {"query": command, "host": self.host, "port": self.port,
                  "database": self.database}

        # Not connected (connect failed, or disconnect() was called): abort
        # immediately. After a clean disconnect() there is no last_error.
        if self.connection is None:
            if self.last_error is not None:
                errno, errmsg = self.last_error
            else:
                errno, errmsg = None, 'Not connected'
            result.update(success=False, errno=errno, errmsg=errmsg)
            return result

        cursor = self.connection.cursor()
        try:
            if dryrun:
                print(("We will *NOT* execute '{}' on {}:{}/{} because "
                       "this is a dry run.").format(
                           command, self.host, self.port, self.database))
                cursor.execute("SELECT 'success' as dryrun")
            else:
                if self.debug:
                    print("Executing '{}'".format(command))
                cursor.execute(command)
            rows = None
            fields = None
            if cursor.rowcount > 0:
                rows = cursor.fetchall()
                fields = tuple(column[0] for column in cursor.description)
            numrows = cursor.rowcount
        except (pymysql.err.ProgrammingError, pymysql.err.OperationalError,
                pymysql.err.InternalError) as e:
            self.__last_error = [e.args[0], e.args[1]]
            if self.debug:
                print('ERROR {}: {}'.format(e.args[0], e.args[1]))
            result.update(success=False, errno=e.args[0], errmsg=e.args[1])
            return result
        finally:
            # Always release the cursor, also when fetching fails.
            cursor.close()

        result.update(success=True, numrows=numrows, rows=rows, fields=fields)
        return result

    def disconnect(self):
        """
        End the connection to the database, freeing resources. No more
        queries can be sent through this object until a new one connects.
        """
        if self.debug:
            print('Disconnecting from {}:{}/{}'.format(self.host, self.port,
                                                        self.database))
        if self.connection is not None:
            self.connection.close()
        self.connection = None
+
+
def parse_args():
    """
    Build and return the argparse parser for this check; callers invoke
    parse_args() on the returned object.

    -h is used for --host, so the automatic help option is disabled and help
    is exposed as --help, -? and -I instead.
    """
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument('--host', '-h', default='localhost',
                        help='the hostname or dns to connect to.')
    parser.add_argument('--port', '-P', type=int, default=3306,
                        help='the port to connect')
    parser.add_argument('--verbose', '-v', action='store_true', dest='debug',
                        help='Enable debug mode for execution trace.')
    parser.add_argument('--slave-status', action='store_true',
                        dest='slave_status',
                        help='Enable SHOW SLAVE STATUS execution (blocking).')
    parser.add_argument('--process', action='store_true', dest='process',
                        help='Return the list of processes running (only '
                             'available for localhost).')
    parser.add_argument('--icinga', action='store_true', dest='icinga',
                        help='Output in icinga format rather than just the '
                             'status.')
    parser.add_argument('--connect-timeout', type=float, default=1.0,
                        dest='connect_timeout',
                        help='How much time to wait for mysql to connect.')
    parser.add_argument('--query-timeout', type=float, default=1.0,
                        dest='query_timeout',
                        help='Max execution query limit.')
    parser.add_argument('--shard', default=None,
                        help='Only check this replication channel/heartbeat '
                             'row.')
    parser.add_argument('--primary-dc', dest='primary_dc', default='eqiad',
                        help='Set primary datacenter (by default, eqiad).')
    parser.add_argument('--check_read_only', dest='read_only', default=None,
                        help='Check read_only variable matches the given '
                             'value.')
    parser.add_argument('--check_warn_lag', type=float, dest='warn_lag',
                        default=15.0,
                        help='Lag from which a Warning is returned. By '
                             'default, 15 seconds.')
    parser.add_argument('--check_crit_lag', type=float, dest='crit_lag',
                        default=300.0,
                        help='Lag from which a Critical is returned. By '
                             'default, 300 seconds.')
    parser.add_argument('--check_num_processes', type=int,
                        dest='num_processes', default=None,
                        help='Number of mysqld processes expected. Requires '
                             '--process')
    # The connection thresholds are compared with the number of connected
    # clients (Threads_connected), not with lag.
    parser.add_argument('--check_warn_connections', type=int,
                        dest='warn_connections', default=1000,
                        help='Number of connected clients from which a '
                             'Warning is returned. By default, 1000.')
    parser.add_argument('--check_crit_connections', type=int,
                        dest='crit_connections', default=4995,
                        help='Number of connected clients from which a '
                             'Critical is returned. By default, 4995.')
    parser.add_argument('--help', '-?', '-I', action='help',
                        help='show this help message and exit')
    return parser
+
+
def get_var(conn, name, scope='GLOBAL', type='VARIABLES'):
    """
    Return the value of one server variable or status counter.

    conn is an object whose execute(query) returns a result dict like
    WMFMariaDB.execute(). name is used as a LIKE pattern, so '_' and '%'
    act as wildcards unless the caller escapes them.
    Returns the value column of the first matching row, or None if
    scope/type are not allowed, the query failed, or nothing matched.
    """
    # Only whitelisted keywords are ever interpolated as scope/type.
    if scope not in ('GLOBAL', 'SESSION') or type not in ('VARIABLES', 'STATUS'):
        return None
    result = conn.execute("SHOW {} {} like '{}'".format(scope, type, name))
    # A successful query can still match no row; rows is None in that case.
    if not result["success"] or not result.get("rows"):
        return None
    return result["rows"][0][1]
+
+
def get_replication_status(conn, connection_name=None):
    """
    Return replication status information for the server behind conn.

    Without connection_name this runs SHOW ALL SLAVES STATUS; with it, it runs
    SHOW SLAVE '<connection_name>' STATUS (MariaDB named-channel syntax).
    In both cases the result is a list with one dict per replication channel,
    mapping the status column names to their values.
    None is returned if the query fails or no replication channel is found
    (or none is found with the given name).
    """
    if connection_name is None:
        query = "SHOW ALL SLAVES STATUS"
    else:
        query = "SHOW SLAVE '{}' STATUS".format(connection_name)
    result = conn.execute(query)
    if result["success"] and result["numrows"] > 0:
        return [dict(zip(result["fields"], row)) for row in result["rows"]]
    return None
+
+
def get_heartbeat_status(conn, shard=None, primary_dc='eqiad', db='heartbeat',
                         table='heartbeat'):
    """
    Return replication lag per shard, measured from the heartbeat table.

    Only heartbeat rows written by primary_dc are considered; primary_dc must
    be 'eqiad' or 'codfw', otherwise None is returned. If shard is given, only
    that shard is checked.
    Returns {shard_name: lag_in_seconds (float)}, or None if the query fails
    or no lag value is found.
    """
    if primary_dc not in ['eqiad', 'codfw']:
        return None
    if shard is None:
        condition = "GROUP BY shard"
    else:
        condition = "AND shard = '{}'".format(shard)
    # Lag is the microseconds since the newest heartbeat, minus 0.5 s and
    # floored at 0 (presumably to absorb the heartbeat write interval --
    # confirm).
    query = ("SELECT shard, min(greatest(0, TIMESTAMPDIFF(MICROSECOND, ts, "
             "UTC_TIMESTAMP(6)) - 500000)) AS lag "
             "FROM {}.{} "
             "WHERE datacenter = '{}' "
             "{}").format(db, table, primary_dc, condition)
    result = conn.execute(query)
    if not result["success"] or not result["numrows"] > 0:
        return None
    status = dict()
    for name, lag in result["rows"]:
        # With a shard filter and no matching row, the aggregate still
        # returns one row of NULLs; skip it.
        if lag is None:
            continue
        # Binary columns come back as bytes; text columns already as str.
        if isinstance(name, bytes):
            name = name.decode('utf-8')
        status[name] = int(lag) / 1000000
    return status or None
+
+
def get_processes(process_name):
    """
    Return the PIDs (as ints) of the running processes called process_name,
    as reported by /bin/pidof, or an empty list when pidof finds none.
    """
    command = ['/bin/pidof', process_name]
    try:
        output = subprocess.check_output(command)
        return [int(pid) for pid in output.split()]
    except subprocess.CalledProcessError:
        # pidof exits with a non-zero status when nothing matches.
        return []
+
+
def get_status(options):
    """
    Collect a health snapshot of the server selected by the parsed options.

    Returns a dict that always has 'connection' ('ok', or None if connecting
    failed) and 'connection_latency' (seconds spent connecting). When
    connected it may also contain: version, read_only (bool), uptime (int),
    ssl (bool), ssl_expiration (epoch seconds, None if unparseable),
    total_queries plus datetime (epoch time taken right after reading
    'Queries', used for QPS), threads_connected, heartbeat ({shard: lag
    seconds}) plus query_latency (seconds taken by the heartbeat query), and
    replication (per channel, only with --slave-status).
    With --process (localhost only), 'mysqld_processes' lists the mysqld
    PIDs; on a remote host --process prints an error and exits with -1.
    """
    # Local import: only needed to convert the GMT certificate expiry.
    import calendar

    status = dict()

    if options.process and options.host != 'localhost':
        print("ERROR: Checking process is only allowed on localhost")
        sys.exit(-1)
    elif options.process:
        status['mysqld_processes'] = get_processes('mysqld')

    time_before_connect = time.time()
    mysql = WMFMariaDB(host=options.host, port=options.port,
                       connect_timeout=options.connect_timeout,
                       debug=options.debug)
    time_after_connect = time.time()

    if mysql.connection is None:
        status['connection'] = None
    else:
        status['connection'] = 'ok'
        # Cap this session's lock waits and idle time to the query timeout.
        wait_timeout = math.ceil(options.query_timeout)
        mysql.execute("SET SESSION innodb_lock_wait_timeout = {0}, "
                      "SESSION lock_wait_timeout = {0}, "
                      "SESSION wait_timeout = {0}".format(wait_timeout))
        version = get_var(mysql, 'version')
        read_only = get_var(mysql, 'read_only')
        uptime = get_var(mysql, 'Uptime', type='STATUS')
        ssl = get_var(mysql, 'Ssl_cipher', type='STATUS')
        ssl_expiration = get_var(mysql, 'Ssl_server_not_after', type='STATUS')
        # '_' is a LIKE wildcard; the raw string keeps the escaping backslash
        # without an invalid-escape-sequence warning.
        threads_connected = get_var(mysql, r'Threads\_connected',
                                    type='STATUS')
        total_queries = get_var(mysql, 'Queries', type='STATUS')
        now = time.time()  # taken right after 'Queries' for exact QPS
        replication = None
        if options.slave_status:
            replication = get_replication_status(mysql)

        time_before_heartbeat = time.time()
        heartbeat = get_heartbeat_status(mysql,
                                         primary_dc=options.primary_dc,
                                         shard=options.shard)
        time_after_heartbeat = time.time()
        mysql.disconnect()

        if version is not None:
            status['version'] = version
        if read_only is not None:
            status['read_only'] = read_only == 'ON'
        if uptime is not None:
            status['uptime'] = int(uptime)

        status['ssl'] = bool(ssl)
        if ssl_expiration:
            try:
                # We assume the expiry is always reported in GMT, so convert
                # with calendar.timegm(); time.mktime() would wrongly apply
                # the local timezone offset.
                expiration = datetime.strptime(ssl_expiration,
                                               '%b %d %H:%M:%S %Y %Z')
                status['ssl_expiration'] = float(
                    calendar.timegm(expiration.timetuple()))
            except ValueError:
                status['ssl_expiration'] = None

        if total_queries is not None:
            status['total_queries'] = int(total_queries)
            # QPS divides by this timestamp, so record it with the counter.
            status['datetime'] = now
        if threads_connected is not None:
            status['threads_connected'] = int(threads_connected)

        if heartbeat:
            status['heartbeat'] = heartbeat
            status['query_latency'] = (time_after_heartbeat -
                                       time_before_heartbeat)

        if replication:
            status['replication'] = dict()
            for channel in replication:
                status['replication'][channel['Connection_name']] = {
                    'Slave_IO_Running': channel['Slave_IO_Running'],
                    'Slave_SQL_Running': channel['Slave_SQL_Running'],
                    'Seconds_Behind_Master': channel['Seconds_Behind_Master'],
                    'Last_IO_Error': channel['Last_IO_Error'] or None,
                    # FIXME may contain private data, needs filtering:
                    'Last_SQL_Error': channel['Last_SQL_Error'] or None,
                }

    status['connection_latency'] = time_after_connect - time_before_connect

    return status
+
+
def icinga_check(options):
    """
    Run the health check and exit with an Icinga plugin status code.

    Prints a single summary line and exits with 0 (OK), 1 (WARNING),
    2 (CRITICAL) or 3 (UNKNOWN). The status is sampled twice, one second
    apart, so that QPS can be derived from the 'Queries' counter.
    Values that could not be read are skipped or reported as UNKNOWN
    instead of crashing the check.
    """
    OK = 0
    WARNING = 1
    CRITICAL = 2
    UNKNOWN = 3

    status = get_status(options)
    if status['connection'] is None:
        print("Could not connect to {}:{}".format(options.host, options.port))
        sys.exit(CRITICAL)

    time.sleep(1)
    second_status = get_status(options)

    unknown_msg = []
    warn_msg = []
    crit_msg = []
    ok_msg = []

    # Version and uptime for now cannot generate alerts
    if 'version' in status:
        ok_msg.append('Version {}'.format(status['version']))
    if 'uptime' in status:
        ok_msg.append('Uptime {}s'.format(status['uptime']))

    # check processes
    if options.num_processes is not None:
        if 'mysqld_processes' not in status:
            unknown_msg.append("Process monitoring was requested, but it "
                               "wasn't configured")
        else:
            num_processes = len(status['mysqld_processes'])
            if num_processes != options.num_processes:
                crit_msg.append('{} mysqld process(es) running, expected {}'
                                .format(num_processes, options.num_processes))
            else:
                ok_msg.append('{} mysqld process(es)'.format(num_processes))
    elif 'mysqld_processes' in status:
        ok_msg.append('{} mysqld process(es)'.format(
            len(status['mysqld_processes'])))

    # check read only is correct
    read_only = status.get('read_only')
    if options.read_only is not None:
        expected_read_only = options.read_only.lower() in [
            'true', '1', 'on', 'yes', 't', 'y']
        if read_only is None:
            unknown_msg.append('read_only could not be read')
        elif read_only != expected_read_only:
            crit_msg.append('read_only: "{}", expected "{}"'.format(
                read_only, expected_read_only))
        else:
            ok_msg.append('read_only: {}'.format(read_only))
    elif read_only is not None:
        ok_msg.append('read_only: {}'.format(read_only))

    # check lag; each threshold is only applied when it is set (non-zero)
    for connection_name, lag in status.get('heartbeat', {}).items():
        if options.crit_lag and lag >= options.crit_lag:
            crit_msg.append('{} lag is {:.2f}s'.format(connection_name, lag))
        elif options.warn_lag and lag >= options.warn_lag:
            warn_msg.append('{} lag is {:.2f}s'.format(connection_name, lag))
        else:
            ok_msg.append('{} lag: {:.2f}s'.format(connection_name, lag))

    # check number of connected clients
    threads_connected = status.get('threads_connected')
    if threads_connected is None:
        unknown_msg.append('number of clients could not be read')
    elif (options.crit_connections is not None and
            threads_connected >= options.crit_connections):
        crit_msg.append('{} client(s)'.format(threads_connected))
    elif (options.warn_connections is not None and
            threads_connected >= options.warn_connections):
        warn_msg.append('{} client(s)'.format(threads_connected))
    else:
        ok_msg.append('{} client(s)'.format(threads_connected))

    # QPS and latencies (cannot yet generate alarms)
    # Note the monitoring will create ~10 QPS more than if monitoring wasn't
    # active
    qps_keys = ('total_queries', 'datetime')
    if all(key in s for s in (status, second_status) for key in qps_keys):
        elapsed = second_status['datetime'] - status['datetime']
        if elapsed > 0:
            qps = (second_status['total_queries'] -
                   status['total_queries']) / elapsed
            ok_msg.append('{:.2f} QPS'.format(qps))

    ok_msg.append('connection latency: {:.6f}s'.format(
        status['connection_latency']))
    # query_latency only exists when the heartbeat query returned data.
    if 'query_latency' in status:
        ok_msg.append('query latency: {:.6f}s'.format(status['query_latency']))

    # The most severe non-empty category decides the exit code.
    msg = ''
    exit_code = None
    for label, messages, code in (('CRIT', crit_msg, CRITICAL),
                                  ('WARN', warn_msg, WARNING),
                                  ('UNKNOWN', unknown_msg, UNKNOWN)):
        if messages:
            msg = msg + label + ': ' + ', '.join(messages) + '; '
            if exit_code is None:
                exit_code = code

    if ok_msg:
        if exit_code is None:
            msg = msg + ', '.join(ok_msg)
        else:
            msg = msg + 'OK: ' + ', '.join(ok_msg)
    if exit_code is None:
        exit_code = OK

    print(msg)
    sys.exit(exit_code)
+
+
def main():
    """
    Command-line entry point: parse the arguments, then either run the
    Icinga check (which exits with a plugin status code) or print the
    collected status as JSON.
    """
    options = parse_args().parse_args()
    if not options.icinga:
        print(json.dumps(get_status(options)))
        return
    icinga_check(options)


if __name__ == "__main__":
    main()
diff --git a/modules/mariadb/manifests/config.pp
b/modules/mariadb/manifests/config.pp
index 9670967..249feec 100644
--- a/modules/mariadb/manifests/config.pp
+++ b/modules/mariadb/manifests/config.pp
@@ -134,6 +134,14 @@
source => 'puppet:///modules/icinga/check_mariadb.pl',
}
+ # Install the new Python 3 health check script for MariaDB servers
+ # (check_mariadb.py), next to the older perl check_mariadb.pl above.
+ file { '/usr/bin/check_mariadb.py':
+ owner => 'root',
+ group => 'root',
+ mode => '0755',
+ source => 'puppet:///modules/mariadb/check_mariadb.py',
+ }
+
if ($ssl == 'on' or $ssl == 'puppet-cert') {
# This creates also /etc/mysql/ssl
--
To view, visit https://gerrit.wikimedia.org/r/369397
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Id79f39cb149e2e14de4a42774a4defe2de97d919
Gerrit-PatchSet: 1
Gerrit-Project: operations/puppet
Gerrit-Branch: production
Gerrit-Owner: Jcrespo <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits