Milimetric has uploaded a new change for review. https://gerrit.wikimedia.org/r/89125
Change subject: Adding import logic for dumps pagecounts
......................................................................

Adding import logic for dumps pagecounts

Change-Id: I608d9cd78355c9b2845ed379b1a66975a538dc06
Card: analytics 1195.1
---
M .gitignore
A kraken-etl/pagecounts/import-one-hour.sh
A kraken-etl/pagecounts/import.py
A kraken-etl/test.py
A kraken-etl/util.py
5 files changed, 178 insertions(+), 0 deletions(-)

git pull ssh://gerrit.wikimedia.org:29418/analytics/kraken refs/changes/25/89125/1

diff --git a/.gitignore b/.gitignore
index 4404079..fe03682 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,3 +23,5 @@
 tmp/
 *.swp
 .deploy
+
+*.pyc
diff --git a/kraken-etl/pagecounts/import-one-hour.sh b/kraken-etl/pagecounts/import-one-hour.sh
new file mode 100644
index 0000000..ff8fd1d
--- /dev/null
+++ b/kraken-etl/pagecounts/import-one-hour.sh
@@ -0,0 +1,43 @@
+#!/bin/bash
+#
+# This script does the following:
+# 0. reads four arguments from the CLI, in order, as YEAR, MONTH, DAY, HOUR
+# 1. downloads the specified hour worth of data from http://dumps.wikimedia.org/other/pagecounts-raw/
+# 2. extracts the data into hdfs
+# 3. creates a partition on a hive table pointing to this data
+#
+
+print_help() {
+    cat <<EOF
+
+USAGE: import-one-hour.sh YEAR MONTH DAY HOUR
+
+    Imports one hour worth of data from dumps.wikimedia.org/other/pagecounts-raw into a Hive table
+
+EOF
+}
+
+if [ $# -ne 4 ]
+then
+    print_help
+    # nonzero exit so callers (e.g. import.py) can detect the usage error
+    exit 1
+fi
+
+YEAR=$1
+MONTH=$2
+DAY=$3
+HOUR=$4
+
+# Someone intelligent should set these
+LOCAL_FILE=pagecounts-$YEAR$MONTH$DAY-${HOUR}0000.gz
+HDFS_DIR=/user/milimetric/pagecounts/$YEAR.$MONTH.${DAY}_${HOUR}.00.00
+HDFS_FILE=pagecounts-$YEAR$MONTH$DAY-${HOUR}0000
+TABLE=milimetric_pagecounts
+
+# -f: the file won't exist on the first run; don't error out over that
+rm -f "$LOCAL_FILE"
+# abort if the download fails, otherwise we'd create an empty partition below
+wget "http://dumps.wikimedia.org/other/pagecounts-raw/$YEAR/$YEAR-$MONTH/$LOCAL_FILE" || exit 1
+# clear any partial previous import of this hour (the dir may not exist yet)
+hdfs dfs -rm -r "$HDFS_DIR"
+gunzip -c "$LOCAL_FILE" | hdfs dfs -put - "$HDFS_DIR/$HDFS_FILE"
+rm -f "$LOCAL_FILE"
+# drop-then-add makes re-importing the same hour idempotent
+hive -e "ALTER TABLE ${TABLE} DROP PARTITION (year='$YEAR',month='$MONTH',day='$DAY',hour='$HOUR');"
+hive -e "ALTER TABLE ${TABLE} ADD PARTITION (year='$YEAR',month='$MONTH',day='$DAY',hour='$HOUR') location '$HDFS_DIR';"
diff --git a/kraken-etl/pagecounts/import.py b/kraken-etl/pagecounts/import.py
new file mode 100644
index 0000000..c8fd8ae
--- /dev/null
+++ b/kraken-etl/pagecounts/import.py
@@ -0,0 +1,35 @@
+"""
+Imports any hours of pagecounts data that are available on
+dumps.wikimedia.org but not yet present in HDFS, one hour at a time,
+by shelling out to import-one-hour.sh.
+"""
+from datetime import datetime, timedelta
+from util import (
+    HdfsFileCollection, diffDatewise, timestampsToNow, shell
+)
+
+# NOTE(review): HdfsFileCollection is imported from util, but util.py in
+# this change only defines diffDatewise, timestampsToNow and shell --
+# this import will fail until HdfsFileCollection is added there.
+target = HdfsFileCollection(basedir='hdfs://wmf/pagecounts/hourly')
+
+# if nothing was imported yet, start looking from today
+hoursImported = target.getExistingHours() or [datetime.today()]
+# NOTE(review): min(hoursImported) is handed to timestampsToNow, which
+# compares it against datetime.now(), yet leftParse below implies
+# getExistingHours returns strings -- one of the two needs adjusting.
+hoursAvailable = timestampsToNow(min(hoursImported), timedelta(hours=1))
+
+# diffDatewise returns a tuple (in left only, in right only); the hours
+# that are available but not yet imported are the SECOND set, so unpack
+# rather than iterating the tuple itself
+_, hoursMissing = diffDatewise(
+    hoursImported,
+    hoursAvailable,
+    # assumes getExistingHours returns paths like basedir/2013/10/01/00
+    # (note the separating slash) -- TODO confirm against HdfsFileCollection
+    leftParse=target.basedir + '/%Y/%m/%d/%H',
+)
+
+# sorted so hours are imported in chronological order
+for missing in sorted(hoursMissing):
+    print('*************************')
+    print('Importing {0} '.format(missing))
+
+    try:
+        # subprocess arguments must be strings; zero-pad so they match the
+        # pagecounts-YYYYMMDD-HH0000.gz naming scheme the script expects
+        result, err = shell([
+            'import-one-hour',
+            '{0:04d}'.format(missing.year),
+            '{0:02d}'.format(missing.month),
+            '{0:02d}'.format(missing.day),
+            '{0:02d}'.format(missing.hour),
+        ])
+        if err:
+            print(err)
+    except Exception as e:
+        print(e)
+
+    print('Done Importing {0} '.format(missing))
+    print('*************************')
diff --git a/kraken-etl/test.py b/kraken-etl/test.py
new file mode 100644
index 0000000..69a7bf1
--- /dev/null
+++ b/kraken-etl/test.py
@@ -0,0 +1,46 @@
+from datetime import datetime, timedelta
+from util import diffDatewise, timestampsToNow
+from unittest import TestCase
+
+
+class TestUtil(TestCase):
+    def testDiffDatewise(self):
+        # "left" gets formatted strings, "leftJustDates" raw datetimes, so
+        # we can exercise diffDatewise both with and without leftParse
+        left = []
+        leftJustDates = []
+        right = []
+        leftFormat = 'blah%Y...%m...%d...%Hblahblah'
+        rightFormat = 'neenee%Y%m%d%Hneenee'
+
+        # the hours deliberately omitted from each side
+        expect0 = set([datetime(2012, 6, 14, 13), datetime(2012, 11, 9, 3)])
+        expect1 = set([datetime(2012, 6, 14, 14), datetime(2013, 11, 10, 22)])
+
+        for y in range(2012, 2014):
+            for m in range(1, 13):
+                # we're just diffing so we don't care about getting all days
+                for d in range(1, 28):
+                    for h in range(0, 24):
+                        x = datetime(y, m, d, h)
+                        if x not in expect1:
+                            left.append(datetime.strftime(x, leftFormat))
+                            leftJustDates.append(x)
+                        if x not in expect0:
+                            right.append(datetime.strftime(x, rightFormat))
+
+        result = diffDatewise(
+            left, right, leftParse=leftFormat, rightParse=rightFormat)
+        self.assertEqual(result[0], expect0)
+        self.assertEqual(result[1], expect1)
+
+        # datetime objects on the left, strings on the right
+        result = diffDatewise(leftJustDates, right, rightParse=rightFormat)
+        self.assertEqual(result[0], expect0)
+        self.assertEqual(result[1], expect1)
+
+    def testTimestampsToNow(self):
+        now = datetime.now()
+        start = now - timedelta(hours=2)
+        expect = [
+            start,
+            start + timedelta(hours=1),
+            start + timedelta(hours=2),
+        ]
+        timestamps = timestampsToNow(start, timedelta(hours=1))
+        self.assertEqual(expect, list(timestamps))
diff --git a/kraken-etl/util.py b/kraken-etl/util.py
new file mode 100644
index 0000000..2256f47
--- /dev/null
+++ b/kraken-etl/util.py
@@ -0,0 +1,52 @@
+import subprocess
+from datetime import datetime
+
+
+def diffDatewise(left, right, leftParse=None, rightParse=None):
+    """
+    Computes which datetimes appear in only one of two collections.
+
+    Parameters
+        left       : a list of datetime strings or objects
+        right      : a list of datetime strings or objects
+        leftParse  : None if left contains datetimes, or strptime format
+        rightParse : None if right contains datetimes, or strptime format
+
+    Returns
+        A tuple of two sets:
+        [0] : the datetime objects in left but not right
+        [1] : the datetime objects in right but not left
+    """
+    if leftParse:
+        # skip blank entries (e.g. empty lines from shell output)
+        leftSet = set(
+            datetime.strptime(l.strip(), leftParse)
+            for l in left if l.strip()
+        )
+    else:
+        leftSet = set(left)
+
+    if rightParse:
+        rightSet = set(
+            datetime.strptime(r.strip(), rightParse)
+            for r in right if r.strip()
+        )
+    else:
+        rightSet = set(right)
+
+    return (leftSet - rightSet, rightSet - leftSet)
+
+
+def shell(command):
+    """
+    Runs @command and waits for it to finish.
+
+    Parameters
+        command : the command to run, as a list of arguments (a list, not
+                  a shell string, to avoid quoting/injection problems)
+
+    Returns
+        A (stdout, stderr) tuple, as import.py expects
+    """
+    process = subprocess.Popen(
+        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    return process.communicate()
+
+
+def timestampsToNow(start, increment):
+    """
+    Generates timestamps from @start to datetime.now(), by @increment
+
+    Parameters
+        start     : the first generated timestamp
+        increment : the timedelta between the generated timestamps
+
+    Returns
+        A generator that goes from @start to datetime.now() - x,
+        where x <= @increment
+    """
+    now = datetime.now()
+    while start < now:
+        yield start
+        start += increment
--
To view, visit https://gerrit.wikimedia.org/r/89125
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I608d9cd78355c9b2845ed379b1a66975a538dc06
Gerrit-PatchSet: 1
Gerrit-Project: analytics/kraken
Gerrit-Branch: master
Gerrit-Owner: Milimetric <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
