Rfaulk has submitted this change and it was merged.
Change subject: fix. typo / pep8.
......................................................................
fix. typo / pep8.
Change-Id: I9abd6062345a4addefba39f4fd508ddc2b269caa
---
M user_metrics/api/engine/request_manager.py
1 file changed, 34 insertions(+), 36 deletions(-)
Approvals:
Rfaulk: Verified; Looks good to me, approved
diff --git a/user_metrics/api/engine/request_manager.py b/user_metrics/api/engine/request_manager.py
index cdbcba2..f25e157 100644
--- a/user_metrics/api/engine/request_manager.py
+++ b/user_metrics/api/engine/request_manager.py
@@ -146,7 +146,7 @@
# Request Queue Processing
# ------------------------
- logging.debug(log_name + ' - POLLING REQUESTS...')
+ logging.debug(log_name + ' - POLLING REQUESTS...')
if concurrent_jobs <= MAX_CONCURRENT_JOBS:
req_item = umapi_broker_context.pop(REQUEST_BROKER_TARGET)
else:
@@ -155,9 +155,9 @@
if not req_item:
continue
- logging.debug(log_name + ' :: PULLING item from request queue -> ' \
+ logging.debug(log_name + ' :: PULLING item from request queue -> '
'\n\t{0}'
- .format(req_item))
+ .format(req_item))
# Process complete jobs
# ---------------------
@@ -171,14 +171,15 @@
while not job_item.queue.empty():
data += job_item.queue.get(True)
- # Put the response strinf
- umapi_broker_context.add(RESPONSE_BROKER_TARGET,
[job_item.request, data])
+ # Put the response string
+ umapi_broker_context.add(RESPONSE_BROKER_TARGET,
+ [job_item.request, data])
del job_queue[job_queue.index(job_item)]
concurrent_jobs -= 1
- logging.debug(log_name + ' :: RUN -> RESPONSE - Job ID {0}'\
+ logging.debug(log_name + ' :: RUN -> RESPONSE - Job ID {0}'
'\n\tConcurrent jobs = {1}'
- .format(str(job_item.id), concurrent_jobs))
+ .format(str(job_item.id), concurrent_jobs))
# Process request
# ---------------
@@ -193,7 +194,7 @@
concurrent_jobs += 1
job_id += 1
- logging.debug(log_name + ' :: WAIT -> RUN - Job ID {0}' \
+ logging.debug(log_name + ' :: WAIT -> RUN - Job ID {0}'
'\n\tConcurrent jobs = {1}, REQ = {2}'
.format(str(job_id), concurrent_jobs, req_item))
@@ -215,7 +216,8 @@
logging.info(log_name + ' - START JOB'
'\n\tCOHORT = {0} - METRIC = {1}'
' - PID = {2})'.
- format(request_meta.cohort_expr, request_meta.metric, getpid()))
+ format(request_meta.cohort_expr, request_meta.metric,
+ getpid()))
err_msg = __name__ + ' :: Request failed.'
users = list()
@@ -254,7 +256,7 @@
request_meta.cohort_expr)
if project:
request_meta.project = project
- logging.debug(__name__ + ' :: Using default project from ' \
+ logging.debug(__name__ + ' :: Using default project from '
'usertags_meta {0}.'.format(project))
valid = True
@@ -277,23 +279,19 @@
p.put(results, block=True)
logging.info(log_name + ' - END JOB'
- '\n\tCOHORT = {0} - METRIC = {1}'
- ' - PID = {2})'.
- format(request_meta.cohort_expr, request_meta.metric, getpid()))
+ '\n\tCOHORT = {0}- METRIC = {1} - PID = {2})'.
+ format(request_meta.cohort_expr, request_meta.metric,
+ getpid()))
else:
p.put(err_msg, block=True)
logging.info(log_name + ' - END JOB - FAILED.'
- '\n\tCOHORT = {0} - METRIC = {1}'
- ' - PID = {2})'.
- format(request_meta.cohort_expr, request_meta.metric, getpid()))
-
-
-
+ '\n\tCOHORT = {0}- METRIC = {1} - PID = {2})'.
+ format(request_meta.cohort_expr, request_meta.metric,
+ getpid()))
# REQUEST FLOW HANDLER
# ###################
-
from dateutil.parser import parse as date_parse
from copy import deepcopy
@@ -316,6 +314,7 @@
# create shorthand method refs
to_string = DataLoader().cast_elems_to_string
+
def process_data_request(request_meta, users):
"""
@@ -362,7 +361,7 @@
# Determine intervals and thread allocation
total_intervals = (date_parse(end) - date_parse(start)).\
- total_seconds() / (3600 * request_meta.slice)
+ total_seconds() / (3600 * request_meta.slice)
time_threads = max(1, int(total_intervals / INTERVALS_PER_THREAD))
time_threads = min(MAX_THREADS, time_threads)
@@ -374,9 +373,9 @@
'agg': request_meta.aggregator,
'start': str(start),
'end': str(end),
- })
+ })
metric_threads = '"k_" : {0}, "kr_" : {1}'.format(USER_THREADS,
- REVISION_THREADS)
+ REVISION_THREADS)
metric_threads = '{' + metric_threads + '}'
new_kwargs = deepcopy(args)
@@ -387,19 +386,18 @@
del new_kwargs['datetime_end']
out = tspm.build_time_series(start,
- end,
- request_meta.slice,
- metric_class,
- aggregator_func,
- users,
- kt_=time_threads,
- metric_threads=metric_threads,
- log=True,
- **new_kwargs)
+ end,
+ request_meta.slice,
+ metric_class,
+ aggregator_func,
+ users,
+ kt_=time_threads,
+ metric_threads=metric_threads,
+ log=True,
+ **new_kwargs)
results['header'] = ['timestamp'] + \
- getattr(aggregator_func,
- um.METRIC_AGG_METHOD_HEAD)
+ getattr(aggregator_func, um.METRIC_AGG_METHOD_HEAD)
for row in out:
timestamp = date_parse(row[0][:19]).strftime(
DATETIME_STR_FORMAT)
@@ -422,7 +420,7 @@
'agg': request_meta.aggregator,
'start': str(start),
'end': str(end),
- })
+ })
try:
metric_obj.process(users,
@@ -447,7 +445,7 @@
'metric': metric_class.__name__,
'start': str(start),
'end': str(end),
- })
+ })
try:
metric_obj.process(users,
k_=USER_THREADS,
--
To view, visit https://gerrit.wikimedia.org/r/77682
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: I9abd6062345a4addefba39f4fd508ddc2b269caa
Gerrit-PatchSet: 1
Gerrit-Project: analytics/user-metrics
Gerrit-Branch: master
Gerrit-Owner: Rfaulk <[email protected]>
Gerrit-Reviewer: Rfaulk <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits