Rfaulk has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/77679


Change subject: mod. Processing request item.
......................................................................

mod. Processing request item.

Change-Id: I53cbb34a16c0d6414cf84be3d47238dbb063aed4
---
M user_metrics/api/engine/request_manager.py
1 file changed, 17 insertions(+), 21 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/user-metrics 
refs/changes/79/77679/1

diff --git a/user_metrics/api/engine/request_manager.py 
b/user_metrics/api/engine/request_manager.py
index 37231a4..7eeb12c 100644
--- a/user_metrics/api/engine/request_manager.py
+++ b/user_metrics/api/engine/request_manager.py
@@ -147,7 +147,11 @@
         # ------------------------
 
         logging.debug(log_name  + ' - POLLING REQUESTS...')
-        req_item = umapi_broker_context.pop(REQUEST_BROKER_TARGET)
+        if concurrent_jobs <= MAX_CONCURRENT_JOBS:
+            req_item = umapi_broker_context.pop(REQUEST_BROKER_TARGET)
+        else:
+            continue
+
         if not req_item:
             continue
 
@@ -167,30 +171,22 @@
                                      '\n\tConcurrent jobs = {1}'
             .format(str(job_item.id), concurrent_jobs))
 
-        # Process pending jobs
-        # --------------------
+        # Process request
+        # ---------------
 
-        for wait_req in wait_queue:
-            if concurrent_jobs <= MAX_CONCURRENT_JOBS:
-                # prepare job from item
+        req_q = Queue()
+        proc = Process(target=process_metrics, args=(req_q, req_item))
+        proc.start()
 
-                req_q = Queue()
-                proc = Process(target=process_metrics, args=(req_q, wait_req))
-                proc.start()
+        job_item = job_item_type(job_id, proc, req_item, req_q)
+        job_queue.append(job_item)
 
-                job_item = job_item_type(job_id, proc, wait_req, req_q)
-                job_queue.append(job_item)
+        concurrent_jobs += 1
+        job_id += 1
 
-                del wait_queue[wait_queue.index(wait_req)]
-
-                concurrent_jobs += 1
-                job_id += 1
-
-                logging.debug(log_name + ' :: WAIT -> RUN - Job ID {0}' \
-                                         '\n\tConcurrent jobs = {1}, ' \
-                                         'COHORT = {2} - METRIC = {3}'\
-                    .format(str(job_id), concurrent_jobs,
-                            wait_req.cohort_expr, wait_req.metric))
+        logging.debug(log_name + ' :: WAIT -> RUN - Job ID {0}' \
+                                 '\n\tConcurrent jobs = {1}, REQ = {2}'
+                      .format(str(job_id), concurrent_jobs, req_item))
 
     logging.debug('{0} - FINISHING.'.format(log_name))
 

-- 
To view, visit https://gerrit.wikimedia.org/r/77679
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I53cbb34a16c0d6414cf84be3d47238dbb063aed4
Gerrit-PatchSet: 1
Gerrit-Project: analytics/user-metrics
Gerrit-Branch: master
Gerrit-Owner: Rfaulk <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to