Mforns has submitted this change and it was merged.
Change subject: Add re-run script
......................................................................
Add re-run script
This script allows marking one or more reports to be re-run for a
given date range. The re-run will take place the next time report-
updater runs.
To execute unit tests and flake8: tox
To test it manually, for example:
1) Follow the test setup in test/locally/README.md
2) Run update_reports (kill it after getting some results).
3) Open a generated report and change the value for a date.
4) Run rerun_reports and pass the report and date you changed.
5) Run update_reports (kill it after getting some results).
6) Check that the report you changed has been corrected.
Bug: T117538
Change-Id: I5c0ce2ea3bbcc9ca86cfa89128e151ba084d9185
---
M reportupdater/reportupdater.py
M reportupdater/selector.py
M reportupdater/utils.py
M reportupdater/writer.py
A rerun_reports.py
A test/fixtures/config/reportupdater_test6.yaml
A test/fixtures/output/selector_test3.tsv
A test/fixtures/queries/reportupdater_test6.sql
M test/locally/README.md
M test/locally/config_example.yaml
M test/locally/ssh
M test/reportupdater_test.py
M test/selector_test.py
M test/writer_test.py
14 files changed, 346 insertions(+), 26 deletions(-)
Approvals:
Mforns: Verified; Looks good to me, approved
diff --git a/reportupdater/reportupdater.py b/reportupdater/reportupdater.py
index e11863c..d03ab94 100644
--- a/reportupdater/reportupdater.py
+++ b/reportupdater/reportupdater.py
@@ -25,7 +25,7 @@
from selector import Selector
from executor import Executor
from writer import Writer
-from utils import DATE_AND_TIME_FORMAT
+from utils import DATE_AND_TIME_FORMAT, DATE_FORMAT
def run(**kwargs):
@@ -42,6 +42,7 @@
config['current_exec_time'] = current_exec_time
config['query_folder'] = params['query_folder']
config['output_folder'] = params['output_folder']
+ config['reruns'], rerun_files = read_reruns(params['query_folder'])
reader = Reader(config)
selector = Selector(reader, config)
@@ -49,6 +50,7 @@
writer = Writer(executor, config)
writer.run()
+ delete_reruns(rerun_files) # delete rerun files that have been
processed
delete_pid_file(params) # free lock for other instances to execute
logging.info('Execution complete.')
else:
@@ -144,5 +146,49 @@
raise IOError('Can not read the config file because of: (' + str(e) +
').')
+def read_reruns(query_folder):
+ reruns_folder = os.path.join(query_folder, '.reruns')
+ if os.path.isdir(reruns_folder):
+ try:
+ rerun_candidates = os.listdir(reruns_folder)
+ except IOError, e:
+ raise IOError('Can not read rerun folder because of: (' + str(e) +
').')
+ rerun_config, rerun_files = {}, []
+ for rerun_candidate in rerun_candidates:
+ rerun_path = os.path.join(reruns_folder, rerun_candidate)
+ try:
+ # Use r+ mode (read and write) to force an error
+ # if the file is still being written.
+ with io.open(rerun_path, 'r+', encoding='utf-8') as rerun_file:
+ reruns = rerun_file.readlines()
+ parse_reruns(reruns, rerun_config)
+ rerun_files.append(rerun_path)
+ except:
+ logging.warning(
+ 'Rerun file %s could not be parsed and will be ignored.' %
rerun_path
+ )
+ return (rerun_config, rerun_files)
+ else:
+ return ({}, [])
+
+
+def parse_reruns(lines, rerun_config):
+ values = [l.strip() for l in lines]
+ start_date = datetime.strptime(values[0], DATE_FORMAT)
+ end_date = datetime.strptime(values[1], DATE_FORMAT)
+ for report in values[2:]:
+ if report not in rerun_config:
+ rerun_config[report] = []
+ rerun_config[report].append((start_date, end_date))
+
+
+def delete_reruns(rerun_files):
+ for rerun_file in rerun_files:
+ try:
+ os.remove(rerun_file)
+ except IOError:
+ logging.warning('Rerun file %s could not be deleted.' % rerun_file)
+
+
def utcnow():
return datetime.utcnow()
diff --git a/reportupdater/selector.py b/reportupdater/selector.py
index fadabd9..d0ccbd1 100644
--- a/reportupdater/selector.py
+++ b/reportupdater/selector.py
@@ -63,7 +63,8 @@
if report.max_data_points:
jump_back = get_increment(report.granularity,
report.max_data_points - 1)
first_date = max(first_date, last_date - jump_back)
- previous_results = get_previous_results(report, output_folder)
+ previous_results = get_previous_results(
+ report, output_folder, self.config['reruns'])
already_done_dates = previous_results['data'].keys()
for start in self.get_all_start_dates(first_date, last_date,
granularity_increment):
diff --git a/reportupdater/utils.py b/reportupdater/utils.py
index d894717..46757b6 100644
--- a/reportupdater/utils.py
+++ b/reportupdater/utils.py
@@ -22,7 +22,7 @@
raise error_class(message)
-def get_previous_results(report, output_folder):
+def get_previous_results(report, output_folder, reruns):
# Reads a report file to get its results
# and returns them in the expected dict(date->row) format.
previous_results = {'header': [], 'data': {}}
@@ -54,6 +54,8 @@
date = datetime.strptime(row[0], DATE_FORMAT)
except ValueError:
raise ValueError('Output file date does not match date
format.')
+ if needs_rerun(date, reruns.get(report.key, None)):
+ continue # Do not list this date so that it is re-run.
row[0] = date
if report.is_funnel:
data[date].append(row)
@@ -64,6 +66,15 @@
return previous_results
+def needs_rerun(date, rerun_intervals):
+ if rerun_intervals is None:
+ return False
+ for start, end in rerun_intervals:
+ if date >= start and date < end:
+ return True
+ return False
+
+
def get_exploded_report_output_path(output_folder, explode_by, report_key):
output_folder = os.path.join(output_folder, report_key)
placeholders = sorted(explode_by.keys())
diff --git a/reportupdater/writer.py b/reportupdater/writer.py
index 2af0595..017515b 100644
--- a/reportupdater/writer.py
+++ b/reportupdater/writer.py
@@ -62,8 +62,8 @@
if len(row) != len(current_header):
raise ValueError('Results and header do not match.')
- # Get previous results.
- previous_results = get_previous_results(report,
self.get_output_folder())
+ # Get previous results (no need to pass the reruns, they will be
overwritten).
+ previous_results = get_previous_results(report,
self.get_output_folder(), {})
previous_header = previous_results['header']
previous_data = previous_results['data']
if not previous_header:
diff --git a/rerun_reports.py b/rerun_reports.py
new file mode 100755
index 0000000..210b680
--- /dev/null
+++ b/rerun_reports.py
@@ -0,0 +1,140 @@
+#!/usr/bin/python
+
+import os
+import io
+import sys
+import time
+import yaml
+import logging
+import argparse
+from reportupdater import reportupdater
+from datetime import datetime
+
+
+DATE_FORMAT = '%Y-%m-%d'
+
+
+def parse_arguments():
+ parser = argparse.ArgumentParser(
+ description=('Mark reports to be re-run for a given date range.')
+ )
+ parser.add_argument(
+ 'query_folder',
+ help='Folder with *.sql files and scripts.'
+ )
+ parser.add_argument(
+ 'start_date',
+ help='Start of the date range to be rerun (YYYY-MM-DD, inclusive).'
+ )
+ parser.add_argument(
+ 'end_date',
+ help='End of the date range to be rerun (YYYY-MM-DD, exclusive).'
+ )
+ parser.add_argument(
+ '--config-path',
+ help='Yaml configuration file. Default: <query_folder>/config.yaml.'
+ )
+ parser.add_argument(
+ '-r',
+ '--report',
+ action='append',
+ help=(
+ 'Report to be re-run. Several reports can be specified like this. '
+ 'If none is specified, all reports listed in the config file are '
+ 'marked for re-run.'
+ )
+ )
+ return vars(parser.parse_args())
+
+
+def critical(message):
+ print('ERROR: ' + message)
+ sys.exit(1)
+
+
+def parse_date(args, arg_name):
+ try:
+ return datetime.strptime(args[arg_name], DATE_FORMAT)
+ except ValueError:
+ critical('Invalid %s.' % arg_name)
+
+
+def format_date(d):
+ return unicode(d.strftime(DATE_FORMAT)) + u'\n'
+
+
+def format_report(r):
+ return unicode(r) + u'\n'
+
+
+def main():
+ args = parse_arguments()
+
+ # Check dates.
+ start_date = parse_date(args, 'start_date')
+ end_date = parse_date(args, 'end_date')
+ if start_date >= end_date:
+ critical('start_date is greater than or equal to end_date.')
+ today = datetime.today()
+ if end_date > today:
+ critical('end_date is greater than today.')
+
+ # Check query folder.
+ query_folder = args['query_folder']
+ if not os.path.isdir(query_folder):
+ critical('Invalid query_folder.')
+
+ # Check config.
+ config_path = args['config_path']
+ if config_path is None:
+ config_path = os.path.join(query_folder, 'config.yaml')
+ try:
+ with io.open(config_path, encoding='utf-8') as config_file:
+ config = yaml.load(config_file)
+ except IOError:
+ critical('Cannot read the config file.')
+
+ # Check reports.
+ reports = args['report']
+ if 'reports' not in config:
+ critical('Cannot find report section in config file.')
+ reports_config = config['reports']
+ if type(reports_config) != dict:
+ critical('Invalid report section in config file.')
+ if reports is None:
+ reports = reports_config.keys()
+ for report in reports:
+ if report not in reports_config:
+ critical('Report %s is not listed in config file.' % report)
+ try:
+ first_date = datetime.combine(
+ reports_config[report]['starts'],
+ datetime.min.time()
+ )
+ except Exception:
+ critical('Cannot parse starts field from %s config.' % report)
+ if first_date >= end_date:
+ critical('Report %s starts after the specified date range.' %
report)
+
+ # Create rerun file.
+ reruns_folder = os.path.join(query_folder, '.reruns')
+ if not os.path.exists(reruns_folder):
+ try:
+ os.makedirs(reruns_folder)
+ except IOError:
+ critical('Could not create reruns folder.')
+ rerun_path = os.path.join(reruns_folder, str(int(time.time() * 1000)))
+ try:
+ with io.open(rerun_path, 'w', encoding='utf-8') as rerun_file:
+ rerun_file.writelines(
+ [format_date(start_date), format_date(end_date)] +
+ map(format_report, reports)
+ )
+ except IOError:
+ critical('Could not write rerun file.')
+
+ print('Reports successfully marked to be re-run.')
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/fixtures/config/reportupdater_test6.yaml
b/test/fixtures/config/reportupdater_test6.yaml
new file mode 100644
index 0000000..acab503
--- /dev/null
+++ b/test/fixtures/config/reportupdater_test6.yaml
@@ -0,0 +1,12 @@
+databases:
+ reportupdater_db:
+ host: "some.db.host"
+ port: 1234
+ creds_file: /creds/.my.cnf
+ db: some_db_name
+defaults:
+ db: reportupdater_db
+reports:
+ reportupdater_test6:
+ granularity: days
+ starts: 2016-01-01
diff --git a/test/fixtures/output/selector_test3.tsv
b/test/fixtures/output/selector_test3.tsv
new file mode 100644
index 0000000..ab17637
--- /dev/null
+++ b/test/fixtures/output/selector_test3.tsv
@@ -0,0 +1,6 @@
+date value
+2015-01-01 a
+2015-01-02 b
+2015-01-03 c
+2015-01-04 d
+2015-01-05 e
diff --git a/test/fixtures/queries/reportupdater_test6.sql
b/test/fixtures/queries/reportupdater_test6.sql
new file mode 100644
index 0000000..cff9ff8
--- /dev/null
+++ b/test/fixtures/queries/reportupdater_test6.sql
@@ -0,0 +1,5 @@
+SELECT date, value
+FROM table
+WHERE date >= {from_timestamp}
+AND date < {to_timestamp}
+;
\ No newline at end of file
diff --git a/test/locally/README.md b/test/locally/README.md
index 4686043..8052849 100644
--- a/test/locally/README.md
+++ b/test/locally/README.md
@@ -1,15 +1,33 @@
-To test your reports locally against the analytics slave databases:
+You can test your reports locally against the analytics slave databases with
+the following procedure. Have in mind that you still need ssh access to the
+datastores, testing locally is just a convenience.
-1. Create a file named `my.cnf.research` in this folder. It should look like:
+1. Clone the reportupdater query repository that you want to test, like:
+limn-language-data, limn-multimedia-data, limn-mobile-data, etc. Look at the
+`config.yaml` file to get the database host the queries connect to. Possible
+values are: s1-analytics-slave.eqiad.wmnet, analytics-store.eqiad.wmnet, etc.
+
+2. Create an ssh tunnel to that host using the script `ssh` in this directory.
+Edit the script if necessary to replace the host you want to connect to.
+
+3. Create a file named `my.cnf.research` in this folder. It should look like
+this (replacing <password> with the database password of the research user):
```
[client]
user=research
password=<password>
```
-2. Temporarily modify the database section in your `config.yaml` file to
-mimic the example shown in `config_example.yaml`.
+4. Modify the database section in the query repository `config.yaml` file
+to point to localhost:3307, and to point to the .cnf file you just created.
+You can copy the example shown in `config_example.yaml`.
-3. Create an ssh tunnel using the script `ssh`.
+5. Run reportupdater from your machine (using update_reports.py), and point to
+the query folder you cloned, for example:
-4. Run reportupdater normally.
+ python ./update_reports.py /path/to/limn-mobile-data/mobile/ /tmp/output
-l info
+
+This command will execute queries in the limn-mobile-data repo sending results
+to /tmp/output. The flag `-l info` prints more helpful logs. Some queries take
+a long time to execute, so consider stopping execution if you have collected
+enough output for the sake of your test.
diff --git a/test/locally/config_example.yaml b/test/locally/config_example.yaml
index 4eeacc0..163a771 100644
--- a/test/locally/config_example.yaml
+++ b/test/locally/config_example.yaml
@@ -4,10 +4,5 @@
port: 3307
creds_file: test/locally/my.cnf.research
db: log
- commons:
- host: "127.0.0.1"
- port: 3308
- creds_file: test/locally/my.cnf.research
- db: commonswiki
defaults:
db: el
diff --git a/test/locally/ssh b/test/locally/ssh
index 9ff9e82..ed10776 100755
--- a/test/locally/ssh
+++ b/test/locally/ssh
@@ -1,3 +1,5 @@
#!/usr/bin/env bash
-# create SSH tunnel to EventLogging databases through stat1003
-ssh -L 3307:s1-analytics-slave.eqiad.wmnet:3306 -L
3308:s4-analytics-slave.eqiad.wmnet:3306 stat1003.eqiad.wmnet
+
+# Create SSH tunnel to EventLogging databases through stat1003.
+# Replace s1-analytics-slave.eqiad.wmnet with any other host if you need.
+ssh -L 3307:s1-analytics-slave.eqiad.wmnet:3306 stat1003.eqiad.wmnet
diff --git a/test/reportupdater_test.py b/test/reportupdater_test.py
index 06e6671..580d7d4 100644
--- a/test/reportupdater_test.py
+++ b/test/reportupdater_test.py
@@ -274,3 +274,73 @@
self.assertEqual(date_str, expected_date_str)
self.assertEqual(type(value), unicode)
expected_date += relativedelta(days=+1)
+
+
+ def test_daily_report_with_previous_results_and_reruns(self):
+ def fetchall_callback():
+ # This method will return a subsequent row with each call.
+ try:
+ sql_date = self.last_date + relativedelta(days=+1)
+ value = self.last_value + 1
+ except AttributeError:
+ # Starts at Mar, Jan and Feb are in previous results
+ sql_date = datetime(2016, 1, 1)
+ value = 1
+ self.last_date = sql_date
+ self.last_value = value
+ return [[sql_date, str(value)]]
+ header = ['date', 'value']
+ connection_mock = ConnectionMock(None, fetchall_callback, header)
+ pymysql.connect = MagicMock(return_value=connection_mock)
+
+ config_path = os.path.join(self.config_folder,
'reportupdater_test6.yaml')
+ output_path = os.path.join(self.output_folder,
'reportupdater_test6.tsv')
+ with io.open(output_path, 'w') as output_file:
+ output_file.write(unicode(
+ 'date\tvalue\n'
+ '2016-01-01\t1\n'
+ '2016-01-02\ta\n' # Note irregular result.
+ '2016-01-03\t3\n'
+ '2016-01-04\tb\n' # Note irregular result.
+ '2016-01-05\t5\n'
+ ))
+ self.paths_to_clean.extend([output_path])
+
+ # Build rerun files.
+ rerun_folder = os.path.join(self.query_folder, '.reruns')
+ os.makedirs(rerun_folder)
+ rerun_path1 = os.path.join(rerun_folder, 'reportupdater_test6.1')
+ with io.open(rerun_path1, 'w') as rerun_file1:
+ rerun_file1.write(unicode(
+ '2016-01-02\n'
+ '2016-01-03\n'
+ 'reportupdater_test6\n'
+ ))
+ rerun_path2 = os.path.join(rerun_folder, 'reportupdater_test6.2')
+ with io.open(rerun_path2, 'w') as rerun_file2:
+ rerun_file2.write(unicode(
+ '2016-01-04\n'
+ '2016-01-05\n'
+ 'reportupdater_test6\n'
+ ))
+ self.paths_to_clean.extend([rerun_folder])
+
+ reportupdater.run(
+ config_path=config_path,
+ query_folder=self.query_folder,
+ output_folder=self.output_folder
+ )
+ self.assertTrue(os.path.exists(output_path))
+ with io.open(output_path, 'r', encoding='utf-8') as output_file:
+ output_lines = output_file.readlines()
+ self.assertTrue(len(output_lines) > 1)
+ header = output_lines.pop(0).strip()
+ self.assertEqual(header, 'date\tvalue')
+ # Assert that all lines hold subsequent values.
+ expected_date = datetime(2016, 1, 1)
+ expected_value = 1
+ for line in output_lines:
+ expected_line = expected_date.strftime(DATE_FORMAT) + '\t' +
str(expected_value)
+ self.assertEqual(line.strip(), expected_line)
+ expected_date += relativedelta(days=+1)
+ expected_value += 1
diff --git a/test/selector_test.py b/test/selector_test.py
index 031008d..583a3a9 100644
--- a/test/selector_test.py
+++ b/test/selector_test.py
@@ -17,7 +17,8 @@
def setUp(self):
self.config = {
'output_folder': 'test/fixtures/output',
- 'current_exec_time': datetime(2015, 1, 3, 1, 20, 30)
+ 'current_exec_time': datetime(2015, 1, 3, 1, 20, 30),
+ 'reruns': {}
}
reader = Reader(self.config)
self.selector = Selector(reader, self.config)
@@ -67,6 +68,24 @@
self.assertEqual(reports[0].end, datetime(2015, 1, 2))
+ def test_get_interval_reports_when_there_are_reruns(self):
+ self.report.key = 'selector_test3'
+ # see: test/fixtures/output/selector_test3.tsv
+ self.config['reruns'] = {
+ 'selector_test3': [
+ (datetime(2015, 1, 2), datetime(2015, 1, 3)),
+ (datetime(2015, 1, 4), datetime(2015, 1, 5))
+ ]
+ }
+ now = datetime(2015, 1, 6)
+ reports = list(self.selector.get_interval_reports(self.report, now))
+ self.assertEqual(len(reports), 2)
+ self.assertEqual(reports[0].start, datetime(2015, 1, 2))
+ self.assertEqual(reports[0].end, datetime(2015, 1, 3))
+ self.assertEqual(reports[1].start, datetime(2015, 1, 4))
+ self.assertEqual(reports[1].end, datetime(2015, 1, 5))
+
+
def test_truncate_date_when_period_is_hours(self):
date = datetime(2015, 1, 5, 10, 20, 30)
result = self.selector.truncate_date(date, 'hours')
diff --git a/test/writer_test.py b/test/writer_test.py
index d876a91..c5a5dd2 100644
--- a/test/writer_test.py
+++ b/test/writer_test.py
@@ -18,7 +18,8 @@
def setUp(self):
self.config = {
- 'output_folder': 'test/fixtures/output'
+ 'output_folder': 'test/fixtures/output',
+ 'reruns': {}
}
reader = Reader(self.config)
selector = Selector(reader, self.config)
@@ -180,7 +181,6 @@
def test_update_results_when_header_has_new_columns(self):
# see setUp for the fake data written to this report output
self.report.key = 'writer_test_header_change'
-
new_header = ['date', 'val1', 'insert middle', 'val2', 'val3', 'insert
after']
old_date = datetime(2015, 1, 1)
new_date = datetime(2015, 1, 2)
@@ -198,7 +198,6 @@
def test_update_results_when_header_has_moved_columns(self):
# see setUp for the fake data written to this report output
self.report.key = 'writer_test_header_change'
-
new_header = ['date', 'val2', 'val1', 'val3']
old_date = datetime(2015, 1, 1)
new_date = datetime(2015, 1, 2)
@@ -216,7 +215,6 @@
def test_update_results_when_header_has_removed_columns(self):
# see setUp for the fake data written to this report output
self.report.key = 'writer_test_header_change'
-
new_header = ['date', 'val1', 'val3']
new_date = datetime(2015, 1, 2)
new_row = [datetime(2015, 1, 2), 1, 3]
@@ -232,7 +230,6 @@
def test_update_results_when_header_has_different_number_of_columns(self):
# see setUp for the fake data written to this report output
self.report.key = 'writer_test_header_change'
-
new_header = ['date', 'val1', 'val2', 'val3']
new_date = datetime(2015, 1, 2)
new_row = [datetime(2015, 1, 2), 1, 2, 3, 'Additional']
@@ -247,7 +244,6 @@
def test_update_results_when_header_has_new_and_moved_columns(self):
# see setUp for the fake data written to this report output
self.report.key = 'writer_test_header_change'
-
new_header = ['date', 'val2', 'insert middle', 'val1', 'val3', 'insert
after']
old_date = datetime(2015, 1, 1)
new_date = datetime(2015, 1, 2)
@@ -265,7 +261,6 @@
def test_update_results_when_max_data_points_is_set(self):
# see setUp for the fake data written to this report output
self.report.key = 'writer_test_header_change'
-
new_date = datetime(2015, 1, 2)
new_row = [new_date, 1, 2, 3]
self.report.max_data_points = 1
--
To view, visit https://gerrit.wikimedia.org/r/308977
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I5c0ce2ea3bbcc9ca86cfa89128e151ba084d9185
Gerrit-PatchSet: 5
Gerrit-Project: analytics/reportupdater
Gerrit-Branch: master
Gerrit-Owner: Mforns <[email protected]>
Gerrit-Reviewer: Mforns <[email protected]>
Gerrit-Reviewer: Nuria <[email protected]>
Gerrit-Reviewer: jenkins-bot <>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits