Rfaulk has submitted this change and it was merged.

Change subject: fix. logic around request processing.
......................................................................


fix. logic around request processing.

Change-Id: I1c891b8df0d21d4df91560c6a4d72ad6c4297790
---
M user_metrics/api/engine/request_manager.py
1 file changed, 48 insertions(+), 50 deletions(-)

Approvals:
  Rfaulk: Verified; Looks good to me, approved



diff --git a/user_metrics/api/engine/request_manager.py b/user_metrics/api/engine/request_manager.py
index 4770577..32039cf 100644
--- a/user_metrics/api/engine/request_manager.py
+++ b/user_metrics/api/engine/request_manager.py
@@ -149,7 +149,11 @@
 
         logging.debug(log_name + ' :: POLLING REQUESTS...')
         logging.debug(log_name + ' :: JOB QUEUE - {0}'.format(str(job_queue)))
-        if concurrent_jobs <= MAX_CONCURRENT_JOBS:
+        req_item = None
+
+        # Only process if there are fewer than the maximum number of concurrent
+        # jobs
+        if concurrent_jobs < MAX_CONCURRENT_JOBS:
 
             # Pop from request target
             req_item = umapi_broker_context.pop(REQUEST_BROKER_TARGET)
@@ -159,73 +163,67 @@
                 url_hash = sha1(req_item.encode('utf-8')).hexdigest()
                 umapi_broker_context.add(PROCESS_BROKER_TARGET, url_hash,
                                          req_item)
-            else:
-                continue
-        else:
-            continue
 
-        # Continue if there is no request or the job_queue is empty
-        if not req_item and not concurrent_jobs:
-            continue
-
-        logging.debug(log_name + ' :: PULLING item from request queue -> '
-                                 '\n\t{0}'
-                      .format(req_item))
+            logging.debug(log_name + ' :: PULLING item from request queue -> '
+                                     '\n\t{0}'
+                          .format(req_item))
 
         # Process complete jobs
         # ---------------------
 
-        for job_item in job_queue:
+        if concurrent_jobs:
+            for job_item in job_queue:
 
-            if not job_item.queue.empty():
+                if not job_item.queue.empty():
 
-                logging.info(log_name + ' :: READING RESPONSE - {0}'.
-                    format(job_item.request))
+                    logging.info(log_name + ' :: READING RESPONSE - {0}'.
+                        format(job_item.request))
 
-                # Pull data off of the queue and add it to response queue
-                data = ''
-                while not job_item.queue.empty():
-                    data += job_item.queue.get(True)
+                    # Pull data off of the queue and add it to response queue
+                    data = ''
+                    while not job_item.queue.empty():
+                        data += job_item.queue.get(True)
 
-                # Remove from process target
-                url_hash = sha1(job_item.request.encode('utf-8')).hexdigest()
-                try:
-                    umapi_broker_context.remove(PROCESS_BROKER_TARGET,
-                                                url_hash)
-                except Exception as e:
-                    logging.error(log_name + ' :: Could not process '
-                                             '{0} from {1}  -- {2}'.
-                        format(job_item.request,
-                               PROCESS_BROKER_TARGET,
-                               e.message))
+                    # Remove from process target
+                    url_hash = sha1(job_item.request.encode('utf-8')).hexdigest()
+                    try:
+                        umapi_broker_context.remove(PROCESS_BROKER_TARGET,
+                                                    url_hash)
+                    except Exception as e:
+                        logging.error(log_name + ' :: Could not process '
+                                                 '{0} from {1}  -- {2}'.
+                            format(job_item.request,
+                                   PROCESS_BROKER_TARGET,
+                                   e.message))
 
-                # Add to response target
-                umapi_broker_context.add(RESPONSE_BROKER_TARGET, url_hash,
-                                         pack_response_for_broker(
-                                             job_item.request, data))
+                    # Add to response target
+                    umapi_broker_context.add(RESPONSE_BROKER_TARGET, url_hash,
+                                             pack_response_for_broker(
+                                                 job_item.request, data))
 
-                del job_queue[job_queue.index(job_item)]
-                concurrent_jobs -= 1
-                logging.debug(log_name + ' :: RUN -> RESPONSE - Job ID {0}'
-                                         '\n\tConcurrent jobs = {1}'
-                              .format(str(job_item.id), concurrent_jobs))
+                    del job_queue[job_queue.index(job_item)]
+                    concurrent_jobs -= 1
+                    logging.debug(log_name + ' :: RUN -> RESPONSE - Job ID {0}'
+                                             '\n\tConcurrent jobs = {1}'
+                                  .format(str(job_item.id), concurrent_jobs))
 
         # Process request
         # ---------------
 
-        req_q = Queue()
-        proc = Process(target=process_metrics, args=(req_q, req_item))
-        proc.start()
+        if req_item:
+            req_q = Queue()
+            proc = Process(target=process_metrics, args=(req_q, req_item))
+            proc.start()
 
-        job_item = job_item_type(job_id, proc, req_item, req_q)
-        job_queue.append(job_item)
+            job_item = job_item_type(job_id, proc, req_item, req_q)
+            job_queue.append(job_item)
 
-        concurrent_jobs += 1
-        job_id += 1
+            concurrent_jobs += 1
+            job_id += 1
 
-        logging.debug(log_name + ' :: WAIT -> RUN - Job ID {0}'
-                                 '\n\tConcurrent jobs = {1}, REQ = {2}'
-                      .format(str(job_id), concurrent_jobs, req_item))
+            logging.debug(log_name + ' :: WAIT -> RUN - Job ID {0}'
+                                     '\n\tConcurrent jobs = {1}, REQ = {2}'
+                          .format(str(job_id), concurrent_jobs, req_item))
 
     logging.debug('{0} - FINISHING.'.format(log_name))
 

-- 
To view, visit https://gerrit.wikimedia.org/r/78614
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I1c891b8df0d21d4df91560c6a4d72ad6c4297790
Gerrit-PatchSet: 1
Gerrit-Project: analytics/user-metrics
Gerrit-Branch: master
Gerrit-Owner: Rfaulk <[email protected]>
Gerrit-Reviewer: Rfaulk <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to