Rfaulk has submitted this change and it was merged.
Change subject: add. form request_obj from raw request.
......................................................................
add. form request_obj from raw request.
add. method to construct request object with validation + filtering.
mod. get_data depends on raw request.
mod. simplify output view - now it just handles responding to the request which
may include adding a request to the broker request target.
add. new error code for fetching queued requests.
mod. in output handle case where job is already queued.
add. a new broker target for jobs processing or running.
add. read/write to process broker target.
fix. call to add method in broker.
fix. cyclical dependency between request meta and data.
Change-Id: I51de1bba55f9b604cbf8ab03948816935841d8af
fix. Changed AnonymousUser to AnonymousUserMixin for flask-login 0.9
compatibility.
Change-Id: I89e8ac360192ecbbe349df4d9370ce751b96c24d
---
M user_metrics/api/__init__.py
M user_metrics/api/engine/__init__.py
M user_metrics/api/engine/data.py
M user_metrics/api/engine/request_manager.py
M user_metrics/api/engine/request_meta.py
M user_metrics/api/session.py
M user_metrics/api/views.py
7 files changed, 143 insertions(+), 129 deletions(-)
Approvals:
Rfaulk: Verified; Looks good to me, approved
diff --git a/user_metrics/api/__init__.py b/user_metrics/api/__init__.py
index 5d41e16..91020a8 100644
--- a/user_metrics/api/__init__.py
+++ b/user_metrics/api/__init__.py
@@ -12,6 +12,7 @@
REQUEST_BROKER_TARGET = 'request_broker.txt'
RESPONSE_BROKER_TARGET = 'response_broker.txt'
+PROCESS_BROKER_TARGET = 'process_broker.txt'
umapi_broker_context = FileBroker()
@@ -32,6 +33,7 @@
3: 'Could not find User ID.',
4: 'Bad metric name.',
5: 'Failed to retrieve users.',
+ 6: 'Job is currently queued.',
}
diff --git a/user_metrics/api/engine/__init__.py b/user_metrics/api/engine/__init__.py
index 3f4d2d6..0fc7e57 100644
--- a/user_metrics/api/engine/__init__.py
+++ b/user_metrics/api/engine/__init__.py
@@ -35,7 +35,11 @@
__license__ = "GPL (version 2 or later)"
from re import search
+from user_metrics.config import settings, logging
from user_metrics.api import MetricsAPIError, query_mod
+from datetime import datetime
+import user_metrics.etl.data_loader as dl
+
#
# Define remaining constants
@@ -119,3 +123,30 @@
for key in user_ids:
if user_ids[key] > 1:
yield key
+
+
+def get_cohort_refresh_datetime(utm_id):
+ """
+ Get the latest refresh datetime of a cohort. Returns current time
+ formatted as a string if the field is not found.
+ """
+
+ # @TODO MOVE DB REFS INTO QUERY MODULE
+ conn = dl.Connector(instance=settings.__cohort_data_instance__)
+ query = """ SELECT utm_touched FROM usertags_meta WHERE utm_id = %s """
+ conn._cur_.execute(query, int(utm_id))
+
+ utm_touched = None
+ try:
+ utm_touched = conn._cur_.fetchone()[0]
+ except ValueError:
+ pass
+
+ # Ensure the field was retrieved
+ if not utm_touched:
+ logging.error(__name__ + '::Missing utm_touched for cohort %s.' %
+ str(utm_id))
+ utm_touched = datetime.now()
+
+ del conn
+ return utm_touched.strftime(DATETIME_STR_FORMAT)
diff --git a/user_metrics/api/engine/data.py b/user_metrics/api/engine/data.py
index 6b452b1..6165ef9 100644
--- a/user_metrics/api/engine/data.py
+++ b/user_metrics/api/engine/data.py
@@ -13,7 +13,6 @@
get_users(cohort_expr)
get_cohort_id(utm_name)
- get_cohort_refresh_datetime(utm_id)
The other portion of data storage and retrieval is concerned with providing
functionality that enables responses to be cached. Request responses are
@@ -48,21 +47,17 @@
__license__ = "GPL (version 2 or later)"
-from datetime import datetime
from re import search
from collections import OrderedDict
from hashlib import sha1
import cPickle
import os
-import user_metrics.etl.data_loader as dl
-from user_metrics.config import logging
-from user_metrics.api.engine import COHORT_REGEX, parse_cohorts, \
- DATETIME_STR_FORMAT
+from user_metrics.config import logging, settings
+from user_metrics.api.engine import COHORT_REGEX, parse_cohorts
from user_metrics.api.engine.request_meta import REQUEST_META_QUERY_STR,\
- REQUEST_META_BASE
+ REQUEST_META_BASE, build_request_obj
from user_metrics.api import MetricsAPIError, query_mod
-from user_metrics.config import settings
# This is used to separate key meta and key strings for hash table data
@@ -90,39 +85,13 @@
return users
-def get_cohort_refresh_datetime(utm_id):
- """
- Get the latest refresh datetime of a cohort. Returns current time
- formatted as a string if the field is not found.
- """
-
- # @TODO MOVE DB REFS INTO QUERY MODULE
- conn = dl.Connector(instance=settings.__cohort_data_instance__)
- query = """ SELECT utm_touched FROM usertags_meta WHERE utm_id = %s """
- conn._cur_.execute(query, int(utm_id))
-
- utm_touched = None
- try:
- utm_touched = conn._cur_.fetchone()[0]
- except ValueError:
- pass
-
- # Ensure the field was retrieved
- if not utm_touched:
- logging.error(__name__ + '::Missing utm_touched for cohort %s.' %
- str(utm_id))
- utm_touched = datetime.now()
-
- del conn
- return utm_touched.strftime(DATETIME_STR_FORMAT)
-
-
-def get_data(request_meta, hash_result=True):
+def get_data(request, hash_result=True):
"""
Extract data from the global hash given a request object. If an item
is successfully recovered data is returned
"""
+ request_obj = build_request_obj(request)
hash_table_ref = read_pickle_data()
# Traverse the hash key structure to find data
@@ -131,9 +100,9 @@
logging.debug(__name__ + " - Attempting to pull data for request " \
"COHORT {0}, METRIC {1}".
- format(request_meta.cohort_expr, request_meta.metric))
+ format(request_obj.cohort_expr, request_obj.metric))
- key_sig = build_key_signature(request_meta, hash_result=hash_result)
+ key_sig = build_key_signature(request_obj, hash_result=hash_result)
item = find_item(hash_table_ref, key_sig)
if item:
diff --git a/user_metrics/api/engine/request_manager.py b/user_metrics/api/engine/request_manager.py
index f25e157..36005ce 100644
--- a/user_metrics/api/engine/request_manager.py
+++ b/user_metrics/api/engine/request_manager.py
@@ -84,9 +84,9 @@
from user_metrics.config import logging, settings
from user_metrics.api import MetricsAPIError, error_codes, query_mod, \
REQUEST_BROKER_TARGET, umapi_broker_context,\
- RESPONSE_BROKER_TARGET
+ RESPONSE_BROKER_TARGET, PROCESS_BROKER_TARGET
from user_metrics.api.engine.data import get_users
-from user_metrics.api.engine.request_meta import rebuild_unpacked_request
+from user_metrics.api.engine.request_meta import build_request_obj
from user_metrics.metrics.users import MediaWikiUser
from user_metrics.metrics.user_metric import UserMetricError
@@ -95,7 +95,7 @@
from os import getpid
from sys import getsizeof
import time
-
+from hashlib import sha1
# API JOB HANDLER
# ###############
@@ -148,7 +148,13 @@
logging.debug(log_name + ' - POLLING REQUESTS...')
if concurrent_jobs <= MAX_CONCURRENT_JOBS:
+
+ # Pop from request target
req_item = umapi_broker_context.pop(REQUEST_BROKER_TARGET)
+
+ # Push to process target
+ url_hash = sha1(req_item.encode('utf-8')).hexdigest()
+ umapi_broker_context.add(PROCESS_BROKER_TARGET, url_hash, req_item)
else:
continue
@@ -171,9 +177,13 @@
while not job_item.queue.empty():
data += job_item.queue.get(True)
- # Put the response string
- umapi_broker_context.add(RESPONSE_BROKER_TARGET,
- [job_item.request, data])
+ # Remove from process target
+ url_hash = sha1(job_item.request.encode('utf-8')).hexdigest()
+ umapi_broker_context.remove(PROCESS_BROKER_TARGET, url_hash)
+
+ # Add to response target
+ umapi_broker_context.add(RESPONSE_BROKER_TARGET, url_hash,
+                                     str(''.join([job_item.request, '--', data])))
del job_queue[job_queue.index(job_item)]
concurrent_jobs -= 1
@@ -201,7 +211,7 @@
logging.debug('{0} - FINISHING.'.format(log_name))
-def process_metrics(p, request_meta):
+def process_metrics(p, request):
"""
Worker process for requests, forked from the job controller. This
method handles:
@@ -214,19 +224,23 @@
log_name = '{0} :: {1}'.format(__name__, process_metrics.__name__)
logging.info(log_name + ' - START JOB'
- '\n\tCOHORT = {0} - METRIC = {1}'
- ' - PID = {2})'.
- format(request_meta.cohort_expr, request_meta.metric,
- getpid()))
+ '\n\t{0} - PID = {2})'.
+ format(request, getpid()))
err_msg = __name__ + ' :: Request failed.'
users = list()
+ try:
+ request_obj = build_request_obj(request)
+ except MetricsAPIError as e:
+ # TODO - flag job as failed
+ return
+
# obtain user list - handle the case where a lone user ID is passed
# !! The username should already be validated
- if request_meta.is_user:
- uid = MediaWikiUser.is_user_name(request_meta.cohort_expr,
- request_meta.project)
+ if request_obj.is_user:
+ uid = MediaWikiUser.is_user_name(request_obj.cohort_expr,
+ request_obj.project)
if uid:
valid = True
users = [uid]
@@ -235,13 +249,13 @@
err_msg = error_codes[3]
# The "all" user group. All users within a time period.
- elif request_meta.cohort_expr == 'all':
+ elif request_obj.cohort_expr == 'all':
users = MediaWikiUser(query_type=1)
try:
users = [u for u in users.get_users(
- request_meta.start, request_meta.end,
- project=request_meta.project)]
+ request_obj.start, request_obj.end,
+ project=request_obj.project)]
valid = True
except Exception:
valid = False
@@ -249,13 +263,13 @@
# "TYPICAL" COHORT PROCESSING
else:
- users = get_users(request_meta.cohort_expr)
+ users = get_users(request_obj.cohort_expr)
# Default project is what is stored in usertags_meta
project = query_mod.get_cohort_project_by_meta(
- request_meta.cohort_expr)
+ request_obj.cohort_expr)
if project:
- request_meta.project = project
+ request_obj.project = project
logging.debug(__name__ + ' :: Using default project from '
'usertags_meta {0}.'.format(project))
@@ -264,7 +278,7 @@
if valid:
# process request
- results = process_data_request(request_meta, users)
+ results = process_data_request(request_obj, users)
results = str(results)
response_size = getsizeof(results, None)
@@ -280,14 +294,14 @@
logging.info(log_name + ' - END JOB'
'\n\tCOHORT = {0}- METRIC = {1} - PID = {2})'.
- format(request_meta.cohort_expr, request_meta.metric,
+ format(request_obj.cohort_expr, request_obj.metric,
getpid()))
else:
p.put(err_msg, block=True)
logging.info(log_name + ' - END JOB - FAILED.'
'\n\tCOHORT = {0}- METRIC = {1} - PID = {2})'.
- format(request_meta.cohort_expr, request_meta.metric,
+ format(request_obj.cohort_expr, request_obj.metric,
getpid()))
# REQUEST FLOW HANDLER
diff --git a/user_metrics/api/engine/request_meta.py b/user_metrics/api/engine/request_meta.py
index 32d7839..95cd737 100644
--- a/user_metrics/api/engine/request_meta.py
+++ b/user_metrics/api/engine/request_meta.py
@@ -38,9 +38,9 @@
from user_metrics.utils import format_mediawiki_timestamp, enum
from user_metrics.utils.record_type import recordtype
-from user_metrics.api import MetricsAPIError
-from user_metrics.api.engine import DEFAULT_QUERY_VAL
-from user_metrics.metrics.users import USER_METRIC_PERIOD_TYPE
+from user_metrics.api import MetricsAPIError, query_mod
+from user_metrics.api.engine import DEFAULT_QUERY_VAL, get_cohort_refresh_datetime
+from user_metrics.metrics.users import USER_METRIC_PERIOD_TYPE,MediaWikiUser
from collections import namedtuple, OrderedDict
from flask import escape
from user_metrics.config import logging
@@ -62,7 +62,7 @@
}
-def parse_request(request):
+def parse_raw_request(request):
"""
Parses umapi requests
"""
@@ -112,6 +112,46 @@
rt = recordtype("RequestMeta", params)
return eval('rt' + arg_str)
+
+def build_request_obj(request):
+ """
+ Build a request and validate.
+
+ 1. Populate with request parameters from query args.
+ 2. Filter the input discarding any url junk
+ 3. Process defaults for request parameters
+ 4. See if this maps to a single user request
+ """
+
+ parsed_req = parse_raw_request(request)
+
+ # Get the refresh date of the cohort
+ try:
+ cid = query_mod.get_cohort_id(parsed_req.cohort)
+ cohort_refresh_ts = get_cohort_refresh_datetime(cid)
+
+ except Exception:
+ cohort_refresh_ts = None
+ logging.error(__name__ + ' :: Could not retrieve refresh '
+ 'time of cohort.')
+
+ rm = RequestMetaFactory(parsed_req.cohort, cohort_refresh_ts,
+ parsed_req.metric)
+ filter_request_input(request, rm)
+ format_request_params(rm)
+
+ if rm.is_user:
+ project = rm.project if rm.project else 'enwiki'
+ if not MediaWikiUser.is_user_name(parsed_req.cohort, project):
+ err_msg = __name__ + ' :: "{0}" is not a valid username ' \
+ 'in "{1}"'.format(parsed_req.cohort, project)
+ raise MetricsAPIError(err_msg)
+ else:
+ # @TODO CALL COHORT VALIDATION HERE
+ pass
+
+ return rm
+
# Defines what variables may be extracted from the query string
REQUEST_META_QUERY_STR = ['aggregator', 'time_series', 'project', 'namespace',
'start', 'end', 'slice', 't', 'n',
diff --git a/user_metrics/api/session.py b/user_metrics/api/session.py
index d6f3777..d68911a 100644
--- a/user_metrics/api/session.py
+++ b/user_metrics/api/session.py
@@ -30,7 +30,7 @@
check_password_hash
from flask.ext.login import LoginManager, current_user, UserMixin, \
- AnonymousUser, confirm_login
+ AnonymousUserMixin, confirm_login
class APIUser(UserMixin):
"""
@@ -126,7 +126,7 @@
format(self.name))
self.active = True
- class Anonymous(AnonymousUser):
+ class Anonymous(AnonymousUserMixin):
name = u'Anonymous'
login_manager = LoginManager()
diff --git a/user_metrics/api/views.py b/user_metrics/api/views.py
index df6a259..9308576 100644
--- a/user_metrics/api/views.py
+++ b/user_metrics/api/views.py
@@ -21,14 +21,11 @@
from user_metrics.etl.data_loader import Connector
from user_metrics.config import logging, settings
-from user_metrics.api.engine.data import get_cohort_refresh_datetime, \
- get_data, get_url_from_keys, build_key_signature, read_pickle_data
-from user_metrics.api import MetricsAPIError, error_codes, query_mod, \
+from user_metrics.api.engine.data import get_data, get_url_from_keys, \
+ read_pickle_data
+from user_metrics.api import error_codes, query_mod, \
REQUEST_BROKER_TARGET, umapi_broker_context
-from user_metrics.api.engine.request_meta import filter_request_input, \
- format_request_params, RequestMetaFactory, \
- get_metric_names
-from user_metrics.metrics.users import MediaWikiUser
+from user_metrics.api.engine.request_meta import get_metric_names
from user_metrics.api.session import APIUser
import user_metrics.config.settings as conf
from hashlib import sha1
@@ -383,79 +380,40 @@
m_list=get_metric_names(), error=error)
-def output(cohort, metric):
- """ View corresponding to a data request -
- All of the setup and execution for a request happens here. """
+def output():
+ """
+    View corresponding to a data request. Fetches response if it exists or adds
+ request to the request broker target.
+ """
# Check for refresh flag
refresh = True if 'refresh' in request.args else False
- # Get the refresh date of the cohort
- try:
- cid = query_mod.get_cohort_id(cohort)
- cohort_refresh_ts = get_cohort_refresh_datetime(cid)
- except Exception:
- cohort_refresh_ts = None
- logging.error(__name__ + ' :: Could not retrieve refresh '
- 'time of cohort.')
-
- # Build a request and validate.
- #
- # 1. Populate with request parameters from query args.
- # 2. Filter the input discarding any url junk
- # 3. Process defaults for request parameters
- # 4. See if this maps to a single user request
- # 5. See if this maps to a single user request
- try:
- rm = RequestMetaFactory(cohort, cohort_refresh_ts, metric)
- except MetricsAPIError as e:
- return redirect(url_for('all_cohorts') + '?error=' +
- str(e.error_code))
-
- filter_request_input(request, rm)
- try:
- format_request_params(rm)
- except MetricsAPIError as e:
- return redirect(url_for('all_cohorts') + '?error=' +
- str(e.error_code))
-
- if rm.is_user:
- project = rm.project if rm.project else 'enwiki'
- if not MediaWikiUser.is_user_name(cohort, project):
- logging.error(__name__ + ' :: "{0}" is not a valid username '
- 'in "{1}"'.format(cohort, project))
- return redirect(url_for('all_cohorts') + '?error=3')
- else:
- # @TODO CALL COHORT VALIDATION HERE
- pass
-
- # Determine if the request maps to an existing response.
- #
- # 1. The response already exists in the hash, return.
- # 2. Otherwise, add the request tot the queue.
- data = get_data(rm)
- key_sig = build_key_signature(rm, hash_result=True)
+ data = get_data(request.url)
# Is the request already running?
# TODO check req_target
+ is_queued = False
is_running = False
- # Determine if request is already hashed
+ # Determine if response is already cached
if data and not refresh:
return make_response(jsonify(data))
+ # Determine if the job is already queued
+ elif is_queued:
+ return render_template('processing.html', error=error_codes[6])
+
# Determine if the job is already running
elif is_running:
- return render_template('processing.html',
- error=error_codes[0],
- url_str=str(rm))
+ return render_template('processing.html', error=error_codes[0])
# Add the request to the queue
else:
- hash = sha1(request.url.encode('utf-8')).hexdigest()
- umapi_broker_context.add(REQUEST_BROKER_TARGET, hash, request.url)
+ url_hash = sha1(request.url.encode('utf-8')).hexdigest()
+ umapi_broker_context.add(REQUEST_BROKER_TARGET, url_hash, request.url)
- return render_template('processing.html', url_str=str(rm))
+ return render_template('processing.html')
def job_queue():
--
To view, visit https://gerrit.wikimedia.org/r/77683
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I89e8ac360192ecbbe349df4d9370ce751b96c24d
Gerrit-PatchSet: 1
Gerrit-Project: analytics/user-metrics
Gerrit-Branch: master
Gerrit-Owner: Rfaulk <[email protected]>
Gerrit-Reviewer: Rfaulk <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits