Rfaulk has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/78476


Change subject: mod. condition on which jobs are processed in main loop / pep 8 
/ logging.
......................................................................

mod. condition on which jobs are processed in main loop / pep 8 / logging.

Change-Id: Ic3fe864236e9bdaf071239d4244cb0017aae0d1e
---
M user_metrics/api/engine/request_manager.py
1 file changed, 16 insertions(+), 10 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/user-metrics 
refs/changes/76/78476/1

diff --git a/user_metrics/api/engine/request_manager.py 
b/user_metrics/api/engine/request_manager.py
index 13277f0..4770577 100644
--- a/user_metrics/api/engine/request_manager.py
+++ b/user_metrics/api/engine/request_manager.py
@@ -87,8 +87,7 @@
     RESPONSE_BROKER_TARGET, PROCESS_BROKER_TARGET
 from user_metrics.api.engine import pack_response_for_broker
 from user_metrics.api.engine.data import get_users
-from user_metrics.api.engine.request_meta import build_request_obj, \
-    parse_raw_request
+from user_metrics.api.engine.request_meta import build_request_obj
 from user_metrics.metrics.users import MediaWikiUser
 from user_metrics.metrics.user_metric import UserMetricError
 
@@ -148,7 +147,8 @@
         # Request Queue Processing
         # ------------------------
 
-        logging.debug(log_name + ' - POLLING REQUESTS...')
+        logging.debug(log_name + ' :: POLLING REQUESTS...')
+        logging.debug(log_name + ' :: JOB QUEUE - {0}'.format(str(job_queue)))
         if concurrent_jobs <= MAX_CONCURRENT_JOBS:
 
             # Pop from request target
@@ -157,13 +157,15 @@
             # Push to process target
             if req_item:
                 url_hash = sha1(req_item.encode('utf-8')).hexdigest()
-                umapi_broker_context.add(PROCESS_BROKER_TARGET, url_hash, 
req_item)
+                umapi_broker_context.add(PROCESS_BROKER_TARGET, url_hash,
+                                         req_item)
             else:
                 continue
         else:
             continue
 
-        if not req_item:
+        # Skip ahead only when there is no new request AND no jobs are
+        # currently being processed (concurrent_jobs == 0)
+        if not req_item and not concurrent_jobs:
             continue
 
         logging.debug(log_name + ' :: PULLING item from request queue -> '
@@ -177,6 +179,9 @@
 
             if not job_item.queue.empty():
 
+                logging.info(log_name + ' :: READING RESPONSE - {0}'.
+                    format(job_item.request))
+
                 # Pull data off of the queue and add it to response queue
                 data = ''
                 while not job_item.queue.empty():
@@ -185,9 +190,10 @@
                 # Remove from process target
                 url_hash = sha1(job_item.request.encode('utf-8')).hexdigest()
                 try:
-                    umapi_broker_context.remove(PROCESS_BROKER_TARGET, 
url_hash)
+                    umapi_broker_context.remove(PROCESS_BROKER_TARGET,
+                                                url_hash)
                 except Exception as e:
-                    logging.error(__name__ + ' :: Could not process '
+                    logging.error(log_name + ' :: Could not process '
                                              '{0} from {1}  -- {2}'.
                         format(job_item.request,
                                PROCESS_BROKER_TARGET,
@@ -236,7 +242,7 @@
 
     log_name = '{0} :: {1}'.format(__name__, process_metrics.__name__)
 
-    logging.info(log_name + ' - START JOB'
+    logging.info(log_name + ' :: START JOB'
                             '\n\t{0} -  PID = {1})'.
                  format(request_url, getpid()))
 
@@ -305,14 +311,14 @@
         else:
             p.put(results, block=True)
 
-        logging.info(log_name + ' - END JOB'
+        logging.info(log_name + ' :: END JOB'
                                 '\n\tCOHORT = {0}- METRIC = {1} -  PID = {2})'.
                      format(request_obj.cohort_expr, request_obj.metric,
                             getpid()))
 
     else:
         p.put(err_msg, block=True)
-        logging.info(log_name + ' - END JOB - FAILED.'
+        logging.info(log_name + ' :: END JOB - FAILED.'
                                 '\n\tCOHORT = {0}- METRIC = {1} -  PID = {2})'.
                      format(request_obj.cohort_expr, request_obj.metric,
                             getpid()))

-- 
To view, visit https://gerrit.wikimedia.org/r/78476
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic3fe864236e9bdaf071239d4244cb0017aae0d1e
Gerrit-PatchSet: 1
Gerrit-Project: analytics/user-metrics
Gerrit-Branch: master
Gerrit-Owner: Rfaulk <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to