Rfaulk has uploaded a new change for review.
https://gerrit.wikimedia.org/r/78476
Change subject: mod. condition on which jobs are processed in main loop / pep 8
/ logging.
......................................................................
mod. condition on which jobs are processed in main loop / pep 8 / logging.
Change-Id: Ic3fe864236e9bdaf071239d4244cb0017aae0d1e
---
M user_metrics/api/engine/request_manager.py
1 file changed, 16 insertions(+), 10 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/user-metrics
refs/changes/76/78476/1
diff --git a/user_metrics/api/engine/request_manager.py
b/user_metrics/api/engine/request_manager.py
index 13277f0..4770577 100644
--- a/user_metrics/api/engine/request_manager.py
+++ b/user_metrics/api/engine/request_manager.py
@@ -87,8 +87,7 @@
RESPONSE_BROKER_TARGET, PROCESS_BROKER_TARGET
from user_metrics.api.engine import pack_response_for_broker
from user_metrics.api.engine.data import get_users
-from user_metrics.api.engine.request_meta import build_request_obj, \
- parse_raw_request
+from user_metrics.api.engine.request_meta import build_request_obj
from user_metrics.metrics.users import MediaWikiUser
from user_metrics.metrics.user_metric import UserMetricError
@@ -148,7 +147,8 @@
# Request Queue Processing
# ------------------------
- logging.debug(log_name + ' - POLLING REQUESTS...')
+ logging.debug(log_name + ' :: POLLING REQUESTS...')
+ logging.debug(log_name + ' :: JOB QUEUE - {0}'.format(str(job_queue)))
if concurrent_jobs <= MAX_CONCURRENT_JOBS:
# Pop from request target
@@ -157,13 +157,15 @@
# Push to process target
if req_item:
url_hash = sha1(req_item.encode('utf-8')).hexdigest()
- umapi_broker_context.add(PROCESS_BROKER_TARGET, url_hash,
req_item)
+ umapi_broker_context.add(PROCESS_BROKER_TARGET, url_hash,
+ req_item)
else:
continue
else:
continue
- if not req_item:
+ # Continue if there is no request or the job_queue is empty
+ if not req_item and not concurrent_jobs:
continue
logging.debug(log_name + ' :: PULLING item from request queue -> '
@@ -177,6 +179,9 @@
if not job_item.queue.empty():
+ logging.info(log_name + ' :: READING RESPONSE - {0}'.
+ format(job_item.request))
+
# Pull data off of the queue and add it to response queue
data = ''
while not job_item.queue.empty():
@@ -185,9 +190,10 @@
# Remove from process target
url_hash = sha1(job_item.request.encode('utf-8')).hexdigest()
try:
- umapi_broker_context.remove(PROCESS_BROKER_TARGET,
url_hash)
+ umapi_broker_context.remove(PROCESS_BROKER_TARGET,
+ url_hash)
except Exception as e:
- logging.error(__name__ + ' :: Could not process '
+ logging.error(log_name + ' :: Could not process '
'{0} from {1} -- {2}'.
format(job_item.request,
PROCESS_BROKER_TARGET,
@@ -236,7 +242,7 @@
log_name = '{0} :: {1}'.format(__name__, process_metrics.__name__)
- logging.info(log_name + ' - START JOB'
+ logging.info(log_name + ' :: START JOB'
'\n\t{0} - PID = {1})'.
format(request_url, getpid()))
@@ -305,14 +311,14 @@
else:
p.put(results, block=True)
- logging.info(log_name + ' - END JOB'
+ logging.info(log_name + ' :: END JOB'
'\n\tCOHORT = {0}- METRIC = {1} - PID = {2})'.
format(request_obj.cohort_expr, request_obj.metric,
getpid()))
else:
p.put(err_msg, block=True)
- logging.info(log_name + ' - END JOB - FAILED.'
+ logging.info(log_name + ' :: END JOB - FAILED.'
'\n\tCOHORT = {0}- METRIC = {1} - PID = {2})'.
format(request_obj.cohort_expr, request_obj.metric,
getpid()))
--
To view, visit https://gerrit.wikimedia.org/r/78476
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic3fe864236e9bdaf071239d4244cb0017aae0d1e
Gerrit-PatchSet: 1
Gerrit-Project: analytics/user-metrics
Gerrit-Branch: master
Gerrit-Owner: Rfaulk <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits