Milimetric has submitted this change and it was merged.
Change subject: Adding import logic for dumps pagecounts
......................................................................
Adding import logic for dumps pagecounts
Factoring out useful parts of hive-partitioner
Change-Id: I608d9cd78355c9b2845ed379b1a66975a538dc06
Card: analytics 1195.1
---
M .gitignore
M kraken-etl/hive-partitioner
A kraken-etl/pagecounts/import-one-hour.sh
A kraken-etl/pagecounts/import.py
A kraken-etl/test.py
A kraken-etl/util.py
6 files changed, 356 insertions(+), 154 deletions(-)
Approvals:
Milimetric: Verified; Looks good to me, approved
diff --git a/.gitignore b/.gitignore
index 4404079..fe03682 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,3 +23,5 @@
tmp/
*.swp
.deploy
+
+*.pyc
diff --git a/kraken-etl/hive-partitioner b/kraken-etl/hive-partitioner
index 0e6a346..3581fc7 100755
--- a/kraken-etl/hive-partitioner
+++ b/kraken-etl/hive-partitioner
@@ -17,158 +17,12 @@
import logging
import os
import subprocess
-
import pprint
+
+from util import HiveUtils, CamusUtils, sh, diffDatewise, interval_hierarchies
pp = pprint.pprint
logger = logging.getLogger('hive-partitioner')
-
-interval_hierarchies = {
- 'hourly': { 'depth': 4, 'directory_format': '%Y/%m/%d/%H', 'hive_partition_format': 'year=%Y/month=%m/day=%d/hour=%H' },
- 'daily': { 'depth': 3, 'directory_format': '%Y/%m/%d', 'hive_partition_format': 'year=%Y/month=%m/day=%d' },
- 'monthly': { 'depth': 2, 'directory_format': '%Y/%m', 'hive_partition_format': 'year=%Y/month=%m' },
- 'yearly': { 'depth': 1, 'directory_format': '%Y', 'hive_partition_format': 'year=%Y' },
-}
-
-def topics(path):
- """Reads topic names out of camus_destination_path"""
- return sh('hadoop fs -ls %s/ | grep -v "Found .* items" | awk -F "/" \'{print $NF}\'' % (path)).split('\n')
-
-# TODO configure interval partition paths automatically instead of hardcoding
-def camus_partitions(topic, camus_destination_path, interval='hourly'):
- """
- Returns a list of time bucketd 'partitions' created by Camus import.
- These are inferred from directories in hdfs.
- """
-
- basedir = camus_destination_path + '/' + topic + '/' + interval
-
-
- depth = interval_hierarchies[interval]['depth']
- # number of wildcard time bucket directories, e.g. depth_stars == 4 -> '/*/*/*/*'
- depth_stars = '/*' * depth
- directories = sh('hadoop fs -ls -d %s%s | grep -v \'Found .* items\' | awk \'{print $NF}\'' % (basedir, depth_stars)).split('\n')
- # return list of time bucket directories, e.g. ['2013/10/09/15', '2013/10/09/16', ... ]
- return [directory.replace(basedir + '/', '') for directory in directories]
-
- # return sh('hadoop fs -ls -R {0}/{1}/{2} | sed "s@.*{0}/{1}/{2}/\([0-9]*\)/\([0-9]*\)/\([0-9]*\)/\([0-9]*\)/.*@year=\1, month=\2, day=\3, hour=\4@" | grep "year.*" | sort | uniq'.format(camus_destination_path, topic, interval)).split()
- # return a list of lists of time buckets, e.g. [['2013']]
- # return [directory.split('/')[-depth:] for directory in directories]
- # return [(int(d) for d in directory.split('/')[-depth:]) for directory in directories]
-
-def call(command, check_return_code=True):
- logger.debug('Running: {0}'.format(' '.join(command)))
- p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- stdout, stderr = p.communicate()
- if check_return_code and p.returncode != 0:
- raise RuntimeError("Command: {0} failed with error code: {1}".format("
".join(command), p.returncode),
- stdout, stderr)
- return stdout.strip()
-
-def sh(command):
- logger.debug('Running: %s' % (command))
- return os.popen(command).read().strip()
-
-
-def diff_datewise(left, right, left_format=None, right_format=None):
- """
- Parameters
- left : a list of datetime strings or objects
- right : a list of datetime strings or objects
- left_format : None if left contains datetimes, or strptime format
- right_format : None if right contains datetimes, or strptime format
-
- Returns
- A tuple of two sets:
- [0] : the datetime objects in left but not right
- [1] : the datetime objects in right but not left
- """
-
- if left_format:
- left_set = set([datetime.strptime(l, left_format) for l in left])
- else:
- left_set = set(left)
-
- if right_format:
- right_set = set([datetime.strptime(r, right_format) for r in right])
- else:
- right_set = set(right)
-
- return (left_set - right_set, right_set - left_set)
-
-
-class HiveUtils(object):
- def __init__(self, database='default', options=''):
- self.database = database
- self.options = options.split()
- self.hivecmd = None
- # list of partitions for table, keyed by table
- # self.table_partitions = {}
- self.hivecmd = ['hive'] + self.options + ['cli', '--database', self.database]
- # initialize tables dict with table names
- self.tables = {}
- # cache results for later
- for t in self.tables_get():
- self.tables[t] = {}
-
- def partitions(self, table):
- """Returns a list of partitions for the given Hive table."""
-
- # cache results for later
- # if we don't know the partitions yet, get them now
- if not 'partitions' in self.tables[table].keys():
- stdout = self.query("SHOW PARTITIONS %s;" % table)
- partitions = stdout.split('\n')
- partitions.remove('partition')
- self.tables[table]['partitions'] = partitions
-
- return self.tables[table]['partitions']
-
- def table_exists(self, table): # ,force=False
- return table in self.tables.keys()
-
- def tables_get(self):
- t = self.query('SHOW TABLES').split('\n')
- t.remove('tab_name')
- return t
-
- def partition_interval(self, table):
- intervals = {
- 4: 'hourly',
- 3: 'daily',
- 2: 'monhtly',
- 1: 'yearly',
- }
-
- # cache results for later
- if not 'interval' in self.tables[table].keys():
- # counts the number of partition keys this table has
- # and returns a string time interval based in this number
- cmd = ' '.join(self.hivecmd) + ' -e \'SHOW CREATE TABLE ' + table + ';\' | sed -n \'/PARTITIONED BY (/,/)/p\' | grep -v \'PARTITIONED BY\' | wc -l'
- logger.debug('Running: %s' % cmd)
- # using check_output directly here so that we can pass shell=True and use pipes.
- partition_depth = int(subprocess.check_output(cmd, stderr=subprocess.PIPE, shell=True).strip())
- self.tables[table]['depth'] = partition_depth
- self.tables[table]['interval'] = intervals[partition_depth]
-
- return self.tables[table]['interval']
-
- def query(self, query, check_return_code=True):
- """Runs the given hive query and returns stdout"""
- return self.command(['-e', query], check_return_code)
-
- def script(self, script, check_return_code=True):
- """Runs the contents of the given script in hive and returns stdout"""
- if not os.path.isfile(script):
- raise RuntimeError("Hive script: {0} does not
exist.".format(script))
- return self.command( ['-f', script], check_return_code)
-
- def command(self, args, check_return_code=True):
- """Runs the `hive` from the command line, passing in the given args,
and
- returning stdout.
- """
- cmd = self.hivecmd + args
- return call(cmd, check_return_code)
if __name__ == '__main__':
@@ -182,23 +36,23 @@
hive_options = arguments['--hive-options']
hive = HiveUtils(database, hive_options)
+ camus = CamusUtils(camus_destination_path)
- topics = topics(camus_destination_path)
+ topics = camus.topics()
print("Topics: " + ','.join(topics))
for topic in topics:
if hive.table_exists(topic):
interval = hive.partition_interval(topic)
hive_partitions = hive.partitions(topic)
- camus_partitions = camus_partitions(topic, camus_destination_path)
+ camus_partitions = camus.partitions(topic, interval)
- logger.debug(("Hive Partitions for %s:\n" % topic) +
'\n.join(hive_partitions))
+ logger.debug(("Hive Partitions for %s:\n" % topic) +
'\n'.join(hive_partitions))
logger.debug(("Camus Partitions for %s:\n " % topic) +
'\n'.join(camus_partitions))
logger.debug("%s import interval is %s" % (topic, interval))
-
+
# diff the camus and hive partitions
- missing_hive, missing_camus = diff_datewise(camus_partitions, hive_partitions,
+ missing_hive, missing_camus = diffDatewise(camus_partitions, hive_partitions,
interval_hierarchies[interval]['directory_format'],
interval_hierarchies[interval]['hive_partition_format'])
print("Need to create partition for:")
pp(missing_hive)
-
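For reference, the partition diffing above boils down to parsing Camus directory names and Hive partition strings into datetimes and comparing the two sets. A minimal sketch using the new util helpers (sample values are illustrative; assumes util.py is importable):

    from util import diffDatewise, interval_hierarchies

    # Camus writes hourly directories like '2013/10/09/15'; Hive reports
    # partitions like 'year=2013/month=10/day=09/hour=15'.
    camus_dirs = ['2013/10/09/15', '2013/10/09/16']
    hive_parts = ['year=2013/month=10/day=09/hour=15']

    fmt = interval_hierarchies['hourly']
    missing_hive, missing_camus = diffDatewise(
        camus_dirs, hive_parts,
        leftParse=fmt['directory_format'],
        rightParse=fmt['hive_partition_format'])
    # missing_hive  -> {datetime(2013, 10, 9, 16)}, i.e. needs a Hive partition
    # missing_camus -> set()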
diff --git a/kraken-etl/pagecounts/import-one-hour.sh b/kraken-etl/pagecounts/import-one-hour.sh
new file mode 100644
index 0000000..ff8fd1d
--- /dev/null
+++ b/kraken-etl/pagecounts/import-one-hour.sh
@@ -0,0 +1,43 @@
+#!/bin/bash
+#
+# This script does the following:
+# 0. reads four arguments from the CLI, in order, as YEAR, MONTH, DAY, HOUR
+# 1. downloads the specified hour worth of data from http://dumps.wikimedia.org/other/pagecounts-raw/
+# 2. extracts the data into hdfs
+# 3. creates a partition on a hive table pointing to this data
+#
+
+print_help() {
+ cat <<EOF
+
+USAGE: import-one-hour.sh YEAR MONTH DAY HOUR
+
+ Imports one hour worth of data from dumps.wikimedia.org/other/pagecounts-raw into a Hive table
+
+EOF
+}
+
+if [ $# -ne 4 ]
+then
+ print_help
+ exit
+fi
+
+YEAR=$1
+MONTH=$2
+DAY=$3
+HOUR=$4
+
+# Someone intelligent should set these
+LOCAL_FILE=pagecounts-$YEAR$MONTH$DAY-${HOUR}0000.gz
+HDFS_DIR=/user/milimetric/pagecounts/$YEAR.$MONTH.${DAY}_${HOUR}.00.00
+HDFS_FILE=pagecounts-$YEAR$MONTH$DAY-${HOUR}0000
+TABLE=milimetric_pagecounts
+
+rm $LOCAL_FILE
+wget http://dumps.wikimedia.org/other/pagecounts-raw/$YEAR/$YEAR-$MONTH/pagecounts-$YEAR$MONTH$DAY-${HOUR}0000.gz
+hdfs dfs -rm -r $HDFS_DIR
+gunzip -c $LOCAL_FILE | hdfs dfs -put - $HDFS_DIR/$HDFS_FILE
+rm $LOCAL_FILE
+hive -e "ALTER TABLE ${TABLE} DROP PARTITION
(year='$YEAR',month='$MONTH',day='$DAY',hour='$HOUR');"
+hive -e "ALTER TABLE ${TABLE} ADD PARTITION
(year='$YEAR',month='$MONTH',day='$DAY',hour='$HOUR') location '$HDFS_DIR';"
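The script only knows about zero-padded YEAR MONTH DAY HOUR arguments; the rest follows from the dumps.wikimedia.org naming convention. An illustrative sketch of the URL and HDFS path it builds for one hour (values are examples):

    # Example: the 2013-10-09 15:00 hour
    year, month, day, hour = '2013', '10', '09', '15'
    url = ('http://dumps.wikimedia.org/other/pagecounts-raw/'
           '{0}/{0}-{1}/pagecounts-{0}{1}{2}-{3}0000.gz'.format(year, month, day, hour))
    hdfs_dir = '/user/milimetric/pagecounts/{0}.{1}.{2}_{3}.00.00'.format(year, month, day, hour)
    # url      -> .../pagecounts-raw/2013/2013-10/pagecounts-20131009-150000.gz
    # hdfs_dir -> /user/milimetric/pagecounts/2013.10.09_15.00.00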
diff --git a/kraken-etl/pagecounts/import.py b/kraken-etl/pagecounts/import.py
new file mode 100644
index 0000000..6c249a9
--- /dev/null
+++ b/kraken-etl/pagecounts/import.py
@@ -0,0 +1,38 @@
+from datetime import datetime, timedelta
+from util import (
+    CamusUtils, call, diffDatewise, interval_hierarchies, timestampsToNow
+)
+
+target = CamusUtils('hdfs://wmf/')
+topic = 'pagecounts'
+# CamusUtils.partitions() returns time bucket directories relative to the
+# topic basedir, so parse them with the hourly directory format
+hdfsFormat = interval_hierarchies['hourly']['directory_format']
+
+imported = [p for p in target.partitions(topic) if p.strip()] or [datetime.today().strftime(hdfsFormat)]
+firstHour = datetime.strptime(min(imported), hdfsFormat)
+available = timestampsToNow(firstHour, timedelta(hours=1))
+
+# diffDatewise returns (in available but not imported, in imported but not available)
+hoursMissing, _ = diffDatewise(
+    available,
+    imported,
+    rightParse=hdfsFormat,
+)
+
+for missing in sorted(hoursMissing):
+    print('*************************')
+    print('Importing {0}'.format(missing))
+
+    try:
+        # zero-padded arguments, as expected by the dumps URLs and HDFS paths
+        output = call([
+            './import-one-hour.sh',
+            missing.strftime('%Y'),
+            missing.strftime('%m'),
+            missing.strftime('%d'),
+            missing.strftime('%H'),
+        ])
+        if output:
+            print(output)
+    except Exception as e:
+        print(e)
+
+    print('Done Importing {0}'.format(missing))
+    print('*************************')
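Each missing hour is handed to import-one-hour.sh as four zero-padded strings; without the padding the pagecounts-YYYYMMDD-HH0000.gz URLs and the HDFS directory names would not line up. A small illustration (values are examples):

    from datetime import datetime

    missing = datetime(2013, 10, 9, 15)
    args = [missing.strftime(f) for f in ('%Y', '%m', '%d', '%H')]
    # args -> ['2013', '10', '09', '15']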
diff --git a/kraken-etl/test.py b/kraken-etl/test.py
new file mode 100644
index 0000000..69a7bf1
--- /dev/null
+++ b/kraken-etl/test.py
@@ -0,0 +1,46 @@
+from datetime import datetime, timedelta
+from util import diffDatewise, timestampsToNow
+from unittest import TestCase
+
+
+class TestUtil(TestCase):
+ def testDiffDatewise(self):
+ l = []
+ lJustDates = []
+ r = []
+ lp = 'blah%Y...%m...%d...%Hblahblah'
+ rp = 'neenee%Y%m%d%Hneenee'
+
+ expect0 = set([datetime(2012, 6, 14, 13), datetime(2012, 11, 9, 3)])
+ expect1 = set([datetime(2012, 6, 14, 14), datetime(2013, 11, 10, 22)])
+
+ for y in range(2012, 2014):
+ for m in range(1, 13):
+ # we're just diffing so we don't care about getting all days
+ for d in range(1, 28):
+ for h in range(0, 24):
+ x = datetime(y, m, d, h)
+ if not x in expect1:
+ l.append(datetime.strftime(x, lp))
+ lJustDates.append(x)
+ if not x in expect0:
+ r.append(datetime.strftime(x, rp))
+
+ result = diffDatewise(l, r, leftParse=lp, rightParse=rp)
+ self.assertEqual(result[0], expect0)
+ self.assertEqual(result[1], expect1)
+
+ result = diffDatewise(lJustDates, r, rightParse=rp)
+ self.assertEqual(result[0], expect0)
+ self.assertEqual(result[1], expect1)
+
+ def testTimestampsToNow(self):
+ now = datetime.now()
+ start = now - timedelta(hours=2)
+ expect = [
+ start,
+ start + timedelta(hours=1),
+ start + timedelta(hours=2),
+ ]
+ timestamps = timestampsToNow(start, timedelta(hours=1))
+ self.assertEqual(expect, list(timestamps))
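The tests are plain unittest.TestCase methods. A sketch of one way to run them, assuming the working directory is kraken-etl so that util.py and test.py are importable (equivalent to `python -m unittest test`):

    import unittest
    import test

    # Load and run the TestUtil suite with the standard runner.
    suite = unittest.defaultTestLoader.loadTestsFromModule(test)
    unittest.TextTestRunner(verbosity=2).run(suite)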
diff --git a/kraken-etl/util.py b/kraken-etl/util.py
new file mode 100644
index 0000000..278bed4
--- /dev/null
+++ b/kraken-etl/util.py
@@ -0,0 +1,219 @@
+import logging
+import os
+import subprocess
+from datetime import datetime
+
+
+logger = logging.getLogger('kraken-etl-util')
+interval_hierarchies = {
+ 'hourly': {
+ 'depth': 4,
+ 'directory_format': '%Y/%m/%d/%H',
+ 'hive_partition_format': 'year=%Y/month=%m/day=%d/hour=%H'
+ },
+ 'daily': {
+ 'depth': 3,
+ 'directory_format': '%Y/%m/%d',
+ 'hive_partition_format': 'year=%Y/month=%m/day=%d'
+ },
+ 'monthly': {
+ 'depth': 2,
+ 'directory_format': '%Y/%m',
+ 'hive_partition_format': 'year=%Y/month=%m'
+ },
+ 'yearly': {
+ 'depth': 1,
+ 'directory_format': '%Y',
+ 'hive_partition_format': 'year=%Y'
+ },
+}
+
+
+def diffDatewise(left, right, leftParse=None, rightParse=None):
+ """
+ Parameters
+ left : a list of datetime strings or objects
+ right : a list of datetime strings or objects
+ leftParse : None if left contains datetimes, or strptime format
+ rightParse : None if right contains datetimes, or strptime format
+
+ Returns
+ A tuple of two sets:
+ [0] : the datetime objects in left but not right
+ [1] : the datetime objects in right but not left
+ """
+
+ if leftParse:
+ leftSet = set([
+ datetime.strptime(l.strip(), leftParse)
+ for l in left if len(l.strip())
+ ])
+ else:
+ leftSet = set(left)
+
+ if rightParse:
+ rightSet = set([
+ datetime.strptime(r.strip(), rightParse)
+ for r in right if len(r.strip())
+ ])
+ else:
+ rightSet = set(right)
+
+ return (leftSet - rightSet, rightSet - leftSet)
+
+
+def timestampsToNow(start, increment):
+ """
+ Generates timestamps from @start to datetime.now(), by @increment
+
+ Parameters
+ start : the first generated timestamp
+ increment : the timedelta between the generated timestamps
+
+ Returns
+ A generator that goes from @start to datetime.now() - x,
+ where x <= @increment
+ """
+ now = datetime.now()
+ while start < now:
+ yield start
+ start += increment
+
+
+def sh(command):
+ """
+ Execute a shell command string (through the shell) and return its stripped stdout
+ """
+ logger.debug('Running: {0}'.format(command))
+ return os.popen(command).read().strip()
+
+
+def call(command, check_return_code=True):
+ """
+ Execute a command given as an argument list, return its stripped stdout, and raise if the exit code is nonzero (when check_return_code is True)
+ """
+ logger.debug('Running: {0}'.format(' '.join(command)))
+ p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout, stderr = p.communicate()
+ if check_return_code and p.returncode != 0:
+ raise RuntimeError("Command: {0} failed with error code: {1}".format("
".join(command), p.returncode),
+ stdout, stderr)
+ return stdout.strip()
+
+
+class HiveUtils(object):
+ def __init__(self, database='default', options=''):
+ self.database = database
+ self.options = options.split()
+ self.hivecmd = None
+ # list of partitions for table, keyed by table
+ # self.table_partitions = {}
+ self.hivecmd = ['hive'] + self.options + ['cli', '--database', self.database]
+ # initialize tables dict with table names
+ self.tables = {}
+ # cache results for later
+ for t in self.tables_get():
+ self.tables[t] = {}
+
+ def partitions(self, table):
+ """Returns a list of partitions for the given Hive table."""
+
+ # cache results for later
+ # if we don't know the partitions yet, get them now
+ if not 'partitions' in self.tables[table].keys():
+ stdout = self.query("SHOW PARTITIONS %s;" % table)
+ partitions = stdout.split('\n')
+ partitions.remove('partition')
+ self.tables[table]['partitions'] = partitions
+
+ return self.tables[table]['partitions']
+
+ def table_exists(self, table): # ,force=False
+ return table in self.tables.keys()
+
+ def tables_get(self):
+ t = self.query('SHOW TABLES').split('\n')
+ t.remove('tab_name')
+ return t
+
+ def partition_interval(self, table):
+ intervals = {
+ 4: 'hourly',
+ 3: 'daily',
+ 2: 'monthly',
+ 1: 'yearly',
+ }
+
+ # cache results for later
+ if not 'interval' in self.tables[table].keys():
+ # counts the number of partition keys this table has
+ # and returns a string time interval based on this number
+ cmd = ' '.join(self.hivecmd) + ' -e \'SHOW CREATE TABLE ' + table + ';\' | sed -n \'/PARTITIONED BY (/,/)/p\' | grep -v \'PARTITIONED BY\' | wc -l'
+ logger.debug('Running: %s' % cmd)
+ # using check_output directly here so that we can pass shell=True and use pipes.
+ partition_depth = int(subprocess.check_output(cmd, stderr=subprocess.PIPE, shell=True).strip())
+ self.tables[table]['depth'] = partition_depth
+ self.tables[table]['interval'] = intervals[partition_depth]
+
+ return self.tables[table]['interval']
+
+ def query(self, query, check_return_code=True):
+ """Runs the given hive query and returns stdout"""
+ return self.command(['-e', query], check_return_code)
+
+ def script(self, script, check_return_code=True):
+ """Runs the contents of the given script in hive and returns stdout"""
+ if not os.path.isfile(script):
+ raise RuntimeError("Hive script: {0} does not
exist.".format(script))
+ return self.command( ['-f', script], check_return_code)
+
+ def command(self, args, check_return_code=True):
+ """Runs the `hive` from the command line, passing in the given args,
and
+ returning stdout.
+ """
+ cmd = self.hivecmd + args
+ return call(cmd, check_return_code)
+
+
+class CamusUtils(object):
+ """
+ Deal with topics and partitions created by Camus
+ """
+ def __init__(self, destination_path):
+ self.destination_path = destination_path
+
+ def topics(self):
+ """Reads topic names out of destination_path"""
+ return sh(
+ 'hdfs dfs -ls {0}/ | '\
+ 'grep -v "Found .* items" | '\
+ 'awk -F "/" \'{{print $NF}}\''.format(
+ self.destination_path
+ )
+ ).split('\n')
+
+ def basedir(self, topic, interval):
+ return self.destination_path + '/' + topic + '/' + interval
+
+ # TODO configure interval partition paths automatically instead of hardcoding
+ def partitions(self, topic, interval='hourly'):
+ """
+ Returns a list of time bucketed 'partitions' created by Camus import.
+ These are inferred from directories in hdfs.
+ """
+
+ basedir = self.basedir(topic, interval)
+
+ depth = interval_hierarchies[interval]['depth']
+ # number of wildcard time bucket directories, e.g. depth_stars == 4 -> '/*/*/*/*'
+ depth_stars = '/*' * depth
+ directories = sh(
+ 'hdfs dfs -ls -d {0}{1} | '\
+ 'grep -v \'Found .* items\' | '\
+ 'awk \'{{print $NF}}\''.format(basedir, depth_stars)
+ ).split('\n')
+ # return list of time bucket directories, e.g. ['2013/10/09/15', '2013/10/09/16', ... ]
+ return [directory.replace(basedir + '/', '') for directory in directories]
+
+ # return sh('hdfs dfs -ls -R {0}/{1}/{2} | sed "s@.*{0}/{1}/{2}/\([0-9]*\)/\([0-9]*\)/\([0-9]*\)/\([0-9]*\)/.*@year=\1, month=\2, day=\3, hour=\4@" | grep "year.*" | sort | uniq'.format(self.destination_path, topic, interval)).split()
+ # return a list of lists of time buckets, e.g. [['2013']]
+ # return [directory.split('/')[-depth:] for directory in directories]
+ # return [(int(d) for d in directory.split('/')[-depth:]) for directory in directories]
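The other new helper, timestampsToNow, is a generator that walks from a start time toward datetime.now() by a fixed increment. A small usage sketch, mirroring testTimestampsToNow (assumes util.py is importable):

    from datetime import datetime, timedelta
    from util import timestampsToNow

    start = datetime.now() - timedelta(hours=2)
    hours = list(timestampsToNow(start, timedelta(hours=1)))
    # -> [start, start + 1 hour, start + 2 hours]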
--
To view, visit https://gerrit.wikimedia.org/r/89125
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I608d9cd78355c9b2845ed379b1a66975a538dc06
Gerrit-PatchSet: 2
Gerrit-Project: analytics/kraken
Gerrit-Branch: master
Gerrit-Owner: Milimetric <[email protected]>
Gerrit-Reviewer: Milimetric <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits