Rfaulk has submitted this change and it was merged.

Change subject: add. form request_obj from raw request.
......................................................................


add. form request_obj from raw request.

add. A method that builds the request object from the raw request, with
validation and filtering.

mod. get_data now takes the raw request and builds the request object itself.

mod. Simplify the output view: it now only responds to the request, which may
include adding the request to the request broker target.

add. New error code (6) for requests whose job is already queued.

mod. In output, handle the case where the job is already queued.

add. A new broker target (process broker) for jobs that are being processed.

add. Reads/writes to the process broker target.

fix. Call to the broker's add method.

fix. Circular import between request_meta and data (get_cohort_refresh_datetime
moved from data.py to engine/__init__.py).

Change-Id: I51de1bba55f9b604cbf8ab03948816935841d8af

fix. Changed AnonymousUser to AnonymousUserMixin for flask-login 0.9 
compatibility.

Change-Id: I89e8ac360192ecbbe349df4d9370ce751b96c24d
---
M user_metrics/api/__init__.py
M user_metrics/api/engine/__init__.py
M user_metrics/api/engine/data.py
M user_metrics/api/engine/request_manager.py
M user_metrics/api/engine/request_meta.py
M user_metrics/api/session.py
M user_metrics/api/views.py
7 files changed, 143 insertions(+), 129 deletions(-)

Approvals:
  Rfaulk: Verified; Looks good to me, approved



diff --git a/user_metrics/api/__init__.py b/user_metrics/api/__init__.py
index 5d41e16..91020a8 100644
--- a/user_metrics/api/__init__.py
+++ b/user_metrics/api/__init__.py
@@ -12,6 +12,7 @@
 
 REQUEST_BROKER_TARGET = 'request_broker.txt'
 RESPONSE_BROKER_TARGET = 'response_broker.txt'
+PROCESS_BROKER_TARGET = 'process_broker.txt'
 
 umapi_broker_context = FileBroker()
 
@@ -32,6 +33,7 @@
     3: 'Could not find User ID.',
     4: 'Bad metric name.',
     5: 'Failed to retrieve users.',
+    6: 'Job is currently queued.',
 }
 
 
diff --git a/user_metrics/api/engine/__init__.py b/user_metrics/api/engine/__init__.py
index 3f4d2d6..0fc7e57 100644
--- a/user_metrics/api/engine/__init__.py
+++ b/user_metrics/api/engine/__init__.py
@@ -35,7 +35,11 @@
 __license__ = "GPL (version 2 or later)"
 
 from re import search
+from user_metrics.config import settings, logging
 from user_metrics.api import MetricsAPIError, query_mod
+from datetime import datetime
+import user_metrics.etl.data_loader as dl
+
 
 #
 # Define remaining constants
@@ -119,3 +123,30 @@
         for key in user_ids:
             if user_ids[key] > 1:
                 yield key
+
+
+def get_cohort_refresh_datetime(utm_id):
+    """
+        Get the latest refresh datetime of a cohort.  Returns current time
+        formatted as a string if the field is not found.
+    """
+
+    # @TODO MOVE DB REFS INTO QUERY MODULE
+    conn = dl.Connector(instance=settings.__cohort_data_instance__)
+    query = """ SELECT utm_touched FROM usertags_meta WHERE utm_id = %s """
+    conn._cur_.execute(query, int(utm_id))
+
+    utm_touched = None
+    try:
+        utm_touched = conn._cur_.fetchone()[0]
+    except ValueError:
+        pass
+
+    # Ensure the field was retrieved
+    if not utm_touched:
+        logging.error(__name__ + '::Missing utm_touched for cohort %s.' %
+                                 str(utm_id))
+        utm_touched = datetime.now()
+
+    del conn
+    return utm_touched.strftime(DATETIME_STR_FORMAT)
diff --git a/user_metrics/api/engine/data.py b/user_metrics/api/engine/data.py
index 6b452b1..6165ef9 100644
--- a/user_metrics/api/engine/data.py
+++ b/user_metrics/api/engine/data.py
@@ -13,7 +13,6 @@
 
         get_users(cohort_expr)
         get_cohort_id(utm_name)
-        get_cohort_refresh_datetime(utm_id)
 
     The other portion of data storage and retrieval is concerned with providing
     functionality that enables responses to be cached.  Request responses are
@@ -48,21 +47,17 @@
 __license__ = "GPL (version 2 or later)"
 
 
-from datetime import datetime
 from re import search
 from collections import OrderedDict
 from hashlib import sha1
 import cPickle
 import os
 
-import user_metrics.etl.data_loader as dl
-from user_metrics.config import logging
-from user_metrics.api.engine import COHORT_REGEX, parse_cohorts, \
-    DATETIME_STR_FORMAT
+from user_metrics.config import logging, settings
+from user_metrics.api.engine import COHORT_REGEX, parse_cohorts
 from user_metrics.api.engine.request_meta import REQUEST_META_QUERY_STR,\
-    REQUEST_META_BASE
+    REQUEST_META_BASE, build_request_obj
 from user_metrics.api import MetricsAPIError, query_mod
-from user_metrics.config import settings
 
 
 # This is used to separate key meta and key strings for hash table data
@@ -90,39 +85,13 @@
     return users
 
 
-def get_cohort_refresh_datetime(utm_id):
-    """
-        Get the latest refresh datetime of a cohort.  Returns current time
-        formatted as a string if the field is not found.
-    """
-
-    # @TODO MOVE DB REFS INTO QUERY MODULE
-    conn = dl.Connector(instance=settings.__cohort_data_instance__)
-    query = """ SELECT utm_touched FROM usertags_meta WHERE utm_id = %s """
-    conn._cur_.execute(query, int(utm_id))
-
-    utm_touched = None
-    try:
-        utm_touched = conn._cur_.fetchone()[0]
-    except ValueError:
-        pass
-
-    # Ensure the field was retrieved
-    if not utm_touched:
-        logging.error(__name__ + '::Missing utm_touched for cohort %s.' %
-                                 str(utm_id))
-        utm_touched = datetime.now()
-
-    del conn
-    return utm_touched.strftime(DATETIME_STR_FORMAT)
-
-
-def get_data(request_meta, hash_result=True):
+def get_data(request, hash_result=True):
     """
         Extract data from the global hash given a request object.  If an item
         is successfully recovered data is returned
     """
 
+    request_obj = build_request_obj(request)
     hash_table_ref = read_pickle_data()
 
     # Traverse the hash key structure to find data
@@ -131,9 +100,9 @@
 
     logging.debug(__name__ + " - Attempting to pull data for request " \
                              "COHORT {0}, METRIC {1}".
-                  format(request_meta.cohort_expr, request_meta.metric))
+                  format(request_obj.cohort_expr, request_obj.metric))
 
-    key_sig = build_key_signature(request_meta, hash_result=hash_result)
+    key_sig = build_key_signature(request_obj, hash_result=hash_result)
     item = find_item(hash_table_ref, key_sig)
 
     if item:
diff --git a/user_metrics/api/engine/request_manager.py b/user_metrics/api/engine/request_manager.py
index f25e157..36005ce 100644
--- a/user_metrics/api/engine/request_manager.py
+++ b/user_metrics/api/engine/request_manager.py
@@ -84,9 +84,9 @@
 from user_metrics.config import logging, settings
 from user_metrics.api import MetricsAPIError, error_codes, query_mod, \
     REQUEST_BROKER_TARGET, umapi_broker_context,\
-    RESPONSE_BROKER_TARGET
+    RESPONSE_BROKER_TARGET, PROCESS_BROKER_TARGET
 from user_metrics.api.engine.data import get_users
-from user_metrics.api.engine.request_meta import rebuild_unpacked_request
+from user_metrics.api.engine.request_meta import build_request_obj
 from user_metrics.metrics.users import MediaWikiUser
 from user_metrics.metrics.user_metric import UserMetricError
 
@@ -95,7 +95,7 @@
 from os import getpid
 from sys import getsizeof
 import time
-
+from hashlib import sha1
 
 # API JOB HANDLER
 # ###############
@@ -148,7 +148,13 @@
 
         logging.debug(log_name + ' - POLLING REQUESTS...')
         if concurrent_jobs <= MAX_CONCURRENT_JOBS:
+
+            # Pop from request target
             req_item = umapi_broker_context.pop(REQUEST_BROKER_TARGET)
+
+            # Push to process target
+            url_hash = sha1(req_item.encode('utf-8')).hexdigest()
+            umapi_broker_context.add(PROCESS_BROKER_TARGET, url_hash, req_item)
         else:
             continue
 
@@ -171,9 +177,13 @@
                 while not job_item.queue.empty():
                     data += job_item.queue.get(True)
 
-                # Put the response string
-                umapi_broker_context.add(RESPONSE_BROKER_TARGET,
-                                         [job_item.request, data])
+                # Remove from process target
+                url_hash = sha1(job_item.request.encode('utf-8')).hexdigest()
+                umapi_broker_context.remove(PROCESS_BROKER_TARGET, url_hash)
+
+                # Add to response target
+                umapi_broker_context.add(RESPONSE_BROKER_TARGET, url_hash,
+                                         str(''.join([job_item.request, '--', 
data])))
 
                 del job_queue[job_queue.index(job_item)]
                 concurrent_jobs -= 1
@@ -201,7 +211,7 @@
     logging.debug('{0} - FINISHING.'.format(log_name))
 
 
-def process_metrics(p, request_meta):
+def process_metrics(p, request):
     """
         Worker process for requests, forked from the job controller.  This
         method handles:
@@ -214,19 +224,23 @@
     log_name = '{0} :: {1}'.format(__name__, process_metrics.__name__)
 
     logging.info(log_name + ' - START JOB'
-                            '\n\tCOHORT = {0} - METRIC = {1}'
-                            ' -  PID = {2})'.
-                 format(request_meta.cohort_expr, request_meta.metric,
-                        getpid()))
+                            '\n\t{0} -  PID = {2})'.
+                 format(request, getpid()))
 
     err_msg = __name__ + ' :: Request failed.'
     users = list()
 
+    try:
+        request_obj = build_request_obj(request)
+    except MetricsAPIError as e:
+        # TODO - flag job as failed
+        return
+
     # obtain user list - handle the case where a lone user ID is passed
     # !! The username should already be validated
-    if request_meta.is_user:
-        uid = MediaWikiUser.is_user_name(request_meta.cohort_expr,
-                                         request_meta.project)
+    if request_obj.is_user:
+        uid = MediaWikiUser.is_user_name(request_obj.cohort_expr,
+                                         request_obj.project)
         if uid:
             valid = True
             users = [uid]
@@ -235,13 +249,13 @@
             err_msg = error_codes[3]
 
     # The "all" user group.  All users within a time period.
-    elif request_meta.cohort_expr == 'all':
+    elif request_obj.cohort_expr == 'all':
         users = MediaWikiUser(query_type=1)
 
         try:
             users = [u for u in users.get_users(
-                request_meta.start, request_meta.end,
-                project=request_meta.project)]
+                request_obj.start, request_obj.end,
+                project=request_obj.project)]
             valid = True
         except Exception:
             valid = False
@@ -249,13 +263,13 @@
 
     # "TYPICAL" COHORT PROCESSING
     else:
-        users = get_users(request_meta.cohort_expr)
+        users = get_users(request_obj.cohort_expr)
 
         # Default project is what is stored in usertags_meta
         project = query_mod.get_cohort_project_by_meta(
-            request_meta.cohort_expr)
+            request_obj.cohort_expr)
         if project:
-            request_meta.project = project
+            request_obj.project = project
         logging.debug(__name__ + ' :: Using default project from '
                                  'usertags_meta {0}.'.format(project))
 
@@ -264,7 +278,7 @@
 
     if valid:
         # process request
-        results = process_data_request(request_meta, users)
+        results = process_data_request(request_obj, users)
         results = str(results)
         response_size = getsizeof(results, None)
 
@@ -280,14 +294,14 @@
 
         logging.info(log_name + ' - END JOB'
                                 '\n\tCOHORT = {0}- METRIC = {1} -  PID = {2})'.
-                     format(request_meta.cohort_expr, request_meta.metric,
+                     format(request_obj.cohort_expr, request_obj.metric,
                             getpid()))
 
     else:
         p.put(err_msg, block=True)
         logging.info(log_name + ' - END JOB - FAILED.'
                                 '\n\tCOHORT = {0}- METRIC = {1} -  PID = {2})'.
-                     format(request_meta.cohort_expr, request_meta.metric,
+                     format(request_obj.cohort_expr, request_obj.metric,
                             getpid()))
 
 # REQUEST FLOW HANDLER
diff --git a/user_metrics/api/engine/request_meta.py b/user_metrics/api/engine/request_meta.py
index 32d7839..95cd737 100644
--- a/user_metrics/api/engine/request_meta.py
+++ b/user_metrics/api/engine/request_meta.py
@@ -38,9 +38,9 @@
 
 from user_metrics.utils import format_mediawiki_timestamp, enum
 from user_metrics.utils.record_type import recordtype
-from user_metrics.api import MetricsAPIError
-from user_metrics.api.engine import DEFAULT_QUERY_VAL
-from user_metrics.metrics.users import USER_METRIC_PERIOD_TYPE
+from user_metrics.api import MetricsAPIError, query_mod
+from user_metrics.api.engine import DEFAULT_QUERY_VAL, 
get_cohort_refresh_datetime
+from user_metrics.metrics.users import USER_METRIC_PERIOD_TYPE,MediaWikiUser
 from collections import namedtuple, OrderedDict
 from flask import escape
 from user_metrics.config import logging
@@ -62,7 +62,7 @@
 }
 
 
-def parse_request(request):
+def parse_raw_request(request):
     """
     Parses umapi requests
     """
@@ -112,6 +112,46 @@
     rt = recordtype("RequestMeta", params)
     return eval('rt' + arg_str)
 
+
+def build_request_obj(request):
+    """
+    Build a request and validate.
+
+        1. Populate with request parameters from query args.
+        2. Filter the input discarding any url junk
+        3. Process defaults for request parameters
+        4. See if this maps to a single user request
+    """
+
+    parsed_req = parse_raw_request(request)
+
+    # Get the refresh date of the cohort
+    try:
+        cid = query_mod.get_cohort_id(parsed_req.cohort)
+        cohort_refresh_ts = get_cohort_refresh_datetime(cid)
+
+    except Exception:
+        cohort_refresh_ts = None
+        logging.error(__name__ + ' :: Could not retrieve refresh '
+                                 'time of cohort.')
+
+    rm = RequestMetaFactory(parsed_req.cohort, cohort_refresh_ts,
+                                parsed_req.metric)
+    filter_request_input(request, rm)
+    format_request_params(rm)
+
+    if rm.is_user:
+        project = rm.project if rm.project else 'enwiki'
+        if not MediaWikiUser.is_user_name(parsed_req.cohort, project):
+            err_msg = __name__ + ' :: "{0}" is not a valid username ' \
+                                 'in "{1}"'.format(parsed_req.cohort, project)
+            raise MetricsAPIError(err_msg)
+    else:
+        # @TODO CALL COHORT VALIDATION HERE
+        pass
+
+    return rm
+
 # Defines what variables may be extracted from the query string
 REQUEST_META_QUERY_STR = ['aggregator', 'time_series', 'project', 'namespace',
                           'start', 'end', 'slice', 't', 'n',
diff --git a/user_metrics/api/session.py b/user_metrics/api/session.py
index d6f3777..d68911a 100644
--- a/user_metrics/api/session.py
+++ b/user_metrics/api/session.py
@@ -30,7 +30,7 @@
         check_password_hash
 
     from flask.ext.login import LoginManager, current_user, UserMixin, \
-        AnonymousUser, confirm_login
+        AnonymousUserMixin, confirm_login
 
     class APIUser(UserMixin):
         """
@@ -126,7 +126,7 @@
                         format(self.name))
                 self.active = True
 
-    class Anonymous(AnonymousUser):
+    class Anonymous(AnonymousUserMixin):  # formerly subclassed AnonymousUser
         name = u'Anonymous'
 
     login_manager = LoginManager()
diff --git a/user_metrics/api/views.py b/user_metrics/api/views.py
index df6a259..9308576 100644
--- a/user_metrics/api/views.py
+++ b/user_metrics/api/views.py
@@ -21,14 +21,11 @@
 
 from user_metrics.etl.data_loader import Connector
 from user_metrics.config import logging, settings
-from user_metrics.api.engine.data import get_cohort_refresh_datetime, \
-    get_data, get_url_from_keys, build_key_signature, read_pickle_data
-from user_metrics.api import MetricsAPIError, error_codes, query_mod, \
+from user_metrics.api.engine.data import get_data, get_url_from_keys, \
+    read_pickle_data
+from user_metrics.api import error_codes, query_mod, \
     REQUEST_BROKER_TARGET, umapi_broker_context
-from user_metrics.api.engine.request_meta import filter_request_input, \
-    format_request_params, RequestMetaFactory, \
-    get_metric_names
-from user_metrics.metrics.users import MediaWikiUser
+from user_metrics.api.engine.request_meta import get_metric_names
 from user_metrics.api.session import APIUser
 import user_metrics.config.settings as conf
 from hashlib import sha1
@@ -383,79 +380,40 @@
                                m_list=get_metric_names(), error=error)
 
 
-def output(cohort, metric):
-    """ View corresponding to a data request -
-        All of the setup and execution for a request happens here. """
+def output():
+    """
+    View for a data request.  Returns the cached response if one exists
+    and 'refresh' is absent; else adds request.url to REQUEST_BROKER_TARGET.
+    """

     # Check for refresh flag
     refresh = True if 'refresh' in request.args else False

-    # Get the refresh date of the cohort
-    try:
-        cid = query_mod.get_cohort_id(cohort)
-        cohort_refresh_ts = get_cohort_refresh_datetime(cid)
-    except Exception:
-        cohort_refresh_ts = None
-        logging.error(__name__ + ' :: Could not retrieve refresh '
-                                 'time of cohort.')
-
-    # Build a request and validate.
-    #
-    # 1. Populate with request parameters from query args.
-    # 2. Filter the input discarding any url junk
-    # 3. Process defaults for request parameters
-    # 4. See if this maps to a single user request
-    # 5. See if this maps to a single user request
-    try:
-        rm = RequestMetaFactory(cohort, cohort_refresh_ts, metric)
-    except MetricsAPIError as e:
-        return redirect(url_for('all_cohorts') + '?error=' +
-                        str(e.error_code))
-
-    filter_request_input(request, rm)
-    try:
-        format_request_params(rm)
-    except MetricsAPIError as e:
-        return redirect(url_for('all_cohorts') + '?error=' +
-                        str(e.error_code))
-
-    if rm.is_user:
-        project = rm.project if rm.project else 'enwiki'
-        if not MediaWikiUser.is_user_name(cohort, project):
-            logging.error(__name__ + ' :: "{0}" is not a valid username '
-                                     'in "{1}"'.format(cohort, project))
-            return redirect(url_for('all_cohorts') + '?error=3')
-    else:
-        # @TODO CALL COHORT VALIDATION HERE
-        pass
-
-    # Determine if the request maps to an existing response.
-    #
-    # 1. The response already exists in the hash, return.
-    # 2. Otherwise, add the request tot the queue.
-    data = get_data(rm)
-    key_sig = build_key_signature(rm, hash_result=True)
+    data = get_data(request.url)  # may raise MetricsAPIError (uncaught)

     # Is the request already running?
     # TODO check req_target
+    is_queued = False  # placeholder: queue lookup not implemented yet
     is_running = False

-    # Determine if request is already hashed
+    # Serve the cached response unless a refresh was requested
     if data and not refresh:
         return make_response(jsonify(data))

+    # Determine if the job is already queued (is_queued is a stub for now)
+    elif is_queued:
+        return render_template('processing.html', error=error_codes[6])
+
     # Determine if the job is already running
     elif is_running:
-        return render_template('processing.html',
-                               error=error_codes[0],
-                               url_str=str(rm))
+        return render_template('processing.html', error=error_codes[0])

     # Add the request to the queue
     else:
-        hash = sha1(request.url.encode('utf-8')).hexdigest()
-        umapi_broker_context.add(REQUEST_BROKER_TARGET, hash, request.url)
+        url_hash = sha1(request.url.encode('utf-8')).hexdigest()
+        umapi_broker_context.add(REQUEST_BROKER_TARGET, url_hash, request.url)

-    return render_template('processing.html', url_str=str(rm))
+        return render_template('processing.html')
 
 
 def job_queue():

-- 
To view, visit https://gerrit.wikimedia.org/r/77683
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I89e8ac360192ecbbe349df4d9370ce751b96c24d
Gerrit-PatchSet: 1
Gerrit-Project: analytics/user-metrics
Gerrit-Branch: master
Gerrit-Owner: Rfaulk <[email protected]>
Gerrit-Reviewer: Rfaulk <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to