Ottomata has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/392733 )
Change subject: Add refinery-drop-hive-partitions
......................................................................

Add refinery-drop-hive-partitions

This script works with multiple tables in a database, and can infer all
partitions to drop, not just hourly date time bucketed partitions.

This adds a refinery.util.HivePartition class to make working with Hive
partitions a little easier. More work could be done to make HiveUtils
better with this.

NOTE: This is pretty inefficient to run on lots of tables, as each Hive
CLI command is run via the shell, which requires starting up a JVM each
time we run a hive command.

Bug: T181064
Change-Id: I935551c63f35f747397aa05783f767925ab923bc
---
A bin/refinery-drop-hive-partitions
M bin/refinery-drop-hourly-partitions
M bin/refinery-drop-mediawiki-snapshots
M bin/refinery-drop-webrequest-partitions
M bin/refinery-get-add-webrequest-partition-statements
M python/refinery/util.py
M python/tests/test_refinery/test_util.py
7 files changed, 378 insertions(+), 6 deletions(-)

git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery refs/changes/33/392733/1
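For a quick sense of the intended usage, here is a hypothetical dry-run
invocation; the wmf database and webrequest table names are placeholders
for illustration, not part of this change:

    export PYTHONPATH=$PYTHONPATH:/path/to/refinery/python
    bin/refinery-drop-hive-partitions --database=wmf --tables=webrequest \
        --older-than-days=90 --verbose --dry-run

With --dry-run, the script only prints the partition-drop DDL returned by
hive.drop_partitions_ddl() and the 'hdfs dfs -rm -R ...' command it would
otherwise execute.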
diff --git a/bin/refinery-drop-hive-partitions b/bin/refinery-drop-hive-partitions
new file mode 100755
index 0000000..f599055
--- /dev/null
+++ b/bin/refinery-drop-hive-partitions
@@ -0,0 +1,164 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Note: You should make sure to put refinery/python on your PYTHONPATH.
+#   export PYTHONPATH=$PYTHONPATH:/path/to/refinery/python
+
+"""
+Automatically drops old Hive partitions from Hive tables
+and deletes the corresponding partition directories from HDFS.
+
+NOTE: This script should replace refinery-drop-hourly-partitions once
+all uses are ported over to it.
+
+Usage: refinery-drop-hive-partitions [options]
+
+Options:
+    -h --help                       Show this help message and exit.
+    -d --older-than-days=<days>     Drop data older than this number of days.  [default: 60]
+    -D --database=<dbname>          Hive database name.  [default: default]
+    -t --tables=<tables>            Comma separated list of tables to check.  Defaults to
+                                    all tables in the database.
+    -l --limit=<limit>              Only drop from this many tables.  Useful for testing.
+    -N --partition-depth=<n>        Number of directories to use when constructing the partition
+                                    file glob.  If not given, this will be inferred from the
+                                    first partition found in the table.  If the table has no
+                                    current partitions, this will throw an exception.
+    -o --hive-options=<options>     Any valid Hive CLI options you want to pass to Hive commands.
+                                    Example: '--auxpath /path/to/hive-serdes-1.0-SNAPSHOT.jar'
+    -v --verbose                    Turn on verbose debug logging.
+    -n --dry-run                    Don't actually drop any partitions, just output the Hive
+                                    queries that would drop them.
+"""
+__author__ = 'Andrew Otto <o...@wikimedia.org>'
+
+import datetime
+from docopt import docopt
+import logging
+import re
+import os
+import sys
+from refinery.util import HiveUtils, HivePartition, HdfsUtils
+
+
+if __name__ == '__main__':
+    # parse arguments
+    arguments = docopt(__doc__)
+
+    days            = int(arguments['--older-than-days'])
+    database        = arguments['--database']
+    tables          = arguments['--tables']
+    limit           = arguments['--limit']
+    partition_depth = arguments['--partition-depth']
+    hive_options    = arguments['--hive-options']
+    verbose         = arguments['--verbose']
+    dry_run         = arguments['--dry-run']
+
+    log_level = logging.INFO
+    if verbose:
+        log_level = logging.DEBUG
+
+    logging.basicConfig(level=log_level,
+                        format='%(asctime)s %(levelname)-6s %(message)s',
+                        datefmt='%Y-%m-%dT%H:%M:%S')
+
+    if partition_depth is not None:
+        partition_depth = int(partition_depth)
+
+    # Delete partitions older than this.
+    old_partition_datetime_threshold = datetime.datetime.now() - datetime.timedelta(days=days)
+
+    # Instantiate HiveUtils.
+    hive = HiveUtils(database, hive_options)
+
+    if tables is not None:
+        tables = tables.split(',')
+    else:
+        tables = hive.get_tables()
+
+    if limit is not None:
+        tables = tables[:int(limit)]
+
+    # Iterate through each table and find old Hive partitions and HDFS paths to drop.
+    for table in tables:
+        logging.info('Looking for partitions to drop for {}.{}...'.format(database, table))
+        # Attempt to infer table location from the table metadata.
+        table_location = hive.table_location(table)
+
+        partitions_to_drop = []
+        partition_paths_to_delete = []
+
+        # Loop through all partitions for this table and drop anything that is too old.
+        hive_partitions = hive.partitions(table)
+        for partition in hive_partitions:
+            if partition.datetime() < old_partition_datetime_threshold:
+                partitions_to_drop.append(partition)
+
+        # Build a glob based on the number of partition keys found in Hive partitions.
+        # This will be used to find possible directories that need to be removed
+        # from HDFS.  TODO: This has a bug in that no data will be removed from HDFS
+        # if there were no partitions in the Hive table.
+        if partition_depth is not None:
+            partition_glob = os.path.join(*([table_location] + ['*'] * partition_depth))
+        elif len(hive_partitions) > 0:
+            partition_glob = hive_partitions[0].glob(base_path=table_location)
+        else:
+            partition_glob = None
+            logging.warn(
+                'Could not search for HDFS paths to drop. '
+                'Could not construct partition glob from a partition depth.'
+            )
+
+        if partition_glob is not None:
+            # Loop through all the partition directory paths for this table
+            # and check if any of them are old enough for deletion.
+            for partition_path in HdfsUtils.ls(partition_glob, include_children=False):
+                try:
+                    partition = HivePartition(partition_path)
+                    if partition.datetime() < old_partition_datetime_threshold:
+                        partition_paths_to_delete.append(partition_path)
+                except Exception as e:
+                    logging.error(
+                        'Could not parse date from {}. Skipping. ({})'
+                        .format(partition_path, e)
+                    )
+                    continue
+
+        # Drop any old Hive partitions.
+        if partitions_to_drop:
+            partition_specs_to_drop = [p.spec() for p in partitions_to_drop]
+            if dry_run:
+                print(hive.drop_partitions_ddl(table, partition_specs_to_drop))
+            else:
+                logging.info('Dropping {0} Hive partitions from table {1}.{2}'
+                    .format(len(partition_specs_to_drop), database, table)
+                )
+                hive.drop_partitions(table, partition_specs_to_drop)
+        else:
+            logging.info('No Hive partitions need to be dropped for table {0}.{1}'.format(database, table))
+
+        # Delete any old HDFS data.
+        if partition_paths_to_delete:
+            if dry_run:
+                print('hdfs dfs -rm -R ' + ' '.join(partition_paths_to_delete))
+            else:
+                logging.info('Removing {0} partition directories for table {1}.{2} from {3}.'
+                    .format(len(partition_paths_to_delete), database, table, table_location)
+                )
+                HdfsUtils.rm(' '.join(partition_paths_to_delete))
+        else:
+            logging.info('No partition directories need to be removed for table {0}.{1}'.format(database, table))
diff --git a/bin/refinery-drop-hourly-partitions b/bin/refinery-drop-hourly-partitions
index 93380c1..906a35b 100755
--- a/bin/refinery-drop-hourly-partitions
+++ b/bin/refinery-drop-hourly-partitions
@@ -125,7 +125,7 @@
     partition_paths_to_delete = []
 
     # Loop through all partitions for this table and drop anything that is too old.
-    for partition_spec in hive.partitions(table):
+    for partition_spec in hive.partition_specs(table):
         partition_datetime = hive.partition_datetime_from_spec(
             partition_spec,
             partition_spec_regex
diff --git a/bin/refinery-drop-mediawiki-snapshots b/bin/refinery-drop-mediawiki-snapshots
index 1f19d7f..d18ce39 100755
--- a/bin/refinery-drop-mediawiki-snapshots
+++ b/bin/refinery-drop-mediawiki-snapshots
@@ -139,7 +139,7 @@
 # Returns the partitions to be dropped given a hive table
 def get_partitions_to_drop(hive, table, keep_snapshots):
     logger.debug('Getting partitions to drop...')
-    partitions = hive.partitions(table)
+    partitions = hive.partition_specs(table)
     spec_separator = HiveUtils.partition_spec_separator
 
     # For tables partitioned by dimensions other than snapshot
diff --git a/bin/refinery-drop-webrequest-partitions b/bin/refinery-drop-webrequest-partitions
index 265a2ca..f59acd0 100755
--- a/bin/refinery-drop-webrequest-partitions
+++ b/bin/refinery-drop-webrequest-partitions
@@ -127,7 +127,7 @@
     partition_paths_to_delete = []
 
     # Loop through all partitions for this table and drop anything that is too old.
-    for partition_spec in hive.partitions(table):
+    for partition_spec in hive.partition_specs(table):
         partition_datetime = hive.partition_datetime_from_spec(
             partition_spec,
             partition_spec_regex
diff --git a/bin/refinery-get-add-webrequest-partition-statements b/bin/refinery-get-add-webrequest-partition-statements
index b5271d7..9afb306 100755
--- a/bin/refinery-get-add-webrequest-partition-statements
+++ b/bin/refinery-get-add-webrequest-partition-statements
@@ -103,7 +103,7 @@
         sys.exit(1)
 
     statements = []
-    for spec in hive.partitions(table):
+    for spec in hive.partition_specs(table):
         if webrequest_type == 'raw':
             path = webrequest_raw_partition_path_from_spec(location, spec)
         else:
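These four call sites keep their existing behavior under the new name
partition_specs(); the rename frees up partitions() for the richer
HivePartition objects added in util.py below. A hedged sketch of the
difference, with an assumed 'wmf' database and illustrative return
values, not real output:

    hive = HiveUtils('wmf')

    # partition_specs(): the old behavior, raw Hive partition spec strings.
    hive.partition_specs('webrequest')
    # -> ["datacenter='eqiad',year=2017,month=11,day=2,hour=16", ...]

    # partitions(): now returns HivePartition objects, so callers can
    # compute datetimes, paths, and globs without ad hoc regex parsing.
    hive.partitions('webrequest')[0].datetime()
    # -> datetime.datetime(2017, 11, 2, 16, 0)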
""" +from collections import OrderedDict import datetime import logging import os @@ -154,6 +155,12 @@ self.tables = {} + def get_tables(self): + """Returns the list of tables in the database""" + self._tables_init() + return self.tables.keys() + + def table_exists(self, table): # ,force=False """Returns true if the table exists in the current database.""" self._tables_init() @@ -206,7 +213,7 @@ return table_location - def partitions(self, table): + def partition_specs(self, table): """ Returns a list of partitions for the given Hive table in partition spec format. @@ -226,6 +233,19 @@ ] return self.tables[table]['partitions'] + + + def partitions(self, table): + """ + Returns a list of HivePartitions for the given Hive table. + + Returns: + A list of HivePartition dicts + """ + + # Cache results for later. + # If we don't know the partitions yet, get them now. + return [HivePartition(p) for p in self.partition_specs(table)] def drop_partitions(self, table, partition_specs): @@ -427,6 +447,116 @@ return sh(cmd, check_return_code) +class HivePartition(OrderedDict): + partition_regex = re.compile(r'(\w+)=["\']?(\w+)["\']?') + camus_regex = re.compile(r'.*/hourly/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/(?P<hour>\d+)') + + desc_separator = '/' + spec_separator = ',' + + zfill_keys = { + 'year': 4, + 'month': 2, + 'day': 2, + 'hour': 2 + } + + def __init__(self, partition_string): + + # If we see an '=', assume this is a Hive style partition desc or spec. + if '=' in partition_string: + partitions = HivePartition.partition_regex.findall(partition_string) + # Else assume this is a time bucketed camus imported path. + # This only works with hourly camus data. + else: + match = HivePartition.camus_regex.search(partition_string) + if match: + partitions = [] + for key in ('year', 'month', 'day', 'hour'): + partitions.append((key, str(int(match.group(key))))) + else: + raise Exception( + 'No path matching {0} was found in {1}.'.format( + HivePartition.camus_regex.pattern, partition_string + ) + ) + + super(HivePartition, self).__init__(partitions) + + + def datetime(self): + """ + Returns a datetime.datetime for this partition. + """ + return datetime.datetime( + int(self.get('year')), + int(self.get('month', 1)), + int(self.get('day', 1)), + int(self.get('hour', 0)) + ) + + + def list(self, quote=False): + """ + Returns a list of Hive partition key=value strings. + IF quote=True, string values will be quoted. + """ + l = [] + # Loop through each partition, + # adding quotes around strings if quote=True + for k, v in self.items(): + if quote and not v.isdigit(): + v = '\'{}\''.format(v) + l.append('{}={}'.format(k, v)) + return l + + + def desc(self): + """ + Returns a Hive desc string, e.g. datacenter=eqiad/year=2017/month=11/day=21/hour=0 + """ + return HivePartition.desc_separator.join(self.list()) + + + def spec(self): + """ + Returns a Hive spec string, e.g. datacenter='eqiad',year=2017,month=11,day=21,hour=0 + """ + return HivePartition.spec_separator.join(self.list(quote=True)) + + + def path(self, base_path=None): + """ + Returns a path to the partition. If base_path is given, it will be prefixed. + """ + dirs = self.list() + if base_path is not None: + dirs = [base_path] + dirs + return os.path.join(*dirs) + + + def camus_path(self, base_path=None): + """ + Returns a path to a keyless camus partition, e.g. 2017/02/05/00. + If base_path is given, it will be prefixed. 
+ """ + dirs = [v.zfill(HivePartition.zfill_keys.get(k, 0)) for k, v in self.items()] + if base_path is not None: + dirs = [base_path] + dirs + return os.path.join(*dirs) + + def glob(self, base_path=None): + """ + Returns a file glob that would have matched this partition. + This is just a handy way to build a file glob to match other partitions + as deep as this one. If base_path is given, it will be prefixed. + """ + globs = ['*'] * len(self) + if base_path is not None: + globs = [base_path] + globs + return os.path.join(*globs) + + class HdfsUtils(object): # TODO: Use snakebite instead of shelling out to 'hdfs dfs'. diff --git a/python/tests/test_refinery/test_util.py b/python/tests/test_refinery/test_util.py index b9e8e96..f242918 100644 --- a/python/tests/test_refinery/test_util.py +++ b/python/tests/test_refinery/test_util.py @@ -1,6 +1,6 @@ from unittest import TestCase from datetime import datetime, timedelta -from refinery.util import HiveUtils, HdfsUtils, sh +from refinery.util import HiveUtils, HivePartition, HdfsUtils, sh import os @@ -19,6 +19,84 @@ output = sh(command) self.assertEqual(output, 'hi_you') +class TestHivePartition(TestCase): + def setUp(self): + self.partition_desc = 'datacenter=eqiad/year=2017/month=11/day=2/hour=16' + self.hive_partition = HivePartition(self.partition_desc) + + def test_init_from_hive_desc(self): + should_be = [ + ('datacenter', 'eqiad'), + ('year', '2017'), + ('month', '11'), + ('day', '2'), + ('hour', '16') + ] + self.assertEqual(self.hive_partition.items(), should_be) + + def test_init_from_hive_spec(self): + partition_spec = HiveUtils.partition_spec_from_partition_desc(self.partition_desc) + hive_partition = HivePartition(partition_spec) + should_be = [ + ('datacenter', 'eqiad'), + ('year', '2017'), + ('month', '11'), + ('day', '2'), + ('hour', '16') + ] + self.assertEqual(hive_partition.items(), should_be) + + def test_init_from_hive_path(self): + hive_path = '/path/to/data/datacenter=eqiad/year=2017/month=11/day=2/hour=16' + hive_partition = HivePartition(hive_path) + should_be = [ + ('datacenter', 'eqiad'), + ('year', '2017'), + ('month', '11'), + ('day', '2'), + ('hour', '16') + ] + self.assertEqual(hive_partition.items(), should_be) + + def test_init_from_camus_path(self): + camus_path = '/path/to/eqiad_data/hourly/2017/11/02/16' + hive_partition = HivePartition(camus_path) + + should_be = [ + ('year', '2017'), + ('month', '11'), + ('day', '2'), + ('hour', '16') + ] + self.assertEqual(hive_partition.items(), should_be) + + def test_datetime(self): + d = self.hive_partition.datetime() + should_be = datetime(2017,11,2,16) + self.assertEqual(d, should_be) + + def test_list(self): + self.assertEqual(self.partition_desc.split('/'), self.hive_partition.list()) + + def test_desc(self): + self.assertEqual(self.hive_partition.desc(), self.partition_desc) + + def test_spec(self): + should_be = 'datacenter=\'eqiad\',year=2017,month=11,day=2,hour=16' + self.assertEqual(self.hive_partition.spec(), should_be) + + def test_path(self): + self.assertEqual(self.hive_partition.path(), self.partition_desc) + + def test_camus_path(self): + should_be = '/path/to/data/hourly/eqiad/2017/11/02/16' + self.assertEqual(self.hive_partition.camus_path('/path/to/data/hourly'), should_be) + + def test_glob(self): + should_be = '*/*/*/*/*' + self.assertEqual(self.hive_partition.glob(), should_be) + + class TestHiveUtil(TestCase): def setUp(self): self.hive = HiveUtils() -- To view, visit https://gerrit.wikimedia.org/r/392733 To unsubscribe, visit 
-- 
To view, visit https://gerrit.wikimedia.org/r/392733
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I935551c63f35f747397aa05783f767925ab923bc
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Ottomata <ao...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits