Rfaulk has uploaded a new change for review.
https://gerrit.wikimedia.org/r/77667
Change subject: mod. Use umapi_broker_context to pull new requests from queue.
......................................................................
mod. Use umapi_broker_context to pull new requests from queue.
Change-Id: Iddf7547fc10a903c7b2b09419a7f9f949e3b1ff1
---
M user_metrics/api/engine/request_manager.py
1 file changed, 7 insertions(+), 15 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/user-metrics
refs/changes/67/77667/1
diff --git a/user_metrics/api/engine/request_manager.py
b/user_metrics/api/engine/request_manager.py
index b48434d..0d10395 100644
--- a/user_metrics/api/engine/request_manager.py
+++ b/user_metrics/api/engine/request_manager.py
@@ -83,13 +83,14 @@
from user_metrics.config import logging, settings
from user_metrics.api import MetricsAPIError, error_codes, query_mod, \
- REQ_NCB_LOCK, REQUEST_PATH
+ REQ_NCB_LOCK, REQUEST_PATH, BROKER_TARGET, umapi_broker_context
from user_metrics.api.engine.data import get_users, get_url_from_keys, \
build_key_signature
from user_metrics.api.engine.request_meta import rebuild_unpacked_request
from user_metrics.metrics.users import MediaWikiUser
from user_metrics.metrics.user_metric import UserMetricError
from user_metrics.utils import unpack_fields
+from user_metrics.api.broker import FileBroker
from multiprocessing import Process, Queue
from collections import namedtuple
@@ -103,7 +104,6 @@
# ###############
# API queues for API service requests and responses
-api_request_queue = Queue()
api_response_queue = Queue()
@@ -121,7 +121,7 @@
job_item_type = namedtuple('JobItem', 'id process request queue')
-def job_control(request_queue, response_queue):
+def job_control(response_queue):
"""
Controls the execution of user metrics requests
@@ -152,19 +152,11 @@
# Request Queue Processing
# ------------------------
- try:
- # Pull an item off of the queue
+ req_item = umapi_broker_context.pop(BROKER_TARGET)
- req_item = request_queue.get(timeout=QUEUE_WAIT)
-
- logging.debug(log_name + ' :: PULLING item from request queue -> '
\
- '\n\tCOHORT = {0} - METRIC = {1}'
- .format(req_item['cohort_expr'], req_item['metric']))
-
- except Exception as e:
- req_item = None
- #logging.debug('{0} :: {1} - Listening ...'
- #.format(__name__, job_control.__name__))
+ logging.debug(log_name + ' :: PULLING item from request queue -> ' \
+ '\n\tCOHORT = {0} - METRIC = {1}'
+ .format(req_item['cohort_expr'], req_item['metric']))
# Process complete jobs
--
To view, visit https://gerrit.wikimedia.org/r/77667
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iddf7547fc10a903c7b2b09419a7f9f949e3b1ff1
Gerrit-PatchSet: 1
Gerrit-Project: analytics/user-metrics
Gerrit-Branch: master
Gerrit-Owner: Rfaulk <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits