Ottomata has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/89118


Change subject: Adding hive-partitioner.
......................................................................

Adding hive-partitioner.

Needs review and refactoring and more docs.

Change-Id: Ia812078906978c0fb44110e36034ad49ac9e3c29
---
A kraken-etl/hive-partitioner
1 file changed, 204 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/kraken refs/changes/18/89118/1

diff --git a/kraken-etl/hive-partitioner b/kraken-etl/hive-partitioner
new file mode 100755
index 0000000..0e6a346
--- /dev/null
+++ b/kraken-etl/hive-partitioner
@@ -0,0 +1,204 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+""" Automatically adds Hive partitions based on HDFS directory hierarchy.
+
+Usage: hive-partitioner [options] <camus_destination_path>
+    
+Options:
+    -h --help                       Show this help message and exit.
+    -D --database=<dbname>          Hive database name.  [default: default]
+    -o --hive-options=<options>     Any valid Hive CLI options you want to pass to Hive commands.
+                                    Example: '--auxpath /path/to/hive-serdes-1.0-SNAPSHOT.jar'
+"""
+__author__ = 'Andrew Otto <[email protected]>'
+
+from   datetime import datetime
+from   docopt import docopt
+import logging
+import os
+import subprocess
+
+import pprint
+
+pp     = pprint.pprint
+logger = logging.getLogger('hive-partitioner')
+
+interval_hierarchies = {
+    'hourly':  { 'depth': 4, 'directory_format': '%Y/%m/%d/%H', 'hive_partition_format': 'year=%Y/month=%m/day=%d/hour=%H' },
+    'daily':   { 'depth': 3, 'directory_format': '%Y/%m/%d',    'hive_partition_format': 'year=%Y/month=%m/day=%d' },
+    'monthly': { 'depth': 2, 'directory_format': '%Y/%m',       'hive_partition_format': 'year=%Y/month=%m' },
+    'yearly':  { 'depth': 1, 'directory_format': '%Y',          'hive_partition_format': 'year=%Y' },
+}
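+# For example, with the 'hourly' entry above, the HDFS directory '2013/10/09/15'
+# and the Hive partition spec 'year=2013/month=10/day=09/hour=15' parse to the
+# same datetime, which is how diff_datewise() below compares the two sides.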
+
+def topics(path):
+    """Reads topic names out of camus_destination_path"""
+    return sh('hadoop fs -ls %s/ | grep -v "Found .* items" | awk -F "/" \'{print $NF}\'' % (path)).split('\n')
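+# e.g. with topic directories <path>/webrequest_mobile and <path>/webrequest_text
+# (hypothetical topic names), topics() returns ['webrequest_mobile', 'webrequest_text'].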
+
+# TODO configure interval partition paths automatically instead of hardcoding
+def camus_partitions(topic, camus_destination_path, interval='hourly'):
+    """
+    Returns a list of time bucketd 'partitions' created by Camus import.
+    These are inferred from directories in hdfs.
+    """
+    
+    basedir = camus_destination_path + '/' + topic + '/' + interval
+
+
+    depth = interval_hierarchies[interval]['depth']
+    # number of  wildcard time bucket directories, e.g. depth_stars == 4 -> 
'/*/*/*/*'
+    depth_stars = '/*' * depth
+    directories = sh('hadoop fs -ls -d %s%s | grep -v \'Found .* items\' | awk 
\'{print $NF}\'' % (basedir, depth_stars)).split('\n')
+    # return list of time bucket directories, e.g. ['2013/10/09/15', 
'2013/10/09/16', ... ]
+    return [directory.replace(basedir + '/', '') for directory in directories]
+
+
+def call(command, check_return_code=True):
+    logger.debug('Running: {0}'.format(' '.join(command)))
+    p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    stdout, stderr = p.communicate()
+    if check_return_code and p.returncode != 0:
+        raise RuntimeError("Command: {0} failed with error code: {1}".format(" ".join(command), p.returncode), stdout, stderr)
+    return stdout.strip()
+
+def sh(command):
+    logger.debug('Running: %s' % (command))
+    return os.popen(command).read().strip()
+
+
+def diff_datewise(left, right, left_format=None, right_format=None):
+    """
+    Parameters
+        left            : a list of datetime strings or objects
+        right           : a list of datetime strings or objects
+        left_format     : None if left contains datetimes, or strptime format
+        right_format    : None if right contains datetimes, or strptime format
+ 
+    Returns
+        A tuple of two sets:
+        [0] : the datetime objects in left but not right
+        [1] : the datetime objects in right but not left
+    """
+    
+    if left_format:
+        left_set = set([datetime.strptime(l, left_format) for l in left])
+    else:
+        left_set = set(left)
+    
+    if right_format:
+        right_set = set([datetime.strptime(r, right_format) for r in right])
+    else:
+        right_set = set(right)
+    
+    return (left_set - right_set, right_set - left_set)
+
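+# A worked example with hypothetical values:
+#   left  = ['2013/10/09/15', '2013/10/09/16']     # camus time bucket directories
+#   right = ['year=2013/month=10/day=09/hour=15']  # existing hive partitions
+#   diff_datewise(left, right, '%Y/%m/%d/%H', 'year=%Y/month=%m/day=%d/hour=%H')
+#   # -> ({datetime(2013, 10, 9, 16)}, set())
+#   # i.e. hour 16 is in HDFS but has no Hive partition yet.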
+
+class HiveUtils(object):
+    def __init__(self, database='default', options=''):
+        self.database   = database
+        # options may be None if --hive-options was not given on the CLI
+        self.options    = options.split() if options else []
+        self.hivecmd    = ['hive'] + self.options + ['cli', '--database', self.database]
+        # initialize tables dict with table names, caching results for later
+        self.tables  = {}
+        for t in self.tables_get():
+            self.tables[t] = {}
+
+    def partitions(self, table):
+        """Returns a list of partitions for the given Hive table."""
+
+        # cache results for later
+        # if we don't know the partitions yet, get them now
+        if 'partitions' not in self.tables[table]:
+            stdout     = self.query("SHOW PARTITIONS %s;" % table)
+            partitions = stdout.split('\n')
+            partitions.remove('partition')
+            self.tables[table]['partitions'] = partitions
+
+        return self.tables[table]['partitions']
+
+    def table_exists(self, table):
+        return table in self.tables
+
+    def tables_get(self):
+        """Returns the list of table names in the database, dropping the 'tab_name' header line."""
+        t = self.query('SHOW TABLES').split('\n')
+        t.remove('tab_name')
+        return t
+
+    def partition_interval(self, table):
+        intervals = {
+            4: 'hourly',
+            3: 'daily',
+            2: 'monthly',
+            1: 'yearly',
+        }
+
+        # cache results for later
+        if 'interval' not in self.tables[table]:
+            # count the number of partition keys this table has
+            # and return a string time interval based on this number
+            cmd = ' '.join(self.hivecmd) + ' -e \'SHOW CREATE TABLE ' + table + ';\' | sed -n \'/PARTITIONED BY (/,/)/p\' | grep -v \'PARTITIONED BY\' | wc -l'
+            logger.debug('Running: %s' % cmd)
+            # using check_output directly here so that we can pass shell=True and use pipes.
+            partition_depth = int(subprocess.check_output(cmd, stderr=subprocess.PIPE, shell=True).strip())
+            self.tables[table]['depth']    = partition_depth
+            self.tables[table]['interval'] = intervals[partition_depth]
+
+        return self.tables[table]['interval']
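+
+    # For example (hypothetical table, assuming Hive's usual SHOW CREATE TABLE
+    # layout with one partition key per line): a table created with
+    #   PARTITIONED BY (
+    #       year int,
+    #       month int,
+    #       day int,
+    #       hour int)
+    # produces four key lines after the 'PARTITIONED BY' header is filtered
+    # out, so partition_depth == 4, which maps to 'hourly'.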
+
+    def query(self, query, check_return_code=True):
+        """Runs the given hive query and returns stdout"""
+        return self.command(['-e', query], check_return_code)
+
+    def script(self, script, check_return_code=True):
+        """Runs the contents of the given script in hive and returns stdout"""
+        if not os.path.isfile(script):
+            raise RuntimeError("Hive script: {0} does not 
exist.".format(script))
+        return self.command( ['-f', script], check_return_code)
+
+    def command(self, args, check_return_code=True):
+        """Runs the `hive` from the command line, passing in the given args, 
and
+           returning stdout.
+        """
+        cmd = self.hivecmd + args
+        return call(cmd, check_return_code)
+
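+# A minimal HiveUtils usage sketch (hypothetical database and table names; the
+# auxpath jar is the example from the usage docs above):
+#   hive = HiveUtils('wmf', '--auxpath /path/to/hive-serdes-1.0-SNAPSHOT.jar')
+#   if hive.table_exists('webrequest'):
+#       print(hive.partitions('webrequest'))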
+
+if __name__ == '__main__':
+    # parse arguments
+    arguments = docopt(__doc__)
+    logging.basicConfig(level=logging.DEBUG)
+    logger.debug(arguments)
+
+    camus_destination_path = arguments['<camus_destination_path>']
+    database               = arguments['--database']
+    hive_options           = arguments['--hive-options']
+
+    hive = HiveUtils(database, hive_options)
+
+    # rename so we don't shadow the topics() function
+    topic_list = topics(camus_destination_path)
+    print("Topics: " + ','.join(topic_list))
+    for topic in topic_list:
+        if hive.table_exists(topic):
+            interval         = hive.partition_interval(topic)
+            hive_partitions  = hive.partitions(topic)
+            # pass the table's interval, and use a name that doesn't shadow
+            # the camus_partitions() function
+            camus_parts      = camus_partitions(topic, camus_destination_path, interval)
+
+            logger.debug(("Hive Partitions for %s:\n" % topic) + '\n'.join(hive_partitions))
+            logger.debug(("Camus Partitions for %s:\n" % topic) + '\n'.join(camus_parts))
+            logger.debug("%s import interval is %s" % (topic, interval))
+
+            # diff the camus and hive partitions
+            missing_hive, missing_camus = diff_datewise(camus_parts, hive_partitions,
+                interval_hierarchies[interval]['directory_format'],
+                interval_hierarchies[interval]['hive_partition_format'])
+            print("Need to create partitions for:")
+            pp(missing_hive)
+

-- 
To view, visit https://gerrit.wikimedia.org/r/89118
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia812078906978c0fb44110e36034ad49ac9e3c29
Gerrit-PatchSet: 1
Gerrit-Project: analytics/kraken
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>
