Rfaulk has submitted this change and it was merged.

Change subject: mod. replace response queue with a FileBroker target..
......................................................................


mod. replace response queue with a FileBroker target..

Change-Id: I2e26e06d030e2b4d060cd073253d5c4dc8c942ef
---
M user_metrics/api/__init__.py
M user_metrics/api/engine/request_manager.py
M user_metrics/api/run_handlers.py
3 files changed, 18 insertions(+), 39 deletions(-)

Approvals:
  Rfaulk: Verified; Looks good to me, approved



diff --git a/user_metrics/api/__init__.py b/user_metrics/api/__init__.py
index 2a91f8e..b31d176 100644
--- a/user_metrics/api/__init__.py
+++ b/user_metrics/api/__init__.py
@@ -10,8 +10,11 @@
 from multiprocessing import Lock
 from user_metrics.api.broker import FileBroker
 
-REQUEST_BROKER_TARGET = 'broker.txt'
+REQUEST_BROKER_TARGET = 'request_broker.txt'
+RESPONSE_BROKER_TARGET = 'response_broker.txt'
+
 umapi_broker_context = FileBroker()
+
 query_mod = nested_import(settings.__query_module__)
 
 # Lock for request notification callback operations
diff --git a/user_metrics/api/engine/request_manager.py 
b/user_metrics/api/engine/request_manager.py
index c7d5297..3a5f814 100644
--- a/user_metrics/api/engine/request_manager.py
+++ b/user_metrics/api/engine/request_manager.py
@@ -83,13 +83,13 @@
 
 from user_metrics.config import logging, settings
 from user_metrics.api import MetricsAPIError, error_codes, query_mod, \
-    REQ_NCB_LOCK, REQUEST_PATH, REQUEST_BROKER_TARGET, umapi_broker_context
+    REQ_NCB_LOCK, REQUEST_PATH, REQUEST_BROKER_TARGET, umapi_broker_context,\
+    RESPONSE_BROKER_TARGET
 from user_metrics.api.engine.data import get_users, get_url_from_keys, \
     build_key_signature
 from user_metrics.api.engine.request_meta import rebuild_unpacked_request
 from user_metrics.metrics.users import MediaWikiUser
 from user_metrics.metrics.user_metric import UserMetricError
-from user_metrics.utils import unpack_fields
 
 from multiprocessing import Process, Queue
 from collections import namedtuple
@@ -101,10 +101,6 @@
 
 # API JOB HANDLER
 # ###############
-
-# API queues for API service requests and responses
-api_response_queue = Queue()
-
 
 # MODULE CONSTANTS
 #
@@ -120,7 +116,7 @@
 job_item_type = namedtuple('JobItem', 'id process request queue')
 
 
-def job_control(response_queue):
+def job_control():
     """
         Controls the execution of user metrics requests
 
@@ -157,34 +153,17 @@
                                  '\n\tCOHORT = {0} - METRIC = {1}'
             .format(req_item['cohort_expr'], req_item['metric']))
 
-
         # Process complete jobs
         # ---------------------
 
         for job_item in job_queue:
 
-            # Look for completed jobs
-            if not job_item.queue.empty():
-
-                # Put request creds on res queue -- this goes to
-                # response_handler asynchronously
-                response_queue.put(unpack_fields(job_item.request),
-                                   block=True)
-
-                # Pull data off of the queue and add it to response queue
-                while not job_item.queue.empty():
-                    data = job_item.queue.get(True)
-                    if data:
-                        response_queue.put(data, block=True)
-
-                del job_queue[job_queue.index(job_item)]
-
-                concurrent_jobs -= 1
-
-                logging.debug(log_name + ' :: RUN -> RESPONSE - Job ID {0}' \
-                                         '\n\tConcurrent jobs = {1}'
-                    .format(str(job_item.id), concurrent_jobs))
-
+            umapi_broker_context.add(RESPONSE_BROKER_TARGET, job_item.request)
+            del job_queue[job_queue.index(job_item)]
+            concurrent_jobs -= 1
+            logging.debug(log_name + ' :: RUN -> RESPONSE - Job ID {0}'\
+                                     '\n\tConcurrent jobs = {1}'
+            .format(str(job_item.id), concurrent_jobs))
 
         # Process pending jobs
         # --------------------
diff --git a/user_metrics/api/run_handlers.py b/user_metrics/api/run_handlers.py
index 0bc9c66..741c220 100644
--- a/user_metrics/api/run_handlers.py
+++ b/user_metrics/api/run_handlers.py
@@ -23,7 +23,7 @@
 
 import multiprocessing as mp
 from user_metrics.api.engine.request_manager import \
-    req_notification_queue_out, req_notification_queue_in, api_response_queue
+    req_notification_queue_out, req_notification_queue_in
 from user_metrics.api.engine.response_handler import process_responses
 from user_metrics.api.engine.request_manager import job_control, \
     requests_notification_callback
@@ -35,15 +35,13 @@
 rm_callback_proc = None
 
 
-def setup_controller(res_queue, msg_queue_in, msg_queue_out):
+def setup_controller(msg_queue_in, msg_queue_out):
     """
         Sets up the process that handles API jobs
     """
-    job_controller_proc = mp.Process(target=job_control,
-                                     args=res_queue)
+    job_controller_proc = mp.Process(target=job_control)
     response_controller_proc = mp.Process(target=process_responses,
-                                          args=(res_queue,
-                                                msg_queue_in))
+                                          args=msg_queue_in)
     rm_callback_proc = mp.Process(target=requests_notification_callback,
                                   args=(msg_queue_in,
                                         msg_queue_out))
@@ -67,6 +65,5 @@
 
 
 if __name__ == '__main__':
-    setup_controller(api_response_queue,
-        req_notification_queue_in,
+    setup_controller(req_notification_queue_in,
         req_notification_queue_out)

-- 
To view, visit https://gerrit.wikimedia.org/r/73560
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I2e26e06d030e2b4d060cd073253d5c4dc8c942ef
Gerrit-PatchSet: 1
Gerrit-Project: analytics/user-metrics
Gerrit-Branch: repair_runtime
Gerrit-Owner: Rfaulk <[email protected]>
Gerrit-Reviewer: Rfaulk <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to