Nuria has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/355601 )
Change subject: Add script to purge old mediawiki data snapshots ...................................................................... Add script to purge old mediawiki data snapshots Bug: T162034 Change-Id: I7a6ce082dd539c9bd02433cb7db5368ec2c07369 --- A bin/refinery-drop-mediawiki-snapshots 1 file changed, 289 insertions(+), 0 deletions(-) Approvals: Elukey: Looks good to me, but someone else must approve Nuria: Verified; Looks good to me, approved diff --git a/bin/refinery-drop-mediawiki-snapshots b/bin/refinery-drop-mediawiki-snapshots new file mode 100755 index 0000000..1f19d7f --- /dev/null +++ b/bin/refinery-drop-mediawiki-snapshots @@ -0,0 +1,289 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Note: You should make sure to put refinery/python on your PYTHONPATH. +# export PYTHONPATH=$PYTHONPATH:/path/to/refinery/python + +""" +Automatically drops old partitions from the mediawiki raw and historical +tables. See AFFECTED_TABLES dict for a comprehensive list. + +As these data sets are historical (they span from the beginning of time +to the latest import), the dimension used to determine which partitions need +to be removed is not time, it's "snapshot". By default the last 6 +snapshots will be kept: the current snapshot plus the previous 5. + +Note: Ad-hoc snapshots not following the default naming convention +snapshot=YYYY-MM, like private snapshots, are neither considered nor +affected by this script. 
+ +Usage: refinery-drop-mediawiki-snapshots [options] + +Options: + -h --help Show this help message and exit. + -s --keep-snapshots=<n> Keep the <n> most recent snapshots. [default: 6] + -v --verbose Turn on verbose debug logging. + -n --dry-run Don't actually drop any partitions, just output Hive queries. +""" + + +from docopt import docopt +from refinery.util import HiveUtils, HdfsUtils +import datetime +import logging +import os +import re +import sys + + +# Set up logging to be split: +# INFO+DEBUG+WARNING -> stdout +# ERROR -> stderr +# Thanks to https://stackoverflow.com/users/5124424/zoey-greer +class LessThanFilter(logging.Filter): + def __init__(self, exclusive_maximum, name=""): + super(LessThanFilter, self).__init__(name) + self.max_level = exclusive_maximum + + def filter(self, record): + #non-zero return means we log this message + return 1 if record.levelno < self.max_level else 0 + +logger = logging.getLogger() +logger.setLevel(logging.NOTSET) + +formatter = logging.Formatter( + fmt='%(asctime)s %(levelname)-6s %(message)s', + datefmt='%Y-%m-%dT%H:%M:%S', +) + +handler_out = logging.StreamHandler(sys.stdout) +handler_out.setLevel(logging.DEBUG) +handler_out.addFilter(LessThanFilter(logging.ERROR)) +handler_out.setFormatter(formatter) +logger.addHandler(handler_out) + +handler_err = logging.StreamHandler(sys.stderr) +handler_err.setLevel(logging.ERROR) +handler_err.setFormatter(formatter) +logger.addHandler(handler_err) + + +# Tables that have mediawiki snapshots to be managed +# key: database, value: table +AFFECTED_TABLES = { + 'wmf_raw': [ + 'mediawiki_archive', + 'mediawiki_ipblocks', + 'mediawiki_logging', + 'mediawiki_page', + 'mediawiki_pagelinks', + 'mediawiki_project_namespace_map', + 'mediawiki_redirect', + 'mediawiki_revision', + 'mediawiki_user', + 'mediawiki_user_groups' + ], + 'wmf': [ + 'mediawiki_history', + 'mediawiki_metrics', + 'mediawiki_page_history', + 'mediawiki_user_history' + ] +} + +# Tables partitioned by wiki_db in 
addition to by snapshot +WIKI_DB_TABLES = [ + 'mediawiki_archive', + 'mediawiki_ipblocks', + 'mediawiki_logging', + 'mediawiki_page', + 'mediawiki_pagelinks', + 'mediawiki_redirect', + 'mediawiki_revision', + 'mediawiki_user', + 'mediawiki_user_groups', + 'mediawiki_metrics' +] + +# Tables partitioned by metric in addition to by snapshot +METRIC_TABLES = [ + 'mediawiki_metrics' +] + + +# Returns the age in days of a given partition spec +TODAYS_ORDINAL = datetime.datetime.now().toordinal() +SPEC_DATE_REGEX = re.compile(r"snapshot='(?P<year>[0-9]{4})-(?P<month>[0-9]{2})'") +def get_partition_age(hive): + return (lambda partition: + TODAYS_ORDINAL - + hive.partition_datetime_from_spec( + partition, + SPEC_DATE_REGEX + ).toordinal() + ) + +# Returns the partitions to be dropped given a hive table +def get_partitions_to_drop(hive, table, keep_snapshots): + logger.debug('Getting partitions to drop...') + partitions = hive.partitions(table) + spec_separator = HiveUtils.partition_spec_separator + + # For tables partitioned by dimensions other than snapshot + # extract just the snapshot spec: + # snapshot=2017-01,wiki_db=enwiki => snapshot=2017-01 + if table in WIKI_DB_TABLES + METRIC_TABLES: + snapshots = set([]) + for partition in partitions: + snapshot = partition.split(spec_separator)[0] + snapshots.add(snapshot) + partitions = list(snapshots) + + # Filter out ad-hoc or private snapshots + partitions = [ + p for p in partitions + if re.match("^snapshot='[0-9]{4}-[0-9]{2}'$", p) + ] + + # Select partitions to drop (keep the most recent <keep_snapshots> ones) + partitions.sort(key=get_partition_age(hive)) + partitions_to_drop = partitions[keep_snapshots:] + + # HACK: For tables partitioned by dimensions other than snapshot + # add <dimension>!='' to snapshot spec, so that HiveUtils deletes + # the whole snapshot partition with all sub-partitions in it. 
+ if table in METRIC_TABLES: + partitions_to_drop = [ + spec_separator.join([p, "metric!=''"]) + for p in partitions_to_drop + ] + if table in WIKI_DB_TABLES: + partitions_to_drop = [ + spec_separator.join([p, "wiki_db!=''"]) + for p in partitions_to_drop + ] + return partitions_to_drop + +# Returns the age in days of a given partition directory +PATH_DATE_REGEX = re.compile(r'snapshot=([0-9]{4}-[0-9]{2})') +PATH_DATE_FORMAT = '%Y-%m' +def get_directory_age(hive): + return (lambda path: + TODAYS_ORDINAL - + hive.partition_datetime_from_path( + path, + PATH_DATE_REGEX, + PATH_DATE_FORMAT + ).toordinal() + ) + +# Returns the directories to be removed given a hive table +def get_directories_to_remove(hive, table, keep_snapshots): + logger.debug('Getting directories to remove...') + table_location = hive.table_location(table) + + # Get partition directories + glob = os.path.join(table_location, '*') + directories = HdfsUtils.ls(glob, include_children=False) + + # Filter out private snapshots + directories = [ + d for d in directories + if re.match('^.*/snapshot=[0-9]{4}-[0-9]{2}$', d) + ] + + # Select directories to drop (keep the most recent <keep_snapshots> ones) + directories.sort(key=get_directory_age(hive)) + return directories[keep_snapshots:] + +# Raises an error if partitions and directories do not match +def check_partitions_vs_directories(partitions, directories): + spec_separator = HiveUtils.partition_spec_separator + partition_snapshots = set([p.split(spec_separator)[0].replace("'", '') for p in partitions]) + directory_snapshots = set([os.path.basename(d) for d in directories]) + if partition_snapshots != directory_snapshots: + logger.error( + 'Selected partitions extracted from table specs ({0}) ' + 'does not match selected partitions extracted from data paths ({1}).' 
+ .format(partition_snapshots, directory_snapshots) + ) + sys.exit(1) + +# Drop given hive table partitions (if dry_run, just print) +def drop_partitions(hive, table, partitions, dry_run): + if partitions: + if dry_run: + print(hive.drop_partitions_ddl(table, partitions)) + else: + logger.info( + 'Dropping {0} partitions from {1}.{2}' + .format(len(partitions), hive.database, table) + ) + for partition in partitions: + logger.debug(partition) + hive.drop_partitions(table, partitions) + else: + logger.info( + 'No partitions need to be dropped from {0}.{1}' + .format(hive.database, table) + ) + +# Remove given data directories (if dry_run, just print) +def remove_directories(hive, table, directories, dry_run): + table_location = hive.table_location(table) + if directories: + if dry_run: + print('hdfs dfs -rm -R ' + ' '.join(directories)) + else: + logger.info('Removing {0} directories from {1}' + .format(len(directories), table_location) + ) + for directory in directories: + logger.debug(directory) + HdfsUtils.rm(' '.join(directories)) + else: + logger.info('No directories need to be removed for {0}'.format(table_location)) + + +if __name__ == '__main__': + # Parse arguments + arguments = docopt(__doc__) + keep_snapshots = int(arguments['--keep-snapshots']) + verbose = arguments['--verbose'] + dry_run = arguments['--dry-run'] + + # Setup logging level + logger.setLevel(logging.INFO) + if verbose: + logger.setLevel(logging.DEBUG) + + # Check arguments + if keep_snapshots < 6: + logger.error('Option \'--keep-snapshots\' must be greater or equal than 6.') + sys.exit(1) + + for database, tables in AFFECTED_TABLES.items(): + # Instantiate HiveUtils + hive = HiveUtils(database) + + # Apply the cleaning to each table + for table in tables: + logger.debug('Processing table {0}'.format(table)) + partitions = get_partitions_to_drop(hive, table, keep_snapshots) + directories = get_directories_to_remove(hive, table, keep_snapshots) + check_partitions_vs_directories(partitions, 
directories) + drop_partitions(hive, table, partitions, dry_run) + remove_directories(hive, table, directories, dry_run) -- To view, visit https://gerrit.wikimedia.org/r/355601 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: I7a6ce082dd539c9bd02433cb7db5368ec2c07369 Gerrit-PatchSet: 9 Gerrit-Project: analytics/refinery Gerrit-Branch: master Gerrit-Owner: Mforns <mfo...@wikimedia.org> Gerrit-Reviewer: Elukey <ltosc...@wikimedia.org> Gerrit-Reviewer: Joal <j...@wikimedia.org> Gerrit-Reviewer: Mforns <mfo...@wikimedia.org> Gerrit-Reviewer: Nuria <nu...@wikimedia.org> Gerrit-Reviewer: Ottomata <ao...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits