Milimetric has uploaded a new change for review. https://gerrit.wikimedia.org/r/86068
Change subject: implements columnar timeseries csv
......................................................................

implements columnar timeseries csv

Change-Id: I284361256634ca370af5b807746315746789278a
---
M wikimetrics/controllers/reports.py
1 file changed, 174 insertions(+), 67 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/wikimetrics refs/changes/68/86068/1

diff --git a/wikimetrics/controllers/reports.py b/wikimetrics/controllers/reports.py
index b9e8f58..63c1fc9 100644
--- a/wikimetrics/controllers/reports.py
+++ b/wikimetrics/controllers/reports.py
@@ -1,13 +1,15 @@
+import json
+from csv import DictWriter
+from StringIO import StringIO
 from sqlalchemy.orm.exc import NoResultFound
 from flask import render_template, request, url_for, Response
 from flask.ext.login import current_user
-from ..configurables import app, db
-from ..models import Report, RunReport, PersistentReport
-from ..models.report_nodes import Aggregation
-from ..utils import json_response, json_error, json_redirect, thirty_days_ago
-import json
-from StringIO import StringIO
-from csv import DictWriter
+
+from wikimetrics.configurables import app, db
+from wikimetrics.models import Report, RunReport, PersistentReport
+from wikimetrics.metrics import TimeseriesChoices
+from wikimetrics.models.report_nodes import Aggregation
+from wikimetrics.utils import json_response, json_error, json_redirect, thirty_days_ago
 
 
 @app.route('/reports/')
@@ -105,73 +107,178 @@
     if celery_task.ready():
         task_result = get_celery_task_result(celery_task, pj)
-
-        csv_io = StringIO()
-        if task_result:
-            columns = []
-
-            if Aggregation.IND in task_result:
-                columns = task_result[Aggregation.IND][0].values()[0].keys()
-            elif Aggregation.SUM in task_result:
-                columns = task_result[Aggregation.SUM].keys()
-            elif Aggregation.AVG in task_result:
-                columns = task_result[Aggregation.AVG].keys()
-            elif Aggregation.STD in task_result:
-                columns = task_result[Aggregation.STD].keys()
-
-            # if task_result is not empty find header in first row
-            fieldnames = ['user_id'] + columns
-        else:
-            fieldnames = ['user_id']
-        writer = DictWriter(csv_io, fieldnames)
-
-        # collect rows to output in CSV
-        task_rows = []
-
-        # Individual Results
-        if Aggregation.IND in task_result:
-            # fold user_id into dict so we can use DictWriter to escape things
-            for user_id, row in task_result[Aggregation.IND][0].iteritems():
-                task_row = row.copy()
-                task_row['user_id'] = user_id
-                task_rows.append(task_row)
-
-        # Aggregate Results
-        if Aggregation.SUM in task_result:
-            task_row = task_result[Aggregation.SUM].copy()
-            task_row['user_id'] = Aggregation.SUM
-            task_rows.append(task_row)
-
-        if Aggregation.AVG in task_result:
-            task_row = task_result[Aggregation.AVG].copy()
-            task_row['user_id'] = Aggregation.AVG
-            task_rows.append(task_row)
-
-        if Aggregation.STD in task_result:
-            task_row = task_result[Aggregation.STD].copy()
-            task_row['user_id'] = Aggregation.STD
-            task_rows.append(task_row)
-
         parameters = json.loads(pj.parameters)
-        # generate some empty rows to separate the result
-        # from the parameters
-        task_rows.append({})
-        task_rows.append({})
-        task_rows.append({'user_id': 'parameters'})
+        if 'timeseries' in parameters\
+                and parameters['timeseries'] != TimeseriesChoices.NONE:
+            csv_io = get_timeseries_csv(task_result, pj, parameters)
+        else:
+            csv_io = get_simple_csv(task_result, pj, parameters)
 
-        for key, value in parameters.items():
-            task_rows.append({'user_id': key , fieldnames[1]: value})
-
-        task_rows.append({'user_id': 'metric/cohort name', fieldnames[1]: pj.name})
-
-        writer.writeheader()
-        writer.writerows(task_rows)
-        return Response(csv_io.getvalue(), mimetype='text/csv')
+        res = Response(csv_io.getvalue(), mimetype='text/csv')
+        res.headers['Content-Disposition'] =\
+            'attachment; filename={0}.csv'.format(pj.name)
+        return res
     else:
         return json_response(status=celery_task.status)
 
 
+def get_timeseries_csv(task_result, pj, parameters):
+    """
+    Parameters
+        task_result : the result dictionary from Celery
+        pj          : a pointer to the permanent job
+        parameters  : a dictionary of pj.parameters
+
+    Returns
+        A StringIO instance representing timeseries CSV
+    """
+    csv_io = StringIO()
+    if task_result:
+        columns = []
+
+        if Aggregation.IND in task_result:
+            columns = task_result[Aggregation.IND][0].values()[0].values()[0].keys()
+        elif Aggregation.SUM in task_result:
+            columns = task_result[Aggregation.SUM].values()[0].keys()
+        elif Aggregation.AVG in task_result:
+            columns = task_result[Aggregation.AVG].values()[0].keys()
+        elif Aggregation.STD in task_result:
+            columns = task_result[Aggregation.STD].values()[0].keys()
+
+        # if task_result is not empty find header in first row
+        fieldnames = ['user_id', 'submetric'] + sorted(columns)
+    else:
+        fieldnames = ['user_id', 'submetric']
+    writer = DictWriter(csv_io, fieldnames)
+
+    # collect rows to output in CSV
+    task_rows = []
+
+    # Individual Results
+    if Aggregation.IND in task_result:
+        # fold user_id into dict so we can use DictWriter to escape things
+        for user_id, row in task_result[Aggregation.IND][0].iteritems():
+            for subrow in row.keys():
+                task_row = row[subrow].copy()
+                task_row['user_id'] = user_id
+                task_row['submetric'] = subrow
+                task_rows.append(task_row)
+
+    # Aggregate Results
+    if Aggregation.SUM in task_result:
+        row = task_result[Aggregation.SUM]
+        for subrow in row.keys():
+            task_row = row[subrow].copy()
+            task_row['user_id'] = Aggregation.SUM
+            task_row['submetric'] = subrow
+            task_rows.append(task_row)
+
+    if Aggregation.AVG in task_result:
+        row = task_result[Aggregation.AVG]
+        for subrow in row.keys():
+            task_row = row[subrow].copy()
+            task_row['user_id'] = Aggregation.AVG
+            task_row['submetric'] = subrow
+            task_rows.append(task_row)
+
+    if Aggregation.STD in task_result:
+        row = task_result[Aggregation.STD]
+        for subrow in row.keys():
+            task_row = row[subrow].copy()
+            task_row['user_id'] = Aggregation.STD
+            task_row['submetric'] = subrow
+            task_rows.append(task_row)
+
+    # generate some empty rows to separate the result
+    # from the parameters
+    task_rows.append({})
+    task_rows.append({})
+    task_rows.append({'user_id': 'parameters'})
+
+    for key, value in parameters.items():
+        task_rows.append({'user_id': key , fieldnames[1]: value})
+
+    task_rows.append({'user_id': 'metric/cohort name', fieldnames[1]: pj.name})
+
+    writer.writeheader()
+    writer.writerows(task_rows)
+    return csv_io
+
+
+def get_simple_csv(task_result, pj, parameters):
+    """
+    Parameters
+        task_result : the result dictionary from Celery
+        pj          : a pointer to the permanent job
+        parameters  : a dictionary of pj.parameters
+
+    Returns
+        A StringIO instance representing simple CSV
+    """
+
+    csv_io = StringIO()
+    if task_result:
+        columns = []
+
+        if Aggregation.IND in task_result:
+            columns = task_result[Aggregation.IND][0].values()[0].keys()
+        elif Aggregation.SUM in task_result:
+            columns = task_result[Aggregation.SUM].keys()
+        elif Aggregation.AVG in task_result:
+            columns = task_result[Aggregation.AVG].keys()
+        elif Aggregation.STD in task_result:
+            columns = task_result[Aggregation.STD].keys()
+
+        # if task_result is not empty find header in first row
+        fieldnames = ['user_id'] + columns
+    else:
+        fieldnames = ['user_id']
+    writer = DictWriter(csv_io, fieldnames)
+
+    # collect rows to output in CSV
+    task_rows = []
+
+    # Individual Results
+    if Aggregation.IND in task_result:
+        # fold user_id into dict so we can use DictWriter to escape things
+        for user_id, row in task_result[Aggregation.IND][0].iteritems():
+            task_row = row.copy()
+            task_row['user_id'] = user_id
+            task_rows.append(task_row)
+
+    # Aggregate Results
+    if Aggregation.SUM in task_result:
+        task_row = task_result[Aggregation.SUM].copy()
+        task_row['user_id'] = Aggregation.SUM
+        task_rows.append(task_row)
+
+    if Aggregation.AVG in task_result:
+        task_row = task_result[Aggregation.AVG].copy()
+        task_row['user_id'] = Aggregation.AVG
+        task_rows.append(task_row)
+
+    if Aggregation.STD in task_result:
+        task_row = task_result[Aggregation.STD].copy()
+        task_row['user_id'] = Aggregation.STD
+        task_rows.append(task_row)
+
+    # generate some empty rows to separate the result
+    # from the parameters
+    task_rows.append({})
+    task_rows.append({})
+    task_rows.append({'user_id': 'parameters'})
+
+    for key, value in parameters.items():
+        task_rows.append({'user_id': key , fieldnames[1]: value})
+
+    task_rows.append({'user_id': 'metric/cohort name', fieldnames[1]: pj.name})
+
+    writer.writeheader()
+    writer.writerows(task_rows)
+    return csv_io
+
+
 @app.route('/reports/result/<result_key>.json')
 def report_result_json(result_key):
     celery_task, pj = get_celery_task(result_key)

--
To view, visit https://gerrit.wikimedia.org/r/86068
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I284361256634ca370af5b807746315746789278a
Gerrit-PatchSet: 1
Gerrit-Project: analytics/wikimetrics
Gerrit-Branch: master
Gerrit-Owner: Milimetric <dandree...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits