Milimetric has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/89125


Change subject: Adding import logic for dumps pagecounts
......................................................................

Adding import logic for dumps pagecounts

Change-Id: I608d9cd78355c9b2845ed379b1a66975a538dc06
Card: analytics 1195.1
---
M .gitignore
A kraken-etl/pagecounts/import-one-hour.sh
A kraken-etl/pagecounts/import.py
A kraken-etl/test.py
A kraken-etl/util.py
5 files changed, 178 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/kraken 
refs/changes/25/89125/1

diff --git a/.gitignore b/.gitignore
index 4404079..fe03682 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,3 +23,5 @@
 tmp/
 *.swp
 .deploy
+
+*.pyc
diff --git a/kraken-etl/pagecounts/import-one-hour.sh 
b/kraken-etl/pagecounts/import-one-hour.sh
new file mode 100644
index 0000000..ff8fd1d
--- /dev/null
+++ b/kraken-etl/pagecounts/import-one-hour.sh
@@ -0,0 +1,43 @@
#!/bin/bash
#
# This script does the following:
#       0. reads four arguments from the CLI, in order, as YEAR, MONTH, DAY, HOUR
#          (MONTH, DAY, HOUR are expected zero-padded, e.g. 2013 10 07 06)
#       1. downloads the specified hour worth of data from http://dumps.wikimedia.org/other/pagecounts-raw/
#       2. extracts the data into hdfs
#       3. creates a partition on a hive table pointing to this data
#

print_help() {
        cat <<EOF

USAGE: import_one_hour.sh YEAR MONTH DAY HOUR

  Imports one hour worth of data from dumps.wikimedia.org/other/pagecounts-raw into a Hive table

EOF
}

if [ $# -ne 4 ]
then
        print_help
        exit 1          # non-zero status so callers can detect bad usage
fi

YEAR="$1"
MONTH="$2"
DAY="$3"
HOUR="$4"

# TODO: these locations should be configurable rather than hard-coded
LOCAL_FILE="pagecounts-${YEAR}${MONTH}${DAY}-${HOUR}0000.gz"
HDFS_DIR="/user/milimetric/pagecounts/${YEAR}.${MONTH}.${DAY}_${HOUR}.00.00"
HDFS_FILE="pagecounts-${YEAR}${MONTH}${DAY}-${HOUR}0000"
TABLE="milimetric_pagecounts"
URL="http://dumps.wikimedia.org/other/pagecounts-raw/${YEAR}/${YEAR}-${MONTH}/${LOCAL_FILE}"

# -f so this doesn't fail when no leftover file exists from a previous run
rm -f "$LOCAL_FILE"
if ! wget "$URL"
then
        echo "ERROR: could not download ${URL}" >&2
        exit 1
fi
# remove any partial previous import; ignore "does not exist" noise
hdfs dfs -rm -r "$HDFS_DIR" 2>/dev/null
gunzip -c "$LOCAL_FILE" | hdfs dfs -put - "$HDFS_DIR/$HDFS_FILE"
rm -f "$LOCAL_FILE"
# drop-then-add so re-running the same hour refreshes the partition location
hive -e "ALTER TABLE ${TABLE} DROP PARTITION (year='$YEAR',month='$MONTH',day='$DAY',hour='$HOUR');"
hive -e "ALTER TABLE ${TABLE} ADD PARTITION (year='$YEAR',month='$MONTH',day='$DAY',hour='$HOUR') location '$HDFS_DIR';"
diff --git a/kraken-etl/pagecounts/import.py b/kraken-etl/pagecounts/import.py
new file mode 100644
index 0000000..c8fd8ae
--- /dev/null
+++ b/kraken-etl/pagecounts/import.py
@@ -0,0 +1,35 @@
"""
Import any hours of pagecounts data that are available upstream but not
yet present in the target HDFS location, one hour at a time.
"""
from datetime import datetime, timedelta
from util import (
    HdfsFileCollection, diffDatewise, timestampsToNow, shell
)
# NOTE(review): the util.py added in this change only defines diffDatewise
# and timestampsToNow -- HdfsFileCollection and shell still need to be
# added there or this import will fail.

target = HdfsFileCollection(basedir='hdfs://wmf/pagecounts/hourly')

# If nothing has been imported yet, start from today so we don't try to
# backfill all of history.
hoursImported = target.getExistingHours() or [datetime.today()]
hoursAvailable = timestampsToNow(min(hoursImported), timedelta(hours=1))

# diffDatewise returns (left-only, right-only); the hours still to import
# are those available (right) but not yet imported (left): element [1].
# NOTE(review): leftParse concatenates basedir and '%Y/%m/%d/%H' with no
# separator -- confirm basedir ends with '/', or insert one here.
hoursMissing = diffDatewise(
    hoursImported,
    hoursAvailable,
    leftParse=target.basedir + '%Y/%m/%d/%H',
)[1]

# import oldest first so a partial run leaves a contiguous imported range
for missing in sorted(hoursMissing):
    print('*************************')
    print('Importing {0} '.format(missing))

    try:
        # the shell script expects zero-padded string arguments
        # (e.g. 2013 10 07 06), not ints
        result, err = shell([
            'import-one-hour.sh',
            '{0:04d}'.format(missing.year),
            '{0:02d}'.format(missing.month),
            '{0:02d}'.format(missing.day),
            '{0:02d}'.format(missing.hour),
        ])
        if err:
            print(err)
    except Exception as e:
        print(e)

    print('Done Importing {0} '.format(missing))
    print('*************************')
diff --git a/kraken-etl/test.py b/kraken-etl/test.py
new file mode 100644
index 0000000..69a7bf1
--- /dev/null
+++ b/kraken-etl/test.py
@@ -0,0 +1,46 @@
+from datetime import datetime, timedelta
+from util import diffDatewise, timestampsToNow
+from unittest import TestCase
+
+
class TestUtil(TestCase):
    """Unit tests for the datetime helpers in util.py."""

    def testDiffDatewise(self):
        """diffDatewise must return the two one-sided differences of two
        datetime collections, given as formatted strings or as objects."""
        leftFormatted = []
        leftJustDates = []
        rightFormatted = []
        leftFormat = 'blah%Y...%m...%d...%Hblahblah'
        rightFormat = 'neenee%Y%m%d%Hneenee'

        # hours deliberately left out of one side or the other
        expectLeftOnly = set([datetime(2012, 6, 14, 13), datetime(2012, 11, 9, 3)])
        expectRightOnly = set([datetime(2012, 6, 14, 14), datetime(2013, 11, 10, 22)])

        for year in range(2012, 2014):
            for month in range(1, 13):
                # we're just diffing so we don't care about getting all days
                for day in range(1, 28):
                    for hour in range(0, 24):
                        stamp = datetime(year, month, day, hour)
                        if stamp not in expectRightOnly:
                            leftFormatted.append(datetime.strftime(stamp, leftFormat))
                            leftJustDates.append(stamp)
                        if stamp not in expectLeftOnly:
                            rightFormatted.append(datetime.strftime(stamp, rightFormat))

        result = diffDatewise(
            leftFormatted, rightFormatted,
            leftParse=leftFormat, rightParse=rightFormat,
        )
        self.assertEqual(result[0], expectLeftOnly)
        self.assertEqual(result[1], expectRightOnly)

        # the left side may also be passed as datetime objects directly
        result = diffDatewise(leftJustDates, rightFormatted, rightParse=rightFormat)
        self.assertEqual(result[0], expectLeftOnly)
        self.assertEqual(result[1], expectRightOnly)

    def testTimestampsToNow(self):
        """timestampsToNow must generate stamps from start up to now."""
        now = datetime.now()
        start = now - timedelta(hours=2)
        expect = [
            start,
            start + timedelta(hours=1),
            start + timedelta(hours=2),
        ]
        timestamps = timestampsToNow(start, timedelta(hours=1))
        self.assertEqual(expect, list(timestamps))
diff --git a/kraken-etl/util.py b/kraken-etl/util.py
new file mode 100644
index 0000000..2256f47
--- /dev/null
+++ b/kraken-etl/util.py
@@ -0,0 +1,52 @@
+from datetime import datetime
+
+
def diffDatewise(left, right, leftParse=None, rightParse=None):
    """
    Computes the two one-sided differences of two datetime collections.

    Parameters
        left        : a list of datetime strings or objects
        right       : a list of datetime strings or objects
        leftParse   : None if left contains datetimes, or strptime format
        rightParse  : None if right contains datetimes, or strptime format

    Returns
        A tuple of two sets:
        [0] : the datetime objects in left but not right
        [1] : the datetime objects in right but not left
    """

    def toDatetimeSet(items, parseFormat):
        # already datetime objects: just de-duplicate
        if not parseFormat:
            return set(items)
        # formatted strings: skip blank entries, parse the rest
        return set(
            datetime.strptime(item.strip(), parseFormat)
            for item in items
            if item.strip()
        )

    leftSet = toDatetimeSet(left, leftParse)
    rightSet = toDatetimeSet(right, rightParse)

    return (leftSet - rightSet, rightSet - leftSet)
+
+
def timestampsToNow(start, increment):
    """
    Generates timestamps from @start to datetime.now(), by @increment

    Parameters
        start       : the first generated timestamp
        increment   : the timedelta between the generated timestamps

    Returns
        A generator that goes from @start to datetime.now() - x,
        where x <= @increment
    """
    # capture "now" once, so a slow consumer doesn't extend the range
    endExclusive = datetime.now()
    current = start
    while current < endExclusive:
        yield current
        current = current + increment

-- 
To view, visit https://gerrit.wikimedia.org/r/89125
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I608d9cd78355c9b2845ed379b1a66975a538dc06
Gerrit-PatchSet: 1
Gerrit-Project: analytics/kraken
Gerrit-Branch: master
Gerrit-Owner: Milimetric <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to