Milimetric has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/86068


Change subject: implements columnar timeseries csv
......................................................................

implements columnar timeseries csv

Change-Id: I284361256634ca370af5b807746315746789278a
---
M wikimetrics/controllers/reports.py
1 file changed, 174 insertions(+), 67 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/wikimetrics refs/changes/68/86068/1
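
For orientation before the diff: the two CSV helpers below infer their column headers from the shape of the Celery result, and judging from the chained key lookups in the patch, the timeseries result nests one level deeper than the simple one. A minimal sketch of the assumed shapes (the user ids, submetric name, and dates here are invented for illustration):

    # Simple mode, per get_simple_csv: task_result[Aggregation.IND][0]
    # maps user_id -> {column: value}
    simple = {1: {'edits': 10}}

    # Timeseries mode, per get_timeseries_csv: one extra level,
    # user_id -> submetric -> {date: value}; the dates become CSV columns
    timeseries = {1: {'edits': {'2013-09-01': 3, '2013-09-02': 7}}}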

diff --git a/wikimetrics/controllers/reports.py b/wikimetrics/controllers/reports.py
index b9e8f58..63c1fc9 100644
--- a/wikimetrics/controllers/reports.py
+++ b/wikimetrics/controllers/reports.py
@@ -1,13 +1,15 @@
+import json
+from csv import DictWriter
+from StringIO import StringIO
 from sqlalchemy.orm.exc import NoResultFound
 from flask import render_template, request, url_for, Response
 from flask.ext.login import current_user
-from ..configurables import app, db
-from ..models import Report, RunReport, PersistentReport
-from ..models.report_nodes import Aggregation
-from ..utils import json_response, json_error, json_redirect, thirty_days_ago
-import json
-from StringIO import StringIO
-from csv import DictWriter
+
+from wikimetrics.configurables import app, db
+from wikimetrics.models import Report, RunReport, PersistentReport
+from wikimetrics.metrics import TimeseriesChoices
+from wikimetrics.models.report_nodes import Aggregation
+from wikimetrics.utils import json_response, json_error, json_redirect, thirty_days_ago
 
 
 @app.route('/reports/')
@@ -105,73 +107,178 @@
     
     if celery_task.ready():
         task_result = get_celery_task_result(celery_task, pj)
-        
-        csv_io = StringIO()
-        if task_result:
-            columns = []
-            
-            if Aggregation.IND in task_result:
-                columns = task_result[Aggregation.IND][0].values()[0].keys()
-            elif Aggregation.SUM in task_result:
-                columns = task_result[Aggregation.SUM].keys()
-            elif Aggregation.AVG in task_result:
-                columns = task_result[Aggregation.AVG].keys()
-            elif Aggregation.STD in task_result:
-                columns = task_result[Aggregation.STD].keys()
-            
-            # if task_result is not empty find header in first row
-            fieldnames = ['user_id'] + columns
-        else:
-            fieldnames = ['user_id']
-        writer = DictWriter(csv_io, fieldnames)
-        
-        # collect rows to output in CSV
-        task_rows = []
-        
-        # Individual Results
-        if Aggregation.IND in task_result:
-            # fold user_id into dict so we can use DictWriter to escape things
-            for user_id, row in task_result[Aggregation.IND][0].iteritems():
-                task_row = row.copy()
-                task_row['user_id'] = user_id
-                task_rows.append(task_row)
-        
-        # Aggregate Results
-        if Aggregation.SUM in task_result:
-            task_row = task_result[Aggregation.SUM].copy()
-            task_row['user_id'] = Aggregation.SUM
-            task_rows.append(task_row)
-        
-        if Aggregation.AVG in task_result:
-            task_row = task_result[Aggregation.AVG].copy()
-            task_row['user_id'] = Aggregation.AVG
-            task_rows.append(task_row)
-        
-        if Aggregation.STD in task_result:
-            task_row = task_result[Aggregation.STD].copy()
-            task_row['user_id'] = Aggregation.STD
-            task_rows.append(task_row)
-        
         parameters = json.loads(pj.parameters)
         
-        # generate some empty rows to separate the result
-        # from the parameters
-        task_rows.append({})
-        task_rows.append({})
-        task_rows.append({'user_id': 'parameters'})
+        if 'timeseries' in parameters\
+            and parameters['timeseries'] != TimeseriesChoices.NONE:
+            csv_io = get_timeseries_csv(task_result, pj, parameters)
+        else:
+            csv_io = get_simple_csv(task_result, pj, parameters)
         
-        for key, value in parameters.items():
-            task_rows.append({'user_id': key , fieldnames[1]: value})
-        
-        task_rows.append({'user_id': 'metric/cohort name', fieldnames[1]: pj.name})
-        
-        writer.writeheader()
-        writer.writerows(task_rows)
-        return Response(csv_io.getvalue(), mimetype='text/csv')
+        res = Response(csv_io.getvalue(), mimetype='text/csv')
+        res.headers['Content-Disposition'] =\
+            'attachment; filename={0}.csv'.format(pj.name)
+        return res
     else:
         return json_response(status=celery_task.status)
 
 
+def get_timeseries_csv(task_result, pj, parameters):
+    """
+    Parameters
+        task_result : the result dictionary from Celery
+        pj          : a pointer to the permanent job
+        parameters  : a dictionary of pj.parameters
+    
+    Returns
+        A StringIO instance representing timeseries CSV
+    """
+    csv_io = StringIO()
+    if task_result:
+        columns = []
+        
+        if Aggregation.IND in task_result:
+            columns = task_result[Aggregation.IND][0].values()[0].values()[0].keys()
+        elif Aggregation.SUM in task_result:
+            columns = task_result[Aggregation.SUM].values()[0].keys()
+        elif Aggregation.AVG in task_result:
+            columns = task_result[Aggregation.AVG].values()[0].keys()
+        elif Aggregation.STD in task_result:
+            columns = task_result[Aggregation.STD].values()[0].keys()
+        
+        # if task_result is not empty find header in first row
+        fieldnames = ['user_id', 'submetric'] + sorted(columns)
+    else:
+        fieldnames = ['user_id', 'submetric']
+    writer = DictWriter(csv_io, fieldnames)
+    
+    # collect rows to output in CSV
+    task_rows = []
+    
+    # Individual Results
+    if Aggregation.IND in task_result:
+        # fold user_id into dict so we can use DictWriter to escape things
+        for user_id, row in task_result[Aggregation.IND][0].iteritems():
+            for subrow in row.keys():
+                task_row = row[subrow].copy()
+                task_row['user_id'] = user_id
+                task_row['submetric'] = subrow
+                task_rows.append(task_row)
+    
+    # Aggregate Results
+    if Aggregation.SUM in task_result:
+        row = task_result[Aggregation.SUM]
+        for subrow in row.keys():
+            task_row = row[subrow].copy()
+            task_row['user_id'] = Aggregation.SUM
+            task_row['submetric'] = subrow
+            task_rows.append(task_row)
+    
+    if Aggregation.AVG in task_result:
+        row = task_result[Aggregation.AVG]
+        for subrow in row.keys():
+            task_row = row[subrow].copy()
+            task_row['user_id'] = Aggregation.AVG
+            task_row['submetric'] = subrow
+            task_rows.append(task_row)
+    
+    if Aggregation.STD in task_result:
+        row = task_result[Aggregation.STD]
+        for subrow in row.keys():
+            task_row = row[subrow].copy()
+            task_row['user_id'] = Aggregation.STD
+            task_row['submetric'] = subrow
+            task_rows.append(task_row)
+    
+    # generate some empty rows to separate the result
+    # from the parameters
+    task_rows.append({})
+    task_rows.append({})
+    task_rows.append({'user_id': 'parameters'})
+    
+    for key, value in parameters.items():
+        task_rows.append({'user_id': key, fieldnames[1]: value})
+    
+    task_rows.append({'user_id': 'metric/cohort name', fieldnames[1]: pj.name})
+    
+    writer.writeheader()
+    writer.writerows(task_rows)
+    return csv_io
+
+
+def get_simple_csv(task_result, pj, parameters):
+    """
+    Parameters
+        task_result : the result dictionary from Celery
+        pj          : a pointer to the permanent job
+        parameters  : a dictionary of pj.parameters
+    
+    Returns
+        A StringIO instance representing simple CSV
+    """
+    
+    csv_io = StringIO()
+    if task_result:
+        columns = []
+        
+        if Aggregation.IND in task_result:
+            columns = task_result[Aggregation.IND][0].values()[0].keys()
+        elif Aggregation.SUM in task_result:
+            columns = task_result[Aggregation.SUM].keys()
+        elif Aggregation.AVG in task_result:
+            columns = task_result[Aggregation.AVG].keys()
+        elif Aggregation.STD in task_result:
+            columns = task_result[Aggregation.STD].keys()
+        
+        # if task_result is not empty find header in first row
+        fieldnames = ['user_id'] + columns
+    else:
+        fieldnames = ['user_id']
+    writer = DictWriter(csv_io, fieldnames)
+    
+    # collect rows to output in CSV
+    task_rows = []
+    
+    # Individual Results
+    if Aggregation.IND in task_result:
+        # fold user_id into dict so we can use DictWriter to escape things
+        for user_id, row in task_result[Aggregation.IND][0].iteritems():
+            task_row = row.copy()
+            task_row['user_id'] = user_id
+            task_rows.append(task_row)
+    
+    # Aggregate Results
+    if Aggregation.SUM in task_result:
+        task_row = task_result[Aggregation.SUM].copy()
+        task_row['user_id'] = Aggregation.SUM
+        task_rows.append(task_row)
+    
+    if Aggregation.AVG in task_result:
+        task_row = task_result[Aggregation.AVG].copy()
+        task_row['user_id'] = Aggregation.AVG
+        task_rows.append(task_row)
+    
+    if Aggregation.STD in task_result:
+        task_row = task_result[Aggregation.STD].copy()
+        task_row['user_id'] = Aggregation.STD
+        task_rows.append(task_row)
+    
+    # generate some empty rows to separate the result
+    # from the parameters
+    task_rows.append({})
+    task_rows.append({})
+    task_rows.append({'user_id': 'parameters'})
+    
+    for key, value in parameters.items():
+        task_rows.append({'user_id': key, fieldnames[1]: value})
+    
+    task_rows.append({'user_id': 'metric/cohort name', fieldnames[1]: pj.name})
+    
+    writer.writeheader()
+    writer.writerows(task_rows)
+    return csv_io
+
+
 @app.route('/reports/result/<result_key>.json')
 def report_result_json(result_key):
     celery_task, pj = get_celery_task(result_key)
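
Standalone illustration of the new columnar layout: the following self-contained Python 2 sketch performs the same reshaping as the Aggregation.IND branch of get_timeseries_csv above, using invented sample data (the nested user_id -> submetric -> date shape is inferred from the .values()[0].values()[0].keys() lookup in the patch, not confirmed elsewhere):

    from csv import DictWriter
    from StringIO import StringIO

    # Invented sample in the shape task_result[Aggregation.IND][0]
    # appears to have: user_id -> submetric -> {date: value}
    individual = {
        1: {'edits': {'2013-09-01': 3, '2013-09-02': 5}},
        2: {'edits': {'2013-09-01': 0, '2013-09-02': 7}},
    }

    # Header row: fixed id columns plus sorted date columns, as in the patch
    dates = sorted(individual.values()[0].values()[0].keys())
    csv_io = StringIO()
    writer = DictWriter(csv_io, ['user_id', 'submetric'] + dates)
    writer.writeheader()

    # One CSV row per (user, submetric); dates spread across the columns
    for user_id, submetrics in individual.iteritems():
        for submetric, by_date in submetrics.iteritems():
            row = by_date.copy()
            row['user_id'] = user_id
            row['submetric'] = submetric
            writer.writerow(row)

    print csv_io.getvalue()
    # user_id,submetric,2013-09-01,2013-09-02
    # 1,edits,3,5
    # 2,edits,0,7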

-- 
To view, visit https://gerrit.wikimedia.org/r/86068
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I284361256634ca370af5b807746315746789278a
Gerrit-PatchSet: 1
Gerrit-Project: analytics/wikimetrics
Gerrit-Branch: master
Gerrit-Owner: Milimetric <dandree...@wikimedia.org>
