Nuria has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/355601 )

Change subject: Add script to purge old mediawiki data snapshots
......................................................................


Add script to purge old mediawiki data snapshots

Bug: T162034
Change-Id: I7a6ce082dd539c9bd02433cb7db5368ec2c07369
---
A bin/refinery-drop-mediawiki-snapshots
1 file changed, 289 insertions(+), 0 deletions(-)

Approvals:
  Elukey: Looks good to me, but someone else must approve
  Nuria: Verified; Looks good to me, approved



diff --git a/bin/refinery-drop-mediawiki-snapshots b/bin/refinery-drop-mediawiki-snapshots
new file mode 100755
index 0000000..1f19d7f
--- /dev/null
+++ b/bin/refinery-drop-mediawiki-snapshots
@@ -0,0 +1,289 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Note: You should make sure to put refinery/python on your PYTHONPATH.
+#   export PYTHONPATH=$PYTHONPATH:/path/to/refinery/python
+
+"""
+Automatically drops old partitions from the mediawiki raw and historical
+tables. See AFFECTED_TABLES dict for a comprehensive list.
+
+As these data sets are historical (they span from the beginning of time
+to the latest import), the dimension used to determine which partitions
+need to be removed is not time, but "snapshot". By default the last 6
+snapshots will be kept: the current snapshot plus the previous 5.
+
+Note: Ad-hoc snapshots that do not follow the default naming convention
+snapshot=YYYY-MM, like private snapshots, are neither considered nor
+affected by this script.
+
+Usage: refinery-drop-mediawiki-snapshots [options]
+
+Options:
+    -h --help                       Show this help message and exit.
+    -s --keep-snapshots=<n>         Keep the <n> most recent snapshots. [default: 6]
+    -v --verbose                    Turn on verbose debug logging.
+    -n --dry-run                    Don't actually drop any partitions, just output Hive queries.
+"""
+
+
+from docopt import docopt
+from refinery.util import HiveUtils, HdfsUtils
+import datetime
+import logging
+import os
+import re
+import sys
+
+
+# Set up logging to be split:
+#   INFO+DEBUG+WARNING -> stdout
+#   ERROR              -> stderr
+# Thanks to https://stackoverflow.com/users/5124424/zoey-greer
+class LessThanFilter(logging.Filter):
+    def __init__(self, exclusive_maximum, name=""):
+        super(LessThanFilter, self).__init__(name)
+        self.max_level = exclusive_maximum
+
+    def filter(self, record):
+        # Non-zero return means we log this message.
+        return 1 if record.levelno < self.max_level else 0
+
+logger = logging.getLogger()
+logger.setLevel(logging.NOTSET)
+
+formatter = logging.Formatter(
+    fmt='%(asctime)s %(levelname)-6s %(message)s',
+    datefmt='%Y-%m-%dT%H:%M:%S',
+)
+
+handler_out = logging.StreamHandler(sys.stdout)
+handler_out.setLevel(logging.DEBUG)
+handler_out.addFilter(LessThanFilter(logging.ERROR))
+handler_out.setFormatter(formatter)
+logger.addHandler(handler_out)
+
+handler_err = logging.StreamHandler(sys.stderr)
+handler_err.setLevel(logging.ERROR)
+handler_err.setFormatter(formatter)
+logger.addHandler(handler_err)
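+
+# Note (illustrative): with the handlers above, logger.info/debug messages
+# go to stdout and logger.error messages to stderr, so callers (e.g. cron)
+# can separate progress output from failures.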
+
+
+# Tables that have mediawiki snapshots to be managed
+# key: database, value: table
+AFFECTED_TABLES = {
+    'wmf_raw': [
+        'mediawiki_archive',
+        'mediawiki_ipblocks',
+        'mediawiki_logging',
+        'mediawiki_page',
+        'mediawiki_pagelinks',
+        'mediawiki_project_namespace_map',
+        'mediawiki_redirect',
+        'mediawiki_revision',
+        'mediawiki_user',
+        'mediawiki_user_groups'
+    ],
+    'wmf': [
+        'mediawiki_history',
+        'mediawiki_metrics',
+        'mediawiki_page_history',
+        'mediawiki_user_history'
+    ]
+}
+
+# Tables partitioned by wiki_db in addition to by snapshot
+WIKI_DB_TABLES = [
+    'mediawiki_archive',
+    'mediawiki_ipblocks',
+    'mediawiki_logging',
+    'mediawiki_page',
+    'mediawiki_pagelinks',
+    'mediawiki_redirect',
+    'mediawiki_revision',
+    'mediawiki_user',
+    'mediawiki_user_groups',
+    'mediawiki_metrics'
+]
+
+# Tables partitioned by metric in addition to by snapshot
+METRIC_TABLES = [
+    'mediawiki_metrics'
+]
+
+
+# Returns the age in days of a given partition spec
+TODAYS_ORDINAL = datetime.datetime.now().toordinal()
+SPEC_DATE_REGEX = re.compile(
+    r"snapshot='(?P<year>[0-9]{4})-(?P<month>[0-9]{2})'"
+)
+def get_partition_age(hive):
+    return (lambda partition:
+        TODAYS_ORDINAL -
+        hive.partition_datetime_from_spec(
+            partition,
+            SPEC_DATE_REGEX
+        ).toordinal()
+    )
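+
+# Example (illustrative): with today being 2017-06-01, the key function
+# returned by get_partition_age(hive) maps "snapshot='2017-01'" to roughly
+# 151 days; sorting by it orders partitions from newest to oldest.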
+
+# Returns the partitions to be dropped given a hive table
+def get_partitions_to_drop(hive, table, keep_snapshots):
+    logger.debug('Getting partitions to drop...')
+    partitions = hive.partitions(table)
+    spec_separator = HiveUtils.partition_spec_separator
+
+    # For tables partitioned by dimensions other than snapshot
+    # extract just the snapshot spec:
+    # snapshot=2017-01,wiki_db=enwiki => snapshot=2017-01
+    if table in WIKI_DB_TABLES + METRIC_TABLES:
+        snapshots = set([])
+        for partition in partitions:
+            snapshot = partition.split(spec_separator)[0]
+            snapshots.add(snapshot)
+        partitions = list(snapshots)
+
+    # Filter out ad-hoc or private snapshots
+    partitions = [
+        p for p in partitions
+        if re.match("^snapshot='[0-9]{4}-[0-9]{2}'$", p)
+    ]
+
+    # Select partitions to drop (keep the most recent <keep_snapshots> ones)
+    partitions.sort(key=get_partition_age(hive))
+    partitions_to_drop = partitions[keep_snapshots:]
+
+    # HACK: For tables partitioned by dimensions other than snapshot
+    # add <dimension>!='' to snapshot spec, so that HiveUtils deletes
+    # the whole snapshot partition with all sub-partitions in it.
+    if table in METRIC_TABLES:
+        partitions_to_drop = [
+            spec_separator.join([p, "metric!=''"])
+            for p in partitions_to_drop
+        ]
+    if table in WIKI_DB_TABLES:
+        partitions_to_drop = [
+            spec_separator.join([p, "wiki_db!=''"])
+            for p in partitions_to_drop
+        ]
+    return partitions_to_drop
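+
+# Example (illustrative): for mediawiki_page with snapshots 2016-11 through
+# 2017-05 present and keep_snapshots=6, only "snapshot='2016-11'" is
+# selected, rewritten as "snapshot='2016-11',wiki_db!=''" so the whole
+# snapshot with all its wiki_db sub-partitions gets dropped.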
+
+# Returns the age in days of a given partition directory
+PATH_DATE_REGEX = re.compile(r'snapshot=([0-9]{4}-[0-9]{2})')
+PATH_DATE_FORMAT = '%Y-%m'
+def get_directory_age(hive):
+    return (lambda path:
+        TODAYS_ORDINAL -
+        hive.partition_datetime_from_path(
+            path,
+            PATH_DATE_REGEX,
+            PATH_DATE_FORMAT
+        ).toordinal()
+    )
+
+# Returns the directories to be removed given a hive table
+def get_directories_to_remove(hive, table, keep_snapshots):
+    logger.debug('Getting directories to remove...')
+    table_location = hive.table_location(table)
+
+    # Get partition directories
+    glob = os.path.join(table_location, '*')
+    directories = HdfsUtils.ls(glob, include_children=False)
+
+    # Filter out private snapshots
+    directories = [
+        d for d in directories
+        if re.match('^.*/snapshot=[0-9]{4}-[0-9]{2}$', d)
+    ]
+
+    # Select directories to drop (keep the most recent <keep_snapshots> ones)
+    directories.sort(key=get_directory_age(hive))
+    return directories[keep_snapshots:]
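+
+# Example (illustrative): if a table's data lived under a hypothetical
+# /wmf/data/wmf/mediawiki_history, the glob would list directories such as
+# .../snapshot=2017-01 or .../snapshot=2017-01_private, and the regex above
+# would keep only the plain snapshot=YYYY-MM ones.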
+
+# Raises an error if partitions and directories do not match
+def check_partitions_vs_directories(partitions, directories):
+    spec_separator = HiveUtils.partition_spec_separator
+    partition_snapshots = set([
+        p.split(spec_separator)[0].replace("'", '')
+        for p in partitions
+    ])
+    directory_snapshots = set([os.path.basename(d) for d in directories])
+    if partition_snapshots != directory_snapshots:
+        logger.error(
+            'Selected partitions extracted from table specs ({0}) '
+            'do not match selected partitions extracted from data paths ({1}).'
+            .format(partition_snapshots, directory_snapshots)
+        )
+        sys.exit(1)
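+
+# Example (illustrative): "snapshot='2017-01',wiki_db!=''" (from specs) and
+# ".../snapshot=2017-01" (from paths) both normalize to "snapshot=2017-01",
+# so a consistent selection passes; any mismatch aborts before data is lost.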
+
+# Drop given hive table partitions (if dry_run, just print)
+def drop_partitions(hive, table, partitions, dry_run):
+    if partitions:
+        if dry_run:
+            print(hive.drop_partitions_ddl(table, partitions))
+        else:
+            logger.info(
+                'Dropping {0} partitions from {1}.{2}'
+                .format(len(partitions), hive.database, table)
+            )
+            for partition in partitions:
+                logger.debug(partition)
+            hive.drop_partitions(table, partitions)
+    else:
+        logger.info(
+            'No partitions need to be dropped from {0}.{1}'
+            .format(hive.database, table)
+        )
+
+# Remove given data directories (if dry_run, just print)
+def remove_directories(hive, table, directories, dry_run):
+    table_location = hive.table_location(table)
+    if directories:
+        if dry_run:
+            print('hdfs dfs -rm -R ' + ' '.join(directories))
+        else:
+            logger.info('Removing {0} directories from {1}'
+                .format(len(directories), table_location)
+            )
+            for directory in directories:
+                logger.debug(directory)
+            HdfsUtils.rm(' '.join(directories))
+    else:
+        logger.info(
+            'No directories need to be removed for {0}'
+            .format(table_location)
+        )
+
+
+if __name__ == '__main__':
+    # Parse arguments
+    arguments = docopt(__doc__)
+    keep_snapshots  = int(arguments['--keep-snapshots'])
+    verbose         = arguments['--verbose']
+    dry_run         = arguments['--dry-run']
+
+    # Setup logging level
+    logger.setLevel(logging.INFO)
+    if verbose:
+        logger.setLevel(logging.DEBUG)
+
+    # Check arguments
+    if keep_snapshots < 6:
+        logger.error('Option \'--keep-snapshots\' must be greater than or equal to 6.')
+        sys.exit(1)
+
+    for database, tables in AFFECTED_TABLES.items():
+        # Instantiate HiveUtils
+        hive = HiveUtils(database)
+
+        # Apply the cleaning to each table
+        for table in tables:
+            logger.debug('Processing table {0}'.format(table))
+            partitions = get_partitions_to_drop(hive, table, keep_snapshots)
+            directories = get_directories_to_remove(hive, table, keep_snapshots)
+            check_partitions_vs_directories(partitions, directories)
+            drop_partitions(hive, table, partitions, dry_run)
+            remove_directories(hive, table, directories, dry_run)

-- 
To view, visit https://gerrit.wikimedia.org/r/355601

Gerrit-MessageType: merged
Gerrit-Change-Id: I7a6ce082dd539c9bd02433cb7db5368ec2c07369
Gerrit-PatchSet: 9
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Mforns <mfo...@wikimedia.org>
Gerrit-Reviewer: Elukey <ltosc...@wikimedia.org>
Gerrit-Reviewer: Joal <j...@wikimedia.org>
Gerrit-Reviewer: Mforns <mfo...@wikimedia.org>
Gerrit-Reviewer: Nuria <nu...@wikimedia.org>
Gerrit-Reviewer: Ottomata <ao...@wikimedia.org>
