Rfaulk has uploaded a new change for review.
https://gerrit.wikimedia.org/r/77670
Change subject: merge. repair_runtime.
......................................................................
merge. repair_runtime.
Change-Id: I35fc913234fd3c08217f1022f123f4106e7fae4c
---
M user_metrics/api/__init__.py
M user_metrics/api/engine/request_manager.py
2 files changed, 13 insertions(+), 31 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/user-metrics
refs/changes/70/77670/1
diff --git a/user_metrics/api/__init__.py b/user_metrics/api/__init__.py
index 2a91f8e..b31d176 100644
--- a/user_metrics/api/__init__.py
+++ b/user_metrics/api/__init__.py
@@ -10,8 +10,11 @@
from multiprocessing import Lock
from user_metrics.api.broker import FileBroker
-REQUEST_BROKER_TARGET = 'broker.txt'
+REQUEST_BROKER_TARGET = 'request_broker.txt'
+RESPONSE_BROKER_TARGET = 'response_broker.txt'
+
umapi_broker_context = FileBroker()
+
query_mod = nested_import(settings.__query_module__)
# Lock for request notification callback operations
diff --git a/user_metrics/api/engine/request_manager.py
b/user_metrics/api/engine/request_manager.py
index c7d5297..3a5f814 100644
--- a/user_metrics/api/engine/request_manager.py
+++ b/user_metrics/api/engine/request_manager.py
@@ -83,13 +83,13 @@
from user_metrics.config import logging, settings
from user_metrics.api import MetricsAPIError, error_codes, query_mod, \
- REQ_NCB_LOCK, REQUEST_PATH, REQUEST_BROKER_TARGET, umapi_broker_context
+ REQ_NCB_LOCK, REQUEST_PATH, REQUEST_BROKER_TARGET, umapi_broker_context,\
+ RESPONSE_BROKER_TARGET
from user_metrics.api.engine.data import get_users, get_url_from_keys, \
build_key_signature
from user_metrics.api.engine.request_meta import rebuild_unpacked_request
from user_metrics.metrics.users import MediaWikiUser
from user_metrics.metrics.user_metric import UserMetricError
-from user_metrics.utils import unpack_fields
from multiprocessing import Process, Queue
from collections import namedtuple
@@ -101,10 +101,6 @@
# API JOB HANDLER
# ###############
-
-# API queues for API service requests and responses
-api_response_queue = Queue()
-
# MODULE CONSTANTS
#
@@ -120,7 +116,7 @@
job_item_type = namedtuple('JobItem', 'id process request queue')
-def job_control(response_queue):
+def job_control():
"""
Controls the execution of user metrics requests
@@ -157,34 +153,17 @@
'\n\tCOHORT = {0} - METRIC = {1}'
.format(req_item['cohort_expr'], req_item['metric']))
-
# Process complete jobs
# ---------------------
for job_item in job_queue:
- # Look for completed jobs
- if not job_item.queue.empty():
-
- # Put request creds on res queue -- this goes to
- # response_handler asynchronously
- response_queue.put(unpack_fields(job_item.request),
- block=True)
-
- # Pull data off of the queue and add it to response queue
- while not job_item.queue.empty():
- data = job_item.queue.get(True)
- if data:
- response_queue.put(data, block=True)
-
- del job_queue[job_queue.index(job_item)]
-
- concurrent_jobs -= 1
-
- logging.debug(log_name + ' :: RUN -> RESPONSE - Job ID {0}' \
- '\n\tConcurrent jobs = {1}'
- .format(str(job_item.id), concurrent_jobs))
-
+ umapi_broker_context.add(RESPONSE_BROKER_TARGET, job_item.request)
+ del job_queue[job_queue.index(job_item)]
+ concurrent_jobs -= 1
+ logging.debug(log_name + ' :: RUN -> RESPONSE - Job ID {0}'\
+ '\n\tConcurrent jobs = {1}'
+ .format(str(job_item.id), concurrent_jobs))
# Process pending jobs
# --------------------
--
To view, visit https://gerrit.wikimedia.org/r/77670
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I35fc913234fd3c08217f1022f123f4106e7fae4c
Gerrit-PatchSet: 1
Gerrit-Project: analytics/user-metrics
Gerrit-Branch: master
Gerrit-Owner: Rfaulk <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits