Ottomata has uploaded a new change for review. https://gerrit.wikimedia.org/r/89118
Change subject: Adding hive-partitioner.
......................................................................

Adding hive-partitioner.

Needs review and refactoring and more docs.

Change-Id: Ia812078906978c0fb44110e36034ad49ac9e3c29
---
A kraken-etl/hive-partitioner
1 file changed, 204 insertions(+), 0 deletions(-)

git pull ssh://gerrit.wikimedia.org:29418/analytics/kraken refs/changes/18/89118/1

diff --git a/kraken-etl/hive-partitioner b/kraken-etl/hive-partitioner
new file mode 100755
index 0000000..0e6a346
--- /dev/null
+++ b/kraken-etl/hive-partitioner
@@ -0,0 +1,204 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+"""Automatically adds Hive partitions based on HDFS directory hierarchy.
+
+Usage: hive-partitioner [options] <camus_destination_path>
+
+Options:
+    -h --help                    Show this help message and exit.
+    -D --database=<dbname>       Hive database name.  [default: default]
+    -o --hive-options=<options>  Any valid Hive CLI options you want to pass
+                                 to Hive commands.
+                                 Example: '--auxpath /path/to/hive-serdes-1.0-SNAPSHOT.jar'
+"""
+__author__ = 'Andrew Otto <[email protected]>'
+
+from datetime import datetime
+from docopt import docopt
+import logging
+import os
+import pprint
+import subprocess
+
+pp = pprint.pprint
+logger = logging.getLogger('hive-partitioner')
+
+interval_hierarchies = {
+    'hourly':  {'depth': 4, 'directory_format': '%Y/%m/%d/%H', 'hive_partition_format': 'year=%Y/month=%m/day=%d/hour=%H'},
+    'daily':   {'depth': 3, 'directory_format': '%Y/%m/%d',    'hive_partition_format': 'year=%Y/month=%m/day=%d'},
+    'monthly': {'depth': 2, 'directory_format': '%Y/%m',       'hive_partition_format': 'year=%Y/month=%m'},
+    'yearly':  {'depth': 1, 'directory_format': '%Y',          'hive_partition_format': 'year=%Y'},
+}
+
+
+def topics(path):
+    """Reads topic names out of camus_destination_path."""
+    return sh('hadoop fs -ls %s/ | grep -v "Found .* items" | awk -F "/" \'{print $NF}\'' % (path)).split('\n')
+
+
+# TODO: configure interval partition paths automatically instead of hardcoding.
+def camus_partitions(topic, camus_destination_path, interval='hourly'):
+    """
+    Returns a list of time bucketed 'partitions' created by Camus import.
+    These are inferred from directories in HDFS.
+    """
+    basedir = camus_destination_path + '/' + topic + '/' + interval
+
+    depth = interval_hierarchies[interval]['depth']
+    # number of wildcard time bucket directories, e.g. depth == 4 -> '/*/*/*/*'
+    depth_stars = '/*' * depth
+    directories = sh('hadoop fs -ls -d %s%s | grep -v \'Found .* items\' | awk \'{print $NF}\'' % (basedir, depth_stars)).split('\n')
+    # return a list of time bucket directories, e.g. ['2013/10/09/15', '2013/10/09/16', ...]
+    return [directory.replace(basedir + '/', '') for directory in directories]
+
+
+def call(command, check_return_code=True):
+    logger.debug('Running: {0}'.format(' '.join(command)))
+    p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    stdout, stderr = p.communicate()
+    if check_return_code and p.returncode != 0:
+        raise RuntimeError("Command: {0} failed with error code: {1}".format(' '.join(command), p.returncode),
+                           stdout, stderr)
+    return stdout.strip()
+
+
+def sh(command):
+    logger.debug('Running: %s' % (command))
+    return os.popen(command).read().strip()
+
+
+def diff_datewise(left, right, left_format=None, right_format=None):
+    """
+    Parameters:
+        left         : a list of datetime strings or objects
+        right        : a list of datetime strings or objects
+        left_format  : None if left contains datetimes, else a strptime format
+        right_format : None if right contains datetimes, else a strptime format
+
+    Returns a tuple of two sets:
+        [0] : the datetime objects in left but not right
+        [1] : the datetime objects in right but not left
+    """
+    if left_format:
+        left_set = set([datetime.strptime(l, left_format) for l in left])
+    else:
+        left_set = set(left)
+
+    if right_format:
+        right_set = set([datetime.strptime(r, right_format) for r in right])
+    else:
+        right_set = set(right)
+
+    return (left_set - right_set, right_set - left_set)
+
+
+class HiveUtils(object):
+    def __init__(self, database='default', options=''):
+        self.database = database
+        # docopt hands us None when --hive-options is not given
+        self.options = options.split() if options else []
+        self.hivecmd = ['hive'] + self.options + ['cli', '--database', self.database]
+        # initialize the tables dict with table names; per-table info
+        # (partitions, interval) is cached here as it is discovered
+        self.tables = {}
+        for t in self.tables_get():
+            self.tables[t] = {}
+
+    def partitions(self, table):
+        """Returns a list of partitions for the given Hive table."""
+        # if we don't know the partitions yet, get them now and cache them
+        if 'partitions' not in self.tables[table]:
+            stdout = self.query('SHOW PARTITIONS %s;' % table)
+            partitions = stdout.split('\n')
+            # drop the 'partition' header line from the CLI output
+            partitions.remove('partition')
+            self.tables[table]['partitions'] = partitions
+
+        return self.tables[table]['partitions']
+
+    def table_exists(self, table):
+        return table in self.tables.keys()
+
+    def tables_get(self):
+        t = self.query('SHOW TABLES').split('\n')
+        # drop the 'tab_name' header line from the CLI output
+        t.remove('tab_name')
+        return t
+
+    def partition_interval(self, table):
+        intervals = {
+            4: 'hourly',
+            3: 'daily',
+            2: 'monthly',
+            1: 'yearly',
+        }
+
+        # if we don't know the interval yet, infer it now and cache it
+        if 'interval' not in self.tables[table]:
+            # count the number of partition keys this table has and pick
+            # a time interval name based on that number
+            cmd = ' '.join(self.hivecmd) + ' -e \'SHOW CREATE TABLE ' + table + ';\' | sed -n \'/PARTITIONED BY (/,/)/p\' | grep -v \'PARTITIONED BY\' | wc -l'
+            logger.debug('Running: %s' % cmd)
+            # using check_output directly here so that we can pass shell=True and use pipes
+            partition_depth = int(subprocess.check_output(cmd, stderr=subprocess.PIPE, shell=True).strip())
+            self.tables[table]['depth'] = partition_depth
+            self.tables[table]['interval'] = intervals[partition_depth]
+
+        return self.tables[table]['interval']
+
+    def query(self, query, check_return_code=True):
+        """Runs the given Hive query and returns stdout."""
+        return self.command(['-e', query], check_return_code)
+
+    def script(self, script, check_return_code=True):
+        """Runs the contents of the given script in Hive and returns stdout."""
+        if not os.path.isfile(script):
+            raise RuntimeError("Hive script: {0} does not exist.".format(script))
+        return self.command(['-f', script], check_return_code)
+
+    def command(self, args, check_return_code=True):
+        """Runs `hive` from the command line, passing in the given args,
+        and returns stdout.
+        """
+        cmd = self.hivecmd + args
+        return call(cmd, check_return_code)
+
+
+if __name__ == '__main__':
+    # parse arguments
+    arguments = docopt(__doc__)
+    print(arguments)
+    logging.basicConfig(level=logging.DEBUG)
+
+    camus_destination_path = arguments['<camus_destination_path>']
+    database = arguments['--database']
+    hive_options = arguments['--hive-options']
+
+    hive = HiveUtils(database, hive_options)
+
+    # don't shadow the topics() and camus_partitions() functions with the
+    # local variables that hold their results
+    topic_list = topics(camus_destination_path)
+    print("Topics: " + ','.join(topic_list))
+    for topic in topic_list:
+        if hive.table_exists(topic):
+            interval = hive.partition_interval(topic)
+            hive_partitions = hive.partitions(topic)
+            camus_topic_partitions = camus_partitions(topic, camus_destination_path)
+
+            logger.debug(("Hive Partitions for %s:\n" % topic) + '\n'.join(hive_partitions))
+            logger.debug(("Camus Partitions for %s:\n" % topic) + '\n'.join(camus_topic_partitions))
+            logger.debug("%s import interval is %s" % (topic, interval))
+
+            # diff the camus and hive partitions
+            missing_hive, missing_camus = diff_datewise(
+                camus_topic_partitions, hive_partitions,
+                interval_hierarchies[interval]['directory_format'],
+                interval_hierarchies[interval]['hive_partition_format'])
+            print("Need to create partition for:")
+            pp(missing_hive)
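The heart of the change is the diff_datewise() comparison: both sides are parsed into datetime objects using the format strings from interval_hierarchies, so Camus directory names and Hive partition names compare cleanly even when their zero padding differs. A minimal, self-contained sketch of that comparison for the hourly case (the sample directory and partition names below are made up for illustration):

    from datetime import datetime

    # format strings copied from interval_hierarchies['hourly'] in the patch
    directory_format = '%Y/%m/%d/%H'
    hive_partition_format = 'year=%Y/month=%m/day=%d/hour=%H'

    # hypothetical inputs: roughly what camus_partitions() and
    # HiveUtils.partitions() would return for one topic
    camus_dirs = ['2013/10/09/15', '2013/10/09/16', '2013/10/09/17']
    hive_parts = ['year=2013/month=10/day=9/hour=15',
                  'year=2013/month=10/day=9/hour=16']

    # normalize both sides to datetime objects; strptime accepts '09' and '9'
    # equally for %d, so padding differences cannot cause false mismatches
    camus_set = set(datetime.strptime(d, directory_format) for d in camus_dirs)
    hive_set = set(datetime.strptime(p, hive_partition_format) for p in hive_parts)

    # the same set arithmetic diff_datewise() performs
    missing_hive = camus_set - hive_set    # buckets imported by Camus, unknown to Hive
    missing_camus = hive_set - camus_set   # partitions in Hive with no Camus directory

    print(missing_hive)   # the 2013-10-09 17:00 bucket needs a Hive partition
    print(missing_camus)  # empty

If Hive prints partition values unpadded (day=9) while Camus writes zero-padded directories (09), a plain string diff would flag every bucket as missing; normalizing both sides through strptime sidesteps that.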

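As the commit message notes, this still needs work: the script computes missing_hive but only pretty-prints it. The natural follow-up is to turn each missing datetime back into an ALTER TABLE ... ADD PARTITION statement and run it through HiveUtils.query(). A sketch of what that could look like; the helper name, table name, and base path below are hypothetical and not part of this patch:

    from datetime import datetime

    def add_partition_ddl(table, dt, base_location):
        """Builds the DDL to register one missing hourly partition.
        Sketch only: assumes a table partitioned by (year, month, day, hour)
        whose data already sits in the Camus directory layout.
        """
        location = '%s/%s' % (base_location, dt.strftime('%Y/%m/%d/%H'))
        return ("ALTER TABLE %s ADD IF NOT EXISTS PARTITION "
                "(year=%d, month=%d, day=%d, hour=%d) LOCATION '%s';"
                % (table, dt.year, dt.month, dt.day, dt.hour, location))

    # hypothetical usage with a datetime that diff_datewise() reported missing;
    # the table name and path are invented for illustration
    ddl = add_partition_ddl('webrequest', datetime(2013, 10, 9, 17),
                            '/wmf/camus/webrequest/hourly')
    print(ddl)
    # ALTER TABLE webrequest ADD IF NOT EXISTS PARTITION
    #   (year=2013, month=10, day=9, hour=17)
    #   LOCATION '/wmf/camus/webrequest/hourly/2013/10/09/17';

Hive also accepts several PARTITION clauses in a single ALTER TABLE ... ADD statement, so the missing partitions for a topic could be batched into one Hive invocation rather than paying CLI start-up cost per bucket.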