Ottomata has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/392733 )

Change subject: Add refinery-drop-hive-partitions
......................................................................

Add refinery-drop-hive-partitions

This script works with multiple tables in a database and can
infer all partitions to drop, not just hourly date-time bucketed
partitions.

This adds a refinery.util.HivePartition class to make working
with Hive partitions a little easier.  More work could be done
to integrate this more cleanly with HiveUtils.

NOTE: This is pretty inefficient to run on lots of tables, as
each Hive CLI command is run via the shell, which starts up a
new JVM every time a hive command is run.
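
(Illustration only, not part of this change: one way to amortize that JVM
startup cost would be to batch several DDL statements into a single Hive CLI
invocation, roughly as sketched below. The helper name and the table names
are hypothetical.)

    # Hypothetical sketch: run several DDL statements in one 'hive -e' call
    # so the JVM starts once instead of once per statement.
    import subprocess

    def run_hive_batch(statements):
        # Join the statements into one script, making sure each ends with ';'.
        script = '\n'.join(s.rstrip(';') + ';' for s in statements)
        return subprocess.check_output(['hive', '-e', script])

    # Example: drop partitions from two (made-up) tables with one Hive invocation.
    print(run_hive_batch([
        "ALTER TABLE wmf.example_a DROP IF EXISTS PARTITION (year=2017, month=1)",
        "ALTER TABLE wmf.example_b DROP IF EXISTS PARTITION (year=2017, month=1)",
    ]))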

Bug: T181064

Change-Id: I935551c63f35f747397aa05783f767925ab923bc
---
A bin/refinery-drop-hive-partitions
M bin/refinery-drop-hourly-partitions
M bin/refinery-drop-mediawiki-snapshots
M bin/refinery-drop-webrequest-partitions
M bin/refinery-get-add-webrequest-partition-statements
M python/refinery/util.py
M python/tests/test_refinery/test_util.py
7 files changed, 378 insertions(+), 6 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery refs/changes/33/392733/1

diff --git a/bin/refinery-drop-hive-partitions b/bin/refinery-drop-hive-partitions
new file mode 100755
index 0000000..f599055
--- /dev/null
+++ b/bin/refinery-drop-hive-partitions
@@ -0,0 +1,164 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Note: You should make sure to put refinery/python on your PYTHONPATH.
+#   export PYTHONPATH=$PYTHONPATH:/path/to/refinery/python
+
+"""
+Automatically drops old Hive partitions from Hive tables
+and deletes the corresponding partition directories from HDFS.
+
+NOTE: This script should replace refinery-drop-hourly-partitions once
+all uses are ported over to it.
+
+Usage: refinery-drop-hive-partitions [options]
+
+Options:
+    -h --help                           Show this help message and exit.
+    -d --older-than-days=<days>         Drop data older than this number of days.  [default: 60]
+    -D --database=<dbname>              Hive database name.  [default: default]
+    -t --tables=<tables>                Comma separated list of tables to check.  Defaults to
+                                        all tables in database.
+    -l --limit=<limit>                  Only drop from this many tables.  Useful for testing.
+    -N --partition-depth=<n>            Number of directories to use when constructing the partition file glob.
+                                        If not given, this will be inferred from the first partition
+                                        found in the table.  If the table has no current partitions,
+                                        this will throw an exception.
+    -o --hive-options=<options>         Any valid Hive CLI options you want to pass to Hive commands.
+                                        Example: '--auxpath /path/to/hive-serdes-1.0-SNAPSHOT.jar'
+    -v --verbose                        Turn on verbose debug logging.
+    -n --dry-run                        Don't actually drop any partitions, just output the Hive queries to drop partitions.
+"""
+__author__ = 'Andrew Otto <o...@wikimedia.org>'
+
+import datetime
+from   docopt   import docopt
+import logging
+import re
+import os
+import sys
+from refinery.util import HiveUtils, HivePartition, HdfsUtils
+
+
+if __name__ == '__main__':
+    # parse arguments
+    arguments = docopt(__doc__)
+
+    days            = int(arguments['--older-than-days'])
+    database        = arguments['--database']
+    tables          = arguments['--tables']
+    limit           = arguments['--limit']
+    partition_depth = arguments['--partition-depth']
+    hive_options    = arguments['--hive-options']
+    verbose         = arguments['--verbose']
+    dry_run         = arguments['--dry-run']
+
+
+    log_level = logging.INFO
+    if verbose:
+        log_level = logging.DEBUG
+
+    logging.basicConfig(level=log_level,
+                        format='%(asctime)s %(levelname)-6s %(message)s',
+                        datefmt='%Y-%m-%dT%H:%M:%S')
+
+    if partition_depth is not None:
+        partition_depth = int(partition_depth)
+
+    # Delete partitions older than this.
+    old_partition_datetime_threshold = datetime.datetime.now() - datetime.timedelta(days=days)
+
+    # Instantiate HiveUtils.
+    hive = HiveUtils(database, hive_options)
+
+    if tables is not None:
+        tables = tables.split(',')
+    else:
+        tables = hive.get_tables()
+
+    if limit is not None:
+        tables = tables[:int(limit)]
+
+    # Iterate through each table and find old Hive partitions and HDFS paths to drop.
+    for table in tables:
+        logging.info('Looking for partitions to drop for {}.{}...'.format(database, table))
+        # Attempt to infer table location from the table metadata.
+        table_location = hive.table_location(table)
+
+        partitions_to_drop        = []
+        partition_paths_to_delete = []
+
+        # Loop through all partitions for this table and drop anything that is too old.
+        hive_partitions = hive.partitions(table)
+        for partition in hive_partitions:
+            if partition.datetime() < old_partition_datetime_threshold:
+                partitions_to_drop.append(partition)
+
+
+        # Build a glob based on the number of partition keys found in Hive partitions.
+        # This will be used to find possible directories that need to be removed
+        # from HDFS.  TODO: This has a bug in that no data will be removed from HDFS
+        # if no partitions were in the Hive table.
+        if partition_depth is not None:
+            partition_glob = os.path.join(*([table_location] + ['*'] * partition_depth))
+        elif len(hive_partitions) > 0:
+            partition_glob = hive_partitions[0].glob(base_path=table_location)
+        else:
+            partition_glob = None
+            logging.warn(
+                'Could not construct a partition glob: no --partition-depth was given '
+                'and the table has no existing partitions.  Skipping HDFS path search.'
+            )
+
+        if partition_glob is not None:
+            # Loop through all the partition directory paths for this table
+            # and check if any of them are old enough for deletion.
+            for partition_path in HdfsUtils.ls(partition_glob, include_children=False):
+                try:
+                    partition = HivePartition(partition_path)
+                    if partition.datetime() < old_partition_datetime_threshold:
+                        partition_paths_to_delete.append(partition_path)
+                except Exception as e:
+                    logging.error(
+                        'Could not parse date from {}. Skipping. ({})'
+                        .format(partition_path, e)
+                    )
+                    continue
+
+
+        # Drop any old Hive partitions
+        if partitions_to_drop:
+            partition_specs_to_drop = [p.spec() for p in partitions_to_drop]
+            if dry_run:
+                print(hive.drop_partitions_ddl(table, partition_specs_to_drop))
+            else:
+                logging.info('Dropping {0} Hive partitions from table {1}.{2}'
+                    .format(len(partition_specs_to_drop), database, table)
+                )
+                hive.drop_partitions(table, partition_specs_to_drop)
+        else:
+            logging.info('No Hive partitions need to be dropped for table {0}.{1}'.format(database, table))
+
+        # Delete any old HDFS data
+        if partition_paths_to_delete:
+            if dry_run:
+                print('hdfs dfs -rm -R ' + ' '.join(partition_paths_to_delete))
+            else:
+                logging.info('Removing {0} partition directories for table {1}.{2} from {3}.'
+                    .format(len(partition_paths_to_delete), database, table, table_location)
+                )
+                HdfsUtils.rm(' '.join(partition_paths_to_delete))
+        else:
+            logging.info('No partition directories need to be removed for table {0}.{1}'.format(database, table))
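
For reference, the per-path decision the new script makes can be sketched
standalone with the HivePartition class added to python/refinery/util.py
below. The HDFS path and the 30-day threshold here are made up.

    # Hypothetical sketch of the core check refinery-drop-hive-partitions
    # applies to each partition directory found under the table location.
    import datetime
    from refinery.util import HivePartition

    threshold = datetime.datetime.now() - datetime.timedelta(days=30)
    path = '/wmf/data/raw/example/datacenter=eqiad/year=2017/month=10/day=01/hour=00'

    partition = HivePartition(path)
    if partition.datetime() < threshold:
        print('Would drop PARTITION ({0}) and delete {1}'.format(partition.spec(), path))
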
diff --git a/bin/refinery-drop-hourly-partitions b/bin/refinery-drop-hourly-partitions
index 93380c1..906a35b 100755
--- a/bin/refinery-drop-hourly-partitions
+++ b/bin/refinery-drop-hourly-partitions
@@ -125,7 +125,7 @@
     partition_paths_to_delete = []
 
     # Loop through all partitions for this table and drop anything that is too old.
-    for partition_spec in hive.partitions(table):
+    for partition_spec in hive.partition_specs(table):
         partition_datetime = hive.partition_datetime_from_spec(
             partition_spec,
             partition_spec_regex
diff --git a/bin/refinery-drop-mediawiki-snapshots b/bin/refinery-drop-mediawiki-snapshots
index 1f19d7f..d18ce39 100755
--- a/bin/refinery-drop-mediawiki-snapshots
+++ b/bin/refinery-drop-mediawiki-snapshots
@@ -139,7 +139,7 @@
 # Returns the partitions to be dropped given a hive table
 def get_partitions_to_drop(hive, table, keep_snapshots):
     logger.debug('Getting partitions to drop...')
-    partitions = hive.partitions(table)
+    partitions = hive.partition_specs(table)
     spec_separator = HiveUtils.partition_spec_separator
 
     # For tables partitioned by dimensions other than snapshot
diff --git a/bin/refinery-drop-webrequest-partitions b/bin/refinery-drop-webrequest-partitions
index 265a2ca..f59acd0 100755
--- a/bin/refinery-drop-webrequest-partitions
+++ b/bin/refinery-drop-webrequest-partitions
@@ -127,7 +127,7 @@
     partition_paths_to_delete = []
 
     # Loop through all partitions for this table and drop anything that is too old.
-    for partition_spec in hive.partitions(table):
+    for partition_spec in hive.partition_specs(table):
         partition_datetime = hive.partition_datetime_from_spec(
             partition_spec,
             partition_spec_regex
diff --git a/bin/refinery-get-add-webrequest-partition-statements b/bin/refinery-get-add-webrequest-partition-statements
index b5271d7..9afb306 100755
--- a/bin/refinery-get-add-webrequest-partition-statements
+++ b/bin/refinery-get-add-webrequest-partition-statements
@@ -103,7 +103,7 @@
         sys.exit(1)
 
     statements = []
-    for spec in hive.partitions(table):
+    for spec in hive.partition_specs(table):
         if webrequest_type == 'raw':
             path = webrequest_raw_partition_path_from_spec(location, spec)
         else:
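
The three caller updates above simply track the rename of HiveUtils.partitions()
to partition_specs(); the new partitions() method added in util.py below returns
HivePartition objects instead of raw spec strings. A hedged caller-side sketch,
assuming a working Hive CLI on the host; 'webrequest' is a made-up table name and
no-argument construction follows the existing unit tests.

    # Hypothetical sketch of the caller-side difference after the rename.
    from refinery.util import HiveUtils

    hive = HiveUtils()

    # Renamed method: raw partition spec strings, e.g. "year=2017,month=11,day=21,hour=0".
    for spec in hive.partition_specs('webrequest'):
        print(spec)

    # New method: HivePartition objects that know their own desc and datetime.
    for partition in hive.partitions('webrequest'):
        print('{0} -> {1}'.format(partition.desc(), partition.datetime()))
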
diff --git a/python/refinery/util.py b/python/refinery/util.py
index 32d8f3b..d180f28 100755
--- a/python/refinery/util.py
+++ b/python/refinery/util.py
@@ -17,6 +17,7 @@
 Wikimedia Analytics Refinery python utilities.
 """
 
+from collections import OrderedDict
 import datetime
 import logging
 import os
@@ -154,6 +155,12 @@
         self.tables = {}
 
 
+    def get_tables(self):
+        """Returns the list of tables in the database"""
+        self._tables_init()
+        return self.tables.keys()
+
+
     def table_exists(self, table): # ,force=False
         """Returns true if the table exists in the current database."""
         self._tables_init()
@@ -206,7 +213,7 @@
         return table_location
 
 
-    def partitions(self, table):
+    def partition_specs(self, table):
         """
         Returns a list of partitions for the given Hive table in partition spec format.
 
@@ -226,6 +233,19 @@
             ]
 
         return self.tables[table]['partitions']
+
+
+    def partitions(self, table):
+        """
+        Returns a list of HivePartitions for the given Hive table.
+
+        Returns:
+            A list of HivePartition objects
+        """
+
+        # partition_specs() caches the raw specs in self.tables, so this just
+        # wraps each cached spec in a HivePartition object.
+        return [HivePartition(p) for p in self.partition_specs(table)]
 
 
     def drop_partitions(self, table, partition_specs):
@@ -427,6 +447,116 @@
         return sh(cmd, check_return_code)
 
 
+class HivePartition(OrderedDict):
+    partition_regex          = re.compile(r'(\w+)=["\']?(\w+)["\']?')
+    camus_regex              = re.compile(r'.*/hourly/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/(?P<hour>\d+)')
+
+    desc_separator = '/'
+    spec_separator = ','
+
+    zfill_keys = {
+        'year': 4,
+        'month': 2,
+        'day': 2,
+        'hour': 2
+    }
+
+    def __init__(self, partition_string):
+
+        # If we see an '=', assume this is a Hive style partition desc or spec.
+        if '=' in partition_string:
+            partitions = HivePartition.partition_regex.findall(partition_string)
+        # Else assume this is a time bucketed camus imported path.
+        # This only works with hourly camus data.
+        else:
+            match = HivePartition.camus_regex.search(partition_string)
+            if match:
+                partitions = []
+                for key in ('year', 'month', 'day', 'hour'):
+                    partitions.append((key, str(int(match.group(key)))))
+            else:
+                raise Exception(
+                    'No path matching {0} was found in {1}.'.format(
+                        HivePartition.camus_regex.pattern, partition_string
+                    )
+                )
+
+        super(HivePartition, self).__init__(partitions)
+
+
+    def datetime(self):
+        """
+        Returns a datetime.datetime for this partition.
+        """
+        return datetime.datetime(
+            int(self.get('year')),
+            int(self.get('month', 1)),
+            int(self.get('day',   1)),
+            int(self.get('hour',  0))
+        )
+
+
+    def list(self, quote=False):
+        """
+        Returns a list of Hive partition key=value strings.
+        If quote=True, string values will be quoted.
+        """
+        l = []
+        # Loop through each partition,
+        # adding quotes around strings if quote=True
+        for k, v in self.items():
+            if quote and not v.isdigit():
+                v = '\'{}\''.format(v)
+            l.append('{}={}'.format(k, v))
+        return l
+
+
+    def desc(self):
+        """
+        Returns a Hive desc string, e.g. datacenter=eqiad/year=2017/month=11/day=21/hour=0
+        """
+        return HivePartition.desc_separator.join(self.list())
+
+
+    def spec(self):
+        """
+        Returns a Hive spec string, e.g. datacenter='eqiad',year=2017,month=11,day=21,hour=0
+        """
+        return HivePartition.spec_separator.join(self.list(quote=True))
+
+
+    def path(self, base_path=None):
+        """
+        Returns a path to the partition.  If base_path is given, it will be prefixed.
+        """
+        dirs = self.list()
+        if base_path is not None:
+            dirs = [base_path] + dirs
+        return os.path.join(*dirs)
+
+
+    def camus_path(self, base_path=None):
+        """
+        Returns a path to a keyless camus partition, e.g. 2017/02/05/00.
+        If base_path is given, it will be prefixed.
+        """
+        dirs = [v.zfill(HivePartition.zfill_keys.get(k, 0)) for k, v in self.items()]
+        if base_path is not None:
+            dirs = [base_path] + dirs
+        return os.path.join(*dirs)
+
+    def glob(self, base_path=None):
+        """
+        Returns a file glob that would have matched this partition.
+        This is just a handy way to build a file glob to match other partitions
+        as deep as this one.  If base_path is given, it will be prefixed.
+        """
+        globs = ['*'] * len(self)
+        if base_path is not None:
+            globs = [base_path] + globs
+        return os.path.join(*globs)
+
+
 class HdfsUtils(object):
     # TODO:  Use snakebite instead of shelling out to 'hdfs dfs'.
 
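To make the new HivePartition behavior concrete, here is a hedged round-trip
example using the desc format from its docstrings; the base path is made up.

    # Round-trip sketch for the new HivePartition class.
    from refinery.util import HivePartition

    p = HivePartition('datacenter=eqiad/year=2017/month=11/day=21/hour=0')
    print(p.desc())                      # datacenter=eqiad/year=2017/month=11/day=21/hour=0
    print(p.spec())                      # datacenter='eqiad',year=2017,month=11,day=21,hour=0
    print(p.path('/wmf/data/example'))   # /wmf/data/example/datacenter=eqiad/year=2017/...
    print(p.glob('/wmf/data/example'))   # /wmf/data/example/*/*/*/*/*
    print(p.datetime())                  # 2017-11-21 00:00:00

    # Camus-style hourly import paths also parse, yielding only the time keys:
    print(HivePartition('/wmf/data/raw/example/hourly/2017/11/21/00').items())
    # [('year', '2017'), ('month', '11'), ('day', '21'), ('hour', '0')]
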
diff --git a/python/tests/test_refinery/test_util.py b/python/tests/test_refinery/test_util.py
index b9e8e96..f242918 100644
--- a/python/tests/test_refinery/test_util.py
+++ b/python/tests/test_refinery/test_util.py
@@ -1,6 +1,6 @@
 from unittest import TestCase
 from datetime import datetime, timedelta
-from refinery.util import HiveUtils, HdfsUtils, sh
+from refinery.util import HiveUtils, HivePartition, HdfsUtils, sh
 import os
 
 
@@ -19,6 +19,84 @@
         output = sh(command)
         self.assertEqual(output, 'hi_you')
 
+class TestHivePartition(TestCase):
+    def setUp(self):
+        self.partition_desc = 'datacenter=eqiad/year=2017/month=11/day=2/hour=16'
+        self.hive_partition = HivePartition(self.partition_desc)
+
+    def test_init_from_hive_desc(self):
+        should_be = [
+            ('datacenter', 'eqiad'),
+            ('year', '2017'),
+            ('month', '11'),
+            ('day', '2'),
+            ('hour', '16')
+        ]
+        self.assertEqual(self.hive_partition.items(), should_be)
+
+    def test_init_from_hive_spec(self):
+        partition_spec = HiveUtils.partition_spec_from_partition_desc(self.partition_desc)
+        hive_partition = HivePartition(partition_spec)
+        should_be = [
+            ('datacenter', 'eqiad'),
+            ('year', '2017'),
+            ('month', '11'),
+            ('day', '2'),
+            ('hour', '16')
+        ]
+        self.assertEqual(hive_partition.items(), should_be)
+
+    def test_init_from_hive_path(self):
+        hive_path = '/path/to/data/datacenter=eqiad/year=2017/month=11/day=2/hour=16'
+        hive_partition = HivePartition(hive_path)
+        should_be = [
+            ('datacenter', 'eqiad'),
+            ('year', '2017'),
+            ('month', '11'),
+            ('day', '2'),
+            ('hour', '16')
+        ]
+        self.assertEqual(hive_partition.items(), should_be)
+
+    def test_init_from_camus_path(self):
+        camus_path     = '/path/to/eqiad_data/hourly/2017/11/02/16'
+        hive_partition = HivePartition(camus_path)
+
+        should_be = [
+            ('year', '2017'),
+            ('month', '11'),
+            ('day', '2'),
+            ('hour', '16')
+        ]
+        self.assertEqual(hive_partition.items(), should_be)
+
+    def test_datetime(self):
+        d = self.hive_partition.datetime()
+        should_be = datetime(2017,11,2,16)
+        self.assertEqual(d, should_be)
+
+    def test_list(self):
+        self.assertEqual(self.partition_desc.split('/'), self.hive_partition.list())
+
+    def test_desc(self):
+        self.assertEqual(self.hive_partition.desc(), self.partition_desc)
+
+    def test_spec(self):
+        should_be = 'datacenter=\'eqiad\',year=2017,month=11,day=2,hour=16'
+        self.assertEqual(self.hive_partition.spec(), should_be)
+
+    def test_path(self):
+        self.assertEqual(self.hive_partition.path(), self.partition_desc)
+
+    def test_camus_path(self):
+        should_be = '/path/to/data/hourly/eqiad/2017/11/02/16'
+        self.assertEqual(self.hive_partition.camus_path('/path/to/data/hourly'), should_be)
+
+    def test_glob(self):
+        should_be = '*/*/*/*/*'
+        self.assertEqual(self.hive_partition.glob(), should_be)
+
+
 class TestHiveUtil(TestCase):
     def setUp(self):
         self.hive = HiveUtils()
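
The new TestHivePartition cases run without a Hive installation. A hedged way to
run just that class with the stdlib runner is sketched below; the module path is
an assumption based on the file layout, with refinery/python on PYTHONPATH as
noted in the script header.

    # Hypothetical test-runner sketch for the new HivePartition tests only.
    import unittest
    from tests.test_refinery.test_util import TestHivePartition

    suite = unittest.TestLoader().loadTestsFromTestCase(TestHivePartition)
    unittest.TextTestRunner(verbosity=2).run(suite)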

-- 
To view, visit https://gerrit.wikimedia.org/r/392733
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I935551c63f35f747397aa05783f767925ab923bc
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery
Gerrit-Branch: master
Gerrit-Owner: Ottomata <ao...@wikimedia.org>
