Rfaulk has uploaded a new change for review.
https://gerrit.wikimedia.org/r/73560
Change subject: mod. replace response queue with a FileBroker target..
......................................................................
mod. replace response queue with a FileBroker target..
Change-Id: I2e26e06d030e2b4d060cd073253d5c4dc8c942ef
---
M user_metrics/api/__init__.py
M user_metrics/api/engine/request_manager.py
M user_metrics/api/run_handlers.py
3 files changed, 18 insertions(+), 39 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/user-metrics
refs/changes/60/73560/1
diff --git a/user_metrics/api/__init__.py b/user_metrics/api/__init__.py
index 2a91f8e..b31d176 100644
--- a/user_metrics/api/__init__.py
+++ b/user_metrics/api/__init__.py
@@ -10,8 +10,11 @@
from multiprocessing import Lock
from user_metrics.api.broker import FileBroker
-REQUEST_BROKER_TARGET = 'broker.txt'
+REQUEST_BROKER_TARGET = 'request_broker.txt'
+RESPONSE_BROKER_TARGET = 'response_broker.txt'
+
umapi_broker_context = FileBroker()
+
query_mod = nested_import(settings.__query_module__)
# Lock for request notification callback operations
diff --git a/user_metrics/api/engine/request_manager.py
b/user_metrics/api/engine/request_manager.py
index c7d5297..3a5f814 100644
--- a/user_metrics/api/engine/request_manager.py
+++ b/user_metrics/api/engine/request_manager.py
@@ -83,13 +83,13 @@
from user_metrics.config import logging, settings
from user_metrics.api import MetricsAPIError, error_codes, query_mod, \
- REQ_NCB_LOCK, REQUEST_PATH, REQUEST_BROKER_TARGET, umapi_broker_context
+ REQ_NCB_LOCK, REQUEST_PATH, REQUEST_BROKER_TARGET, umapi_broker_context,\
+ RESPONSE_BROKER_TARGET
from user_metrics.api.engine.data import get_users, get_url_from_keys, \
build_key_signature
from user_metrics.api.engine.request_meta import rebuild_unpacked_request
from user_metrics.metrics.users import MediaWikiUser
from user_metrics.metrics.user_metric import UserMetricError
-from user_metrics.utils import unpack_fields
from multiprocessing import Process, Queue
from collections import namedtuple
@@ -101,10 +101,6 @@
# API JOB HANDLER
# ###############
-
-# API queues for API service requests and responses
-api_response_queue = Queue()
-
# MODULE CONSTANTS
#
@@ -120,7 +116,7 @@
job_item_type = namedtuple('JobItem', 'id process request queue')
-def job_control(response_queue):
+def job_control():
"""
Controls the execution of user metrics requests
@@ -157,34 +153,17 @@
'\n\tCOHORT = {0} - METRIC = {1}'
.format(req_item['cohort_expr'], req_item['metric']))
-
# Process complete jobs
# ---------------------
for job_item in job_queue:
- # Look for completed jobs
- if not job_item.queue.empty():
-
- # Put request creds on res queue -- this goes to
- # response_handler asynchronously
- response_queue.put(unpack_fields(job_item.request),
- block=True)
-
- # Pull data off of the queue and add it to response queue
- while not job_item.queue.empty():
- data = job_item.queue.get(True)
- if data:
- response_queue.put(data, block=True)
-
- del job_queue[job_queue.index(job_item)]
-
- concurrent_jobs -= 1
-
- logging.debug(log_name + ' :: RUN -> RESPONSE - Job ID {0}' \
- '\n\tConcurrent jobs = {1}'
- .format(str(job_item.id), concurrent_jobs))
-
+ umapi_broker_context.add(RESPONSE_BROKER_TARGET, job_item.request)
+ del job_queue[job_queue.index(job_item)]
+ concurrent_jobs -= 1
+ logging.debug(log_name + ' :: RUN -> RESPONSE - Job ID {0}'\
+ '\n\tConcurrent jobs = {1}'
+ .format(str(job_item.id), concurrent_jobs))
# Process pending jobs
# --------------------
diff --git a/user_metrics/api/run_handlers.py b/user_metrics/api/run_handlers.py
index 0bc9c66..741c220 100644
--- a/user_metrics/api/run_handlers.py
+++ b/user_metrics/api/run_handlers.py
@@ -23,7 +23,7 @@
import multiprocessing as mp
from user_metrics.api.engine.request_manager import \
- req_notification_queue_out, req_notification_queue_in, api_response_queue
+ req_notification_queue_out, req_notification_queue_in
from user_metrics.api.engine.response_handler import process_responses
from user_metrics.api.engine.request_manager import job_control, \
requests_notification_callback
@@ -35,15 +35,13 @@
rm_callback_proc = None
-def setup_controller(res_queue, msg_queue_in, msg_queue_out):
+def setup_controller(msg_queue_in, msg_queue_out):
"""
Sets up the process that handles API jobs
"""
- job_controller_proc = mp.Process(target=job_control,
- args=res_queue)
+ job_controller_proc = mp.Process(target=job_control)
response_controller_proc = mp.Process(target=process_responses,
- args=(res_queue,
- msg_queue_in))
+ args=msg_queue_in)
rm_callback_proc = mp.Process(target=requests_notification_callback,
args=(msg_queue_in,
msg_queue_out))
@@ -67,6 +65,5 @@
if __name__ == '__main__':
- setup_controller(api_response_queue,
- req_notification_queue_in,
+ setup_controller(req_notification_queue_in,
req_notification_queue_out)
--
To view, visit https://gerrit.wikimedia.org/r/73560
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I2e26e06d030e2b4d060cd073253d5c4dc8c942ef
Gerrit-PatchSet: 1
Gerrit-Project: analytics/user-metrics
Gerrit-Branch: repair_runtime
Gerrit-Owner: Rfaulk <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits