Mforns has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/281815

Change subject: Add max_data_points option
......................................................................

Add max_data_points option

This feature allows to specify a maximum size for the report using
the config option 'max_data_points=N', where N is an integer >= 1.
So, if N=5, the report will only hold the last 5 data points.
Note that this depends on the granularity, N=5 means last 5 weeks
for weekly granularity, and last 5 days for daily granularity, etc.

Bug: T131849
Change-Id: I99683d3ac3c54361d75d6fe4a1c72f3070312d30
---
M reportupdater/reader.py
M reportupdater/report.py
M reportupdater/selector.py
M reportupdater/utils.py
M reportupdater/writer.py
M test/reader_test.py
M test/selector_test.py
M test/writer_test.py
8 files changed, 144 insertions(+), 74 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/reportupdater 
refs/changes/15/281815/1

diff --git a/reportupdater/reader.py b/reportupdater/reader.py
index 1fb0a2c..3f4a52f 100644
--- a/reportupdater/reader.py
+++ b/reportupdater/reader.py
@@ -61,6 +61,7 @@
         report.is_funnel = self.get_is_funnel(report_config)
         report.first_date = self.get_first_date(report_config)
         report.explode_by = self.get_explode_by(report_config)
+        report.max_data_points = self.get_max_data_points(report_config)
         executable = self.get_executable(report_config) or report_key
         if report.type == 'sql':
             report.db_key = self.get_db_key(report_config)
@@ -154,6 +155,15 @@
         return explode_by
 
 
+    def get_max_data_points(self, report_config):
+        if 'max_data_points' not in report_config:
+            return None
+        max_data_points = report_config['max_data_points']
+        if type(max_data_points) != int or max_data_points < 1:
+            raise ValueError('Max data points is not valid.')
+        return max_data_points
+
+
     def get_executable(self, report_config):
         if 'execute' not in report_config:
             return None
diff --git a/reportupdater/report.py b/reportupdater/report.py
index c58c4ea..04c353e 100644
--- a/reportupdater/report.py
+++ b/reportupdater/report.py
@@ -27,6 +27,7 @@
         self.sql_template = None
         self.script = None
         self.explode_by = {}
+        self.max_data_points = None
         self.results = {'header': [], 'data': {}}
 
 
@@ -45,6 +46,7 @@
             ' sql_template=' + self.format_sql(self.sql_template) +
             ' script=' + str(self.script) +
             ' explode_by=' + str(self.explode_by) +
+            ' max_data_points=' + str(self.max_data_points) +
             ' results=' + self.format_results(self.results) +
             '>'
         )
diff --git a/reportupdater/selector.py b/reportupdater/selector.py
index c402b1f..94e690e 100644
--- a/reportupdater/selector.py
+++ b/reportupdater/selector.py
@@ -14,7 +14,7 @@
 from datetime import datetime
 from dateutil.relativedelta import relativedelta
 from reader import Reader
-from utils import raise_critical, get_previous_results
+from utils import raise_critical, get_previous_results, get_increment
 
 
 class Selector(object):
@@ -57,7 +57,7 @@
 
         first_date = self.truncate_date(report.first_date, report.granularity)
         lag_increment = relativedelta(seconds=report.lag)
-        granularity_increment = self.get_increment(report.granularity)
+        granularity_increment = get_increment(report.granularity)
         relative_now = now - lag_increment - granularity_increment
         last_date = self.truncate_date(relative_now, report.granularity)
         previous_results = get_previous_results(report, output_folder)
@@ -83,19 +83,6 @@
             return self.truncate_date(date, 'days') - passed_weekdays
         elif period == 'months':
             return date.replace(day=1, hour=0, minute=0, second=0, 
microsecond=0)
-        else:
-            raise ValueError('Period is not valid.')
-
-
-    def get_increment(self, period):
-        if period == 'hours':
-            return relativedelta(hours=1)
-        elif period == 'days':
-            return relativedelta(days=1)
-        elif period == 'weeks':
-            return relativedelta(days=7)
-        elif period == 'months':
-            return relativedelta(months=1)
         else:
             raise ValueError('Period is not valid.')
 
diff --git a/reportupdater/utils.py b/reportupdater/utils.py
index 75ddf8a..21cdcec 100644
--- a/reportupdater/utils.py
+++ b/reportupdater/utils.py
@@ -9,6 +9,7 @@
 import logging
 from datetime import datetime
 from collections import defaultdict
+from dateutil.relativedelta import relativedelta
 
 
 DATE_FORMAT = '%Y-%m-%d'
@@ -94,3 +95,16 @@
 def ensure_dir(dir_path):
     if not os.path.exists(dir_path):
         os.makedirs(dir_path)
+
+
+def get_increment(period):
+    if period == 'hours':
+        return relativedelta(hours=1)
+    elif period == 'days':
+        return relativedelta(days=1)
+    elif period == 'weeks':
+        return relativedelta(days=7)
+    elif period == 'months':
+        return relativedelta(months=1)
+    else:
+        raise ValueError('Period is not valid.')
diff --git a/reportupdater/writer.py b/reportupdater/writer.py
index 00dc290..731d7a5 100644
--- a/reportupdater/writer.py
+++ b/reportupdater/writer.py
@@ -8,9 +8,11 @@
 import io
 import csv
 import logging
+from copy import copy, deepcopy
 from executor import Executor
 from utils import (raise_critical, get_previous_results,
-                   DATE_FORMAT, get_exploded_report_output_path)
+                   DATE_FORMAT, get_exploded_report_output_path,
+                   get_increment)
 
 
 class Writer(object):
@@ -51,73 +53,66 @@
 
 
     def update_results(self, report):
-        header = report.results['header']
+        # Get current results.
+        current_header = copy(report.results['header'])
+        current_data = deepcopy(report.results['data'])
+        for date in current_data:
+            rows = current_data[date] if report.is_funnel else 
[current_data[date]]
+            for row in rows:
+                if len(row) != len(current_header):
+                    raise ValueError('Results and header do not match.')
+
+        # Get previous results.
         previous_results = get_previous_results(report, 
self.get_output_folder())
         previous_header = previous_results['header']
-
-        updated_data = {}
-
-        # Handle the first run case
+        previous_data = previous_results['data']
         if not previous_header:
-            if not previous_results['data']:
-                previous_header = header
+            if not previous_data:
+                previous_header = current_header
             else:
-                raise ValueError('Previous results have no header')
+                raise ValueError('Previous results have no header.')
 
-        # New results may have a different header than previous results.
+        # Current results may have a different header than previous results.
         # They may contain new columns, column order changes, or removal
-        # of some columns. In the latter case, the previous data will be
-        # kept intact and the None value will be assigned to the missing
-        # columns of the new data.
-        if header != previous_header:
-            # Fill in the values for removed columns.
-            removed_columns = sorted(list(set(previous_header) - set(header)))
+        # of some columns.
+        if current_header != previous_header:
+
+            # Rewrite current header and data to include removed columns.
+            removed_columns = sorted(list(set(previous_header) - 
set(current_header)))
             if removed_columns:
-                header.extend(removed_columns)
-                new_data = report.results['data']
-                for date in new_data:
-                    rows = new_data[date] if report.is_funnel else 
[new_data[date]]
+                current_header.extend(removed_columns)
+                for date in current_data:
+                    rows = current_data[date] if report.is_funnel else 
[current_data[date]]
                     for row in rows:
                         row.extend([None] * len(removed_columns))
 
-            # make a map to use when updating old rows to new rows
-            old_columns = set(header).intersection(set(previous_header))
-            new_indexes = {
-                header.index(col): previous_header.index(col)
-                for col in old_columns
-            }
+            # Make a map to use when updating previous data column order.
+            column_map = [
+                (current_header.index(col), previous_header.index(col))
+                for col in 
set(current_header).intersection(set(previous_header))
+            ]
 
-            # rewrite previous results if there are new columns
-            for date, rows in previous_results['data'].items():
-                rows_with_nulls = []
-                iteratee = rows
-                if not report.is_funnel:
-                    iteratee = [rows]
-                for row in iteratee:
-                    updated_row = [None] * len(header)
-                    for new_index, old_index in new_indexes.items():
-                        updated_row[new_index] = row[old_index]
-
-                    if report.is_funnel:
-                        rows_with_nulls.append(updated_row)
-                    else:
-                        rows_with_nulls = updated_row
-
-                updated_data[date] = rows_with_nulls
-        else:
-            updated_data = previous_results['data']
-
-        for date, rows in report.results['data'].iteritems():
-            updated_data[date] = rows
-            if report.is_funnel:
+            # Rewrite previous data in the new order and including new columns.
+            for date in previous_data:
+                rows = previous_data[date] if report.is_funnel else 
[previous_data[date]]
+                rewritten_rows = []
                 for row in rows:
-                    if len(row) != len(header):
-                        raise ValueError('Results and Header do not match')
-            else:
-                if len(rows) != len(header):
-                    raise ValueError('Results and Header do not match')
+                    rewritten_row = [None] * len(current_header)
+                    for new_index, old_index in column_map:
+                        rewritten_row[new_index] = row[old_index]
+                    rewritten_rows.append(rewritten_row)
+                previous_data[date] = rewritten_rows if report.is_funnel else 
rewritten_rows[0]
 
-        return header, updated_data
+        # Build final updated data.
+        updated_header = current_header
+        updated_data = {}
+        date_threshold = self.get_date_threshold(report)
+        for date in previous_data:
+            if not date_threshold or date > date_threshold:
+                updated_data[date] = previous_data[date]
+        updated_data.update(current_data)
+
+        return updated_header, updated_data
 
 
     def write_results(self, header, data, report, output_folder):
@@ -151,3 +146,10 @@
             os.rename(temp_output_path, output_path)
         except Exception, e:
             raise RuntimeError('Could not rename the output file (' + str(e) + 
').')
+
+
+    def get_date_threshold(self, report):
+        if not report.max_data_points:
+            return None
+        increment = get_increment(report.granularity)
+        return report.start - report.max_data_points * increment
diff --git a/test/reader_test.py b/test/reader_test.py
index 10a5036..102af69 100644
--- a/test/reader_test.py
+++ b/test/reader_test.py
@@ -247,6 +247,27 @@
         self.assertEqual(result, expected)
 
 
+    def test_get_max_data_points_when_not_in_config(self):
+        result = self.reader.get_max_data_points({})
+        self.assertEqual(result, None)
+
+
+    def test_get_max_data_points_not_an_int_or_not_positive(self):
+        report_config = {'max_data_points': 'not and int'}
+        with self.assertRaises(ValueError):
+            self.reader.get_max_data_points(report_config)
+        report_config = {'max_data_points': 0}
+        with self.assertRaises(ValueError):
+            self.reader.get_max_data_points(report_config)
+
+
+    def test_get_max_data_points(self):
+        max_data_points = 10
+        report_config = {'max_data_points': max_data_points}
+        result = self.reader.get_max_data_points(report_config)
+        self.assertEqual(result, max_data_points)
+
+
     def test_get_executable_when_not_in_config(self):
         result = self.reader.get_executable({})
         self.assertEqual(result, None)
diff --git a/test/selector_test.py b/test/selector_test.py
index eb41d0e..0232703 100644
--- a/test/selector_test.py
+++ b/test/selector_test.py
@@ -3,6 +3,7 @@
 from reportupdater.selector import Selector
 from reportupdater.reader import Reader
 from reportupdater.report import Report
+from reportupdater.utils import get_increment
 from unittest import TestCase
 from mock import MagicMock
 from datetime import datetime
@@ -101,23 +102,23 @@
 
 
     def test_get_increment_when_period_is_days(self):
-        increment = self.selector.get_increment('days')
+        increment = get_increment('days')
         self.assertEqual(increment, relativedelta(days=1))
 
 
     def test_get_increment_when_period_is_weeks(self):
-        increment = self.selector.get_increment('weeks')
+        increment = get_increment('weeks')
         self.assertEqual(increment, relativedelta(days=7))
 
 
     def test_get_increment_when_period_is_months(self):
-        increment = self.selector.get_increment('months')
+        increment = get_increment('months')
         self.assertEqual(increment, relativedelta(months=1))
 
 
     def test_get_increment_when_period_is_invalid(self):
         with self.assertRaises(ValueError):
-            self.selector.get_increment('notvalid')
+            get_increment('notvalid')
 
 
     def 
test_get_all_start_dates_when_first_date_is_greater_than_current_date(self):
diff --git a/test/writer_test.py b/test/writer_test.py
index 87fc02e..449156c 100644
--- a/test/writer_test.py
+++ b/test/writer_test.py
@@ -262,6 +262,39 @@
         self.assertEqual(updated_data[old_date], [old_date, '2', None, '1', 
'3', None])
 
 
+    def test_update_results_when_max_data_points_is_set(self):
+        # see setUp for the fake data written to this report output
+        self.report.key = 'writer_test_header_change'
+
+        new_date = datetime(2015, 1, 2)
+        new_row = [new_date, 1, 2, 3]
+        self.report.max_data_points = 1
+        self.report.granularity = 'days'
+        self.report.start = new_date
+        self.report.results = {
+            'header': ['date', 'val1', 'val2', 'val3'],  # no changes
+            'data': {new_date: new_row}
+        }
+        header, updated_data = self.writer.update_results(self.report)
+        self.assertEqual(len(updated_data), 1)
+        self.assertTrue(new_date in updated_data)
+        self.assertEqual(updated_data[new_date], new_row)
+
+
+    def test_get_date_threshold_when_max_data_points_is_not_specified(self):
+        date_threshold = self.writer.get_date_threshold(self.report)
+        self.assertEqual(date_threshold, None)
+
+
+    def test_get_date_threshold(self):
+        self.report.max_data_points = 3
+        self.report.start = datetime(2015, 1, 1)
+        self.report.granularity = 'days'
+        expected = datetime(2014, 12, 29)
+        result = self.writer.get_date_threshold(self.report)
+        self.assertEqual(result, expected)
+
+
     def test_run_when_helper_method_raises_error(self):
         executed = [self.report]
         self.writer.executor.run = MagicMock(return_value=executed)

-- 
To view, visit https://gerrit.wikimedia.org/r/281815
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I99683d3ac3c54361d75d6fe4a1c72f3070312d30
Gerrit-PatchSet: 1
Gerrit-Project: analytics/reportupdater
Gerrit-Branch: master
Gerrit-Owner: Mforns <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to