Mforns has submitted this change and it was merged.

Change subject: Add re-run script
......................................................................


Add re-run script

This script allows marking one or more reports to be re-run for a
given date range. The re-run will take place the next time report-
updater runs.

To exectue unit tests and flake8: tox

To test it manually, for example:
1) Follow the test setup in test/locally/README.md
2) Run update_reports (kill it after getting some results).
3) Open a generated report and change the value for a date.
4) Run rerun_reports and pass the report and date you changed.
5) Run update_reports (kill it after getting some results).
6) Check that the report you changed has been corrected.

Bug: T117538
Change-Id: I5c0ce2ea3bbcc9ca86cfa89128e151ba084d9185
---
M reportupdater/reportupdater.py
M reportupdater/selector.py
M reportupdater/utils.py
M reportupdater/writer.py
A rerun_reports.py
A test/fixtures/config/reportupdater_test6.yaml
A test/fixtures/output/selector_test3.tsv
A test/fixtures/queries/reportupdater_test6.sql
M test/locally/README.md
M test/locally/config_example.yaml
M test/locally/ssh
M test/reportupdater_test.py
M test/selector_test.py
M test/writer_test.py
14 files changed, 346 insertions(+), 26 deletions(-)

Approvals:
  Mforns: Verified; Looks good to me, approved



diff --git a/reportupdater/reportupdater.py b/reportupdater/reportupdater.py
index e11863c..d03ab94 100644
--- a/reportupdater/reportupdater.py
+++ b/reportupdater/reportupdater.py
@@ -25,7 +25,7 @@
 from selector import Selector
 from executor import Executor
 from writer import Writer
-from utils import DATE_AND_TIME_FORMAT
+from utils import DATE_AND_TIME_FORMAT, DATE_FORMAT
 
 
 def run(**kwargs):
@@ -42,6 +42,7 @@
         config['current_exec_time'] = current_exec_time
         config['query_folder'] = params['query_folder']
         config['output_folder'] = params['output_folder']
+        config['reruns'], rerun_files = read_reruns(params['query_folder'])
 
         reader = Reader(config)
         selector = Selector(reader, config)
@@ -49,6 +50,7 @@
         writer = Writer(executor, config)
         writer.run()
 
+        delete_reruns(rerun_files)  # delete rerun files that have been 
processed
         delete_pid_file(params)  # free lock for other instances to execute
         logging.info('Execution complete.')
     else:
@@ -144,5 +146,49 @@
         raise IOError('Can not read the config file because of: (' + str(e) + 
').')
 
 
+def read_reruns(query_folder):
+    reruns_folder = os.path.join(query_folder, '.reruns')
+    if os.path.isdir(reruns_folder):
+        try:
+            rerun_candidates = os.listdir(reruns_folder)
+        except IOError, e:
+            raise IOError('Can not read rerun folder because of: (' + str(e) + 
').')
+        rerun_config, rerun_files = {}, []
+        for rerun_candidate in rerun_candidates:
+            rerun_path = os.path.join(reruns_folder, rerun_candidate)
+            try:
+                # Use r+ mode (read and write) to force an error
+                # if the file is still being written.
+                with io.open(rerun_path, 'r+', encoding='utf-8') as rerun_file:
+                    reruns = rerun_file.readlines()
+                parse_reruns(reruns, rerun_config)
+                rerun_files.append(rerun_path)
+            except:
+                logging.warning(
+                    'Rerun file %s could not be parsed and will be ignored.' % 
rerun_path
+                )
+        return (rerun_config, rerun_files)
+    else:
+        return ({}, [])
+
+
+def parse_reruns(lines, rerun_config):
+    values = [l.strip() for l in lines]
+    start_date = datetime.strptime(values[0], DATE_FORMAT)
+    end_date = datetime.strptime(values[1], DATE_FORMAT)
+    for report in values[2:]:
+        if report not in rerun_config:
+            rerun_config[report] = []
+        rerun_config[report].append((start_date, end_date))
+
+
+def delete_reruns(rerun_files):
+    for rerun_file in rerun_files:
+        try:
+            os.remove(rerun_file)
+        except IOError:
+            logging.warning('Rerun file %s could not be deleted.' % rerun_file)
+
+
 def utcnow():
     return datetime.utcnow()
diff --git a/reportupdater/selector.py b/reportupdater/selector.py
index fadabd9..d0ccbd1 100644
--- a/reportupdater/selector.py
+++ b/reportupdater/selector.py
@@ -63,7 +63,8 @@
         if report.max_data_points:
             jump_back = get_increment(report.granularity, 
report.max_data_points - 1)
             first_date = max(first_date, last_date - jump_back)
-        previous_results = get_previous_results(report, output_folder)
+        previous_results = get_previous_results(
+            report, output_folder, self.config['reruns'])
         already_done_dates = previous_results['data'].keys()
 
         for start in self.get_all_start_dates(first_date, last_date, 
granularity_increment):
diff --git a/reportupdater/utils.py b/reportupdater/utils.py
index d894717..46757b6 100644
--- a/reportupdater/utils.py
+++ b/reportupdater/utils.py
@@ -22,7 +22,7 @@
     raise error_class(message)
 
 
-def get_previous_results(report, output_folder):
+def get_previous_results(report, output_folder, reruns):
     # Reads a report file to get its results
     # and returns them in the expected dict(date->row) format.
     previous_results = {'header': [], 'data': {}}
@@ -54,6 +54,8 @@
                     date = datetime.strptime(row[0], DATE_FORMAT)
                 except ValueError:
                     raise ValueError('Output file date does not match date 
format.')
+                if needs_rerun(date, reruns.get(report.key, None)):
+                    continue  # Do not list this date so that it is re-run.
                 row[0] = date
                 if report.is_funnel:
                     data[date].append(row)
@@ -64,6 +66,15 @@
     return previous_results
 
 
+def needs_rerun(date, rerun_intervals):
+    if rerun_intervals is None:
+        return False
+    for start, end in rerun_intervals:
+        if date >= start and date < end:
+            return True
+    return False
+
+
 def get_exploded_report_output_path(output_folder, explode_by, report_key):
     output_folder = os.path.join(output_folder, report_key)
     placeholders = sorted(explode_by.keys())
diff --git a/reportupdater/writer.py b/reportupdater/writer.py
index 2af0595..017515b 100644
--- a/reportupdater/writer.py
+++ b/reportupdater/writer.py
@@ -62,8 +62,8 @@
                 if len(row) != len(current_header):
                     raise ValueError('Results and header do not match.')
 
-        # Get previous results.
-        previous_results = get_previous_results(report, 
self.get_output_folder())
+        # Get previous results (no need to pass the reruns, they will be 
overwritten).
+        previous_results = get_previous_results(report, 
self.get_output_folder(), {})
         previous_header = previous_results['header']
         previous_data = previous_results['data']
         if not previous_header:
diff --git a/rerun_reports.py b/rerun_reports.py
new file mode 100755
index 0000000..210b680
--- /dev/null
+++ b/rerun_reports.py
@@ -0,0 +1,140 @@
+#!/usr/bin/python
+
+import os
+import io
+import sys
+import time
+import yaml
+import logging
+import argparse
+from reportupdater import reportupdater
+from datetime import datetime
+
+
+DATE_FORMAT = '%Y-%m-%d'
+
+
+def parse_arguments():
+    parser = argparse.ArgumentParser(
+        description=('Mark reports to be re-run for a given date range.')
+    )
+    parser.add_argument(
+        'query_folder',
+        help='Folder with *.sql files and scripts.'
+    )
+    parser.add_argument(
+        'start_date',
+        help='Start of the date range to be rerun (YYYY-MM-DD, inclusive).'
+    )
+    parser.add_argument(
+        'end_date',
+        help='End of the date range to be rerun (YYYY-MM-DD, exclusive).'
+    )
+    parser.add_argument(
+        '--config-path',
+        help='Yaml configuration file. Default: <query_folder>/config.yaml.'
+    )
+    parser.add_argument(
+        '-r',
+        '--report',
+        action='append',
+        help=(
+            'Report to be re-run. Several reports can be specified like this. '
+            'If none is specified, all reports listed in the config file are '
+            'marked for re-run.'
+        )
+    )
+    return vars(parser.parse_args())
+
+
+def critical(message):
+    print('ERROR: ' + message)
+    sys.exit(1)
+
+
+def parse_date(args, arg_name):
+    try:
+        return datetime.strptime(args[arg_name], DATE_FORMAT)
+    except ValueError:
+        critical('Invalid %s.' % arg_name)
+
+
+def format_date(d):
+    return unicode(d.strftime(DATE_FORMAT)) + u'\n'
+
+
+def format_report(r):
+    return unicode(r) + u'\n'
+
+
+def main():
+    args = parse_arguments()
+
+    # Check dates.
+    start_date = parse_date(args, 'start_date')
+    end_date = parse_date(args, 'end_date')
+    if start_date >= end_date:
+        critical('start_date is greater than or equal to end_date.')
+    today = datetime.today()
+    if end_date > today:
+        critical('end_date is greater than today.')
+
+    # Check query folder.
+    query_folder = args['query_folder']
+    if not os.path.isdir(query_folder):
+        critical('Invalid query_folder.')
+    
+    # Check config.
+    config_path = args['config_path']
+    if config_path is None:
+        config_path = os.path.join(query_folder, 'config.yaml')
+    try:
+        with io.open(config_path, encoding='utf-8') as config_file:
+            config = yaml.load(config_file)
+    except IOError:
+        critical('Cannot read the config file.')
+
+    # Check reports.
+    reports = args['report']
+    if 'reports' not in config:
+        critical('Cannot find report section in config file.')
+    reports_config = config['reports']
+    if type(reports_config) != dict:
+        critical('Invalid report section in config file.')
+    if reports is None:
+        reports = reports_config.keys()
+    for report in reports:
+        if report not in reports_config:
+            critical('Report %s is not listed in config file.' % report)
+        try:
+            first_date = datetime.combine(
+                reports_config[report]['starts'],
+                datetime.min.time()
+            )
+        except Exception:
+            critical('Cannot parse starts field from %s config.' % report)
+        if first_date >= end_date:
+            critical('Report %s starts after the specified date range.' % 
report)
+
+    # Create rerun file.
+    reruns_folder = os.path.join(query_folder, '.reruns')
+    if not os.path.exists(reruns_folder):
+        try:
+            os.makedirs(reruns_folder)
+        except IOError:
+            critical('Could not create reruns folder.')
+    rerun_path = os.path.join(reruns_folder, str(int(time.time() * 1000)))
+    try:
+        with io.open(rerun_path, 'w', encoding='utf-8') as rerun_file:
+            rerun_file.writelines(
+                [format_date(start_date), format_date(end_date)] +
+                map(format_report, reports)
+            )
+    except IOError:
+        critical('Could not write rerun file.')
+
+    print('Reports successfully marked to be re-run.')
+
+
+if __name__ == '__main__':
+    main()
diff --git a/test/fixtures/config/reportupdater_test6.yaml 
b/test/fixtures/config/reportupdater_test6.yaml
new file mode 100644
index 0000000..acab503
--- /dev/null
+++ b/test/fixtures/config/reportupdater_test6.yaml
@@ -0,0 +1,12 @@
+databases:
+    reportupdater_db:
+        host: "some.db.host"
+        port: 1234
+        creds_file: /creds/.my.cnf
+        db: some_db_name
+defaults:
+    db: reportupdater_db
+reports:
+    reportupdater_test6:
+        granularity: days
+        starts: 2016-01-01
diff --git a/test/fixtures/output/selector_test3.tsv 
b/test/fixtures/output/selector_test3.tsv
new file mode 100644
index 0000000..ab17637
--- /dev/null
+++ b/test/fixtures/output/selector_test3.tsv
@@ -0,0 +1,6 @@
+date   value
+2015-01-01     a
+2015-01-02     b
+2015-01-03     c
+2015-01-04     d
+2015-01-05     e
diff --git a/test/fixtures/queries/reportupdater_test6.sql 
b/test/fixtures/queries/reportupdater_test6.sql
new file mode 100644
index 0000000..cff9ff8
--- /dev/null
+++ b/test/fixtures/queries/reportupdater_test6.sql
@@ -0,0 +1,5 @@
+SELECT date, value
+FROM table
+WHERE date >= {from_timestamp}
+AND date < {to_timestamp}
+;
\ No newline at end of file
diff --git a/test/locally/README.md b/test/locally/README.md
index 4686043..8052849 100644
--- a/test/locally/README.md
+++ b/test/locally/README.md
@@ -1,15 +1,33 @@
-To test your reports locally against the analytics slave databases:
+You can test your reports locally against the analytics slave databases with
+the following procedure. Have in mind that you still need ssh access to the
+datastores, testing locally is just a convenience.
 
-1. Create a file named `my.cnf.research` in this folder. It should look like:
+1. Clone the reportupdater query repository that you want to test, like:
+limn-language-data, limn-multimedia-data, limn-mobile-data, etc. Look at the
+`config.yaml` file to get the database host the queries connect to. Possible
+values are: s1-analytics-slave.eqiad.wmnet, analytics-store.eqiad.wmnet, etc.
+
+2. Create an ssh tunnel to that host using the script `ssh` in this directory.
+Edit the script if necessary to replace the host you want to connect to.
+
+3. Create a file named `my.cnf.research` in this folder. It should look like
+this (replacing <password> with the database password of the research user):
 ```
 [client]
 user=research
 password=<password>
 ```
 
-2. Temporarily modify the database section in your `config.yaml` file to
-mimic the example shown in `config_example.yaml`.
+4. Modify the database section in the query repository `config.yaml` file
+to point to localhost:3307, and to point to the .cnf file you just created.
+You can copy the example shown in `config_example.yaml`.
 
-3. Create an ssh tunnel using the script `ssh`.
+6. Run reportupdater from your machine (using update_reports.py), and point to
+the query folder you cloned, for example:
 
-4. Run reportupdater normally.
+    python ./update_reports.py /path/to/limn-mobile-data/mobile/ /tmp/output 
-l info
+
+This command will execute queries in the limn-mobile-data repo sending results
+to /tmp/output. The flag `-l info` prints more helpful logs. Some queries take
+a long time to execute, so consider stopping execution if you have collected
+enough output for the sake of your test.
diff --git a/test/locally/config_example.yaml b/test/locally/config_example.yaml
index 4eeacc0..163a771 100644
--- a/test/locally/config_example.yaml
+++ b/test/locally/config_example.yaml
@@ -4,10 +4,5 @@
         port: 3307
         creds_file: test/locally/my.cnf.research
         db: log
-    commons:
-        host: "127.0.0.1"
-        port: 3308
-        creds_file: test/locally/my.cnf.research
-        db: commonswiki
 defaults:
     db: el
diff --git a/test/locally/ssh b/test/locally/ssh
index 9ff9e82..ed10776 100755
--- a/test/locally/ssh
+++ b/test/locally/ssh
@@ -1,3 +1,5 @@
 #!/usr/bin/env bash
-# create SSH tunnel to EventLogging databases through stat1003
-ssh -L 3307:s1-analytics-slave.eqiad.wmnet:3306 -L 
3308:s4-analytics-slave.eqiad.wmnet:3306 stat1003.eqiad.wmnet
+
+# Create SSH tunnel to EventLogging databases through stat1003.
+# Replace s1-analytics-slave.eqiad.wmnet with any other host if you need.
+ssh -L 3307:s1-analytics-slave.eqiad.wmnet:3306 stat1003.eqiad.wmnet
diff --git a/test/reportupdater_test.py b/test/reportupdater_test.py
index 06e6671..580d7d4 100644
--- a/test/reportupdater_test.py
+++ b/test/reportupdater_test.py
@@ -274,3 +274,73 @@
             self.assertEqual(date_str, expected_date_str)
             self.assertEqual(type(value), unicode)
             expected_date += relativedelta(days=+1)
+
+
+    def test_daily_report_with_previous_results_and_reruns(self):
+        def fetchall_callback():
+            # This method will return a subsequent row with each call.
+            try:
+                sql_date = self.last_date + relativedelta(days=+1)
+                value = self.last_value + 1
+            except AttributeError:
+                # Starts at Mar, Jan and Feb are in previous results
+                sql_date = datetime(2016, 1, 1)
+                value = 1
+            self.last_date = sql_date
+            self.last_value = value
+            return [[sql_date, str(value)]]
+        header = ['date', 'value']
+        connection_mock = ConnectionMock(None, fetchall_callback, header)
+        pymysql.connect = MagicMock(return_value=connection_mock)
+
+        config_path = os.path.join(self.config_folder, 
'reportupdater_test6.yaml')
+        output_path = os.path.join(self.output_folder, 
'reportupdater_test6.tsv')
+        with io.open(output_path, 'w') as output_file:
+            output_file.write(unicode(
+                'date\tvalue\n'
+                '2016-01-01\t1\n'
+                '2016-01-02\ta\n'  # Note irregular result.
+                '2016-01-03\t3\n'
+                '2016-01-04\tb\n'  # Note irregular result.
+                '2016-01-05\t5\n'
+            ))
+        self.paths_to_clean.extend([output_path])
+
+        # Build rerun files.
+        rerun_folder = os.path.join(self.query_folder, '.reruns')
+        os.makedirs(rerun_folder)
+        rerun_path1 = os.path.join(rerun_folder, 'reportupdater_test6.1')
+        with io.open(rerun_path1, 'w') as rerun_file1:
+            rerun_file1.write(unicode(
+                '2016-01-02\n'
+                '2016-01-03\n'
+                'reportupdater_test6\n'
+            ))
+        rerun_path2 = os.path.join(rerun_folder, 'reportupdater_test6.2')
+        with io.open(rerun_path2, 'w') as rerun_file2:
+            rerun_file2.write(unicode(
+                '2016-01-04\n'
+                '2016-01-05\n'
+                'reportupdater_test6\n'
+            ))
+        self.paths_to_clean.extend([rerun_folder])
+
+        reportupdater.run(
+            config_path=config_path,
+            query_folder=self.query_folder,
+            output_folder=self.output_folder
+        )
+        self.assertTrue(os.path.exists(output_path))
+        with io.open(output_path, 'r', encoding='utf-8') as output_file:
+            output_lines = output_file.readlines()
+        self.assertTrue(len(output_lines) > 1)
+        header = output_lines.pop(0).strip()
+        self.assertEqual(header, 'date\tvalue')
+        # Assert that all lines hold subsequent values.
+        expected_date = datetime(2016, 1, 1)
+        expected_value = 1
+        for line in output_lines:
+            expected_line = expected_date.strftime(DATE_FORMAT) + '\t' + 
str(expected_value)
+            self.assertEqual(line.strip(), expected_line)
+            expected_date += relativedelta(days=+1)
+            expected_value += 1
diff --git a/test/selector_test.py b/test/selector_test.py
index 031008d..583a3a9 100644
--- a/test/selector_test.py
+++ b/test/selector_test.py
@@ -17,7 +17,8 @@
     def setUp(self):
         self.config = {
             'output_folder': 'test/fixtures/output',
-            'current_exec_time': datetime(2015, 1, 3, 1, 20, 30)
+            'current_exec_time': datetime(2015, 1, 3, 1, 20, 30),
+            'reruns': {}
         }
         reader = Reader(self.config)
         self.selector = Selector(reader, self.config)
@@ -67,6 +68,24 @@
         self.assertEqual(reports[0].end, datetime(2015, 1, 2))
 
 
+    def test_get_interval_reports_when_there_are_reruns(self):
+        self.report.key = 'selector_test3'
+        # see: test/fixtures/output/selector_test3.tsv
+        self.config['reruns'] = {
+            'selector_test3': [
+                (datetime(2015, 1, 2), datetime(2015, 1, 3)),
+                (datetime(2015, 1, 4), datetime(2015, 1, 5))
+            ]
+        }
+        now = datetime(2015, 1, 6)
+        reports = list(self.selector.get_interval_reports(self.report, now))
+        self.assertEqual(len(reports), 2)
+        self.assertEqual(reports[0].start, datetime(2015, 1, 2))
+        self.assertEqual(reports[0].end, datetime(2015, 1, 3))
+        self.assertEqual(reports[1].start, datetime(2015, 1, 4))
+        self.assertEqual(reports[1].end, datetime(2015, 1, 5))
+
+
     def test_truncate_date_when_period_is_hours(self):
         date = datetime(2015, 1, 5, 10, 20, 30)
         result = self.selector.truncate_date(date, 'hours')
diff --git a/test/writer_test.py b/test/writer_test.py
index d876a91..c5a5dd2 100644
--- a/test/writer_test.py
+++ b/test/writer_test.py
@@ -18,7 +18,8 @@
 
     def setUp(self):
         self.config = {
-            'output_folder': 'test/fixtures/output'
+            'output_folder': 'test/fixtures/output',
+            'reruns': {}
         }
         reader = Reader(self.config)
         selector = Selector(reader, self.config)
@@ -180,7 +181,6 @@
     def test_update_results_when_header_has_new_columns(self):
         # see setUp for the fake data written to this report output
         self.report.key = 'writer_test_header_change'
-
         new_header = ['date', 'val1', 'insert middle', 'val2', 'val3', 'insert 
after']
         old_date = datetime(2015, 1, 1)
         new_date = datetime(2015, 1, 2)
@@ -198,7 +198,6 @@
     def test_update_results_when_header_has_moved_columns(self):
         # see setUp for the fake data written to this report output
         self.report.key = 'writer_test_header_change'
-
         new_header = ['date', 'val2', 'val1', 'val3']
         old_date = datetime(2015, 1, 1)
         new_date = datetime(2015, 1, 2)
@@ -216,7 +215,6 @@
     def test_update_results_when_header_has_removed_columns(self):
         # see setUp for the fake data written to this report output
         self.report.key = 'writer_test_header_change'
-
         new_header = ['date', 'val1', 'val3']
         new_date = datetime(2015, 1, 2)
         new_row = [datetime(2015, 1, 2), 1, 3]
@@ -232,7 +230,6 @@
     def test_update_results_when_header_has_different_number_of_columns(self):
         # see setUp for the fake data written to this report output
         self.report.key = 'writer_test_header_change'
-
         new_header = ['date', 'val1', 'val2', 'val3']
         new_date = datetime(2015, 1, 2)
         new_row = [datetime(2015, 1, 2), 1, 2, 3, 'Additional']
@@ -247,7 +244,6 @@
     def test_update_results_when_header_has_new_and_moved_columns(self):
         # see setUp for the fake data written to this report output
         self.report.key = 'writer_test_header_change'
-
         new_header = ['date', 'val2', 'insert middle', 'val1', 'val3', 'insert 
after']
         old_date = datetime(2015, 1, 1)
         new_date = datetime(2015, 1, 2)
@@ -265,7 +261,6 @@
     def test_update_results_when_max_data_points_is_set(self):
         # see setUp for the fake data written to this report output
         self.report.key = 'writer_test_header_change'
-
         new_date = datetime(2015, 1, 2)
         new_row = [new_date, 1, 2, 3]
         self.report.max_data_points = 1

-- 
To view, visit https://gerrit.wikimedia.org/r/308977
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I5c0ce2ea3bbcc9ca86cfa89128e151ba084d9185
Gerrit-PatchSet: 5
Gerrit-Project: analytics/reportupdater
Gerrit-Branch: master
Gerrit-Owner: Mforns <mfo...@wikimedia.org>
Gerrit-Reviewer: Mforns <mfo...@wikimedia.org>
Gerrit-Reviewer: Nuria <nu...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to