Rfaulk has uploaded a new change for review.
https://gerrit.wikimedia.org/r/78614
Change subject: fix. logic around request processing.
......................................................................
fix. logic around request processing.
Change-Id: I1c891b8df0d21d4df91560c6a4d72ad6c4297790
---
M user_metrics/api/engine/request_manager.py
1 file changed, 48 insertions(+), 50 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/user-metrics
refs/changes/14/78614/1
diff --git a/user_metrics/api/engine/request_manager.py
b/user_metrics/api/engine/request_manager.py
index 4770577..32039cf 100644
--- a/user_metrics/api/engine/request_manager.py
+++ b/user_metrics/api/engine/request_manager.py
@@ -149,7 +149,11 @@
logging.debug(log_name + ' :: POLLING REQUESTS...')
logging.debug(log_name + ' :: JOB QUEUE - {0}'.format(str(job_queue)))
- if concurrent_jobs <= MAX_CONCURRENT_JOBS:
+ req_item = None
+
+ # Only process if there are fewer than the maximum number of concurrent
+ # jobs
+ if concurrent_jobs < MAX_CONCURRENT_JOBS:
# Pop from request target
req_item = umapi_broker_context.pop(REQUEST_BROKER_TARGET)
@@ -159,73 +163,67 @@
url_hash = sha1(req_item.encode('utf-8')).hexdigest()
umapi_broker_context.add(PROCESS_BROKER_TARGET, url_hash,
req_item)
- else:
- continue
- else:
- continue
- # Continue if there is no request or the job_queue is empty
- if not req_item and not concurrent_jobs:
- continue
-
- logging.debug(log_name + ' :: PULLING item from request queue -> '
- '\n\t{0}'
- .format(req_item))
+ logging.debug(log_name + ' :: PULLING item from request queue -> '
+ '\n\t{0}'
+ .format(req_item))
# Process complete jobs
# ---------------------
- for job_item in job_queue:
+ if concurrent_jobs:
+ for job_item in job_queue:
- if not job_item.queue.empty():
+ if not job_item.queue.empty():
- logging.info(log_name + ' :: READING RESPONSE - {0}'.
- format(job_item.request))
+ logging.info(log_name + ' :: READING RESPONSE - {0}'.
+ format(job_item.request))
- # Pull data off of the queue and add it to response queue
- data = ''
- while not job_item.queue.empty():
- data += job_item.queue.get(True)
+ # Pull data off of the queue and add it to response queue
+ data = ''
+ while not job_item.queue.empty():
+ data += job_item.queue.get(True)
- # Remove from process target
- url_hash = sha1(job_item.request.encode('utf-8')).hexdigest()
- try:
- umapi_broker_context.remove(PROCESS_BROKER_TARGET,
- url_hash)
- except Exception as e:
- logging.error(log_name + ' :: Could not process '
- '{0} from {1} -- {2}'.
- format(job_item.request,
- PROCESS_BROKER_TARGET,
- e.message))
+ # Remove from process target
+ url_hash =
sha1(job_item.request.encode('utf-8')).hexdigest()
+ try:
+ umapi_broker_context.remove(PROCESS_BROKER_TARGET,
+ url_hash)
+ except Exception as e:
+ logging.error(log_name + ' :: Could not process '
+ '{0} from {1} -- {2}'.
+ format(job_item.request,
+ PROCESS_BROKER_TARGET,
+ e.message))
- # Add to response target
- umapi_broker_context.add(RESPONSE_BROKER_TARGET, url_hash,
- pack_response_for_broker(
- job_item.request, data))
+ # Add to response target
+ umapi_broker_context.add(RESPONSE_BROKER_TARGET, url_hash,
+ pack_response_for_broker(
+ job_item.request, data))
- del job_queue[job_queue.index(job_item)]
- concurrent_jobs -= 1
- logging.debug(log_name + ' :: RUN -> RESPONSE - Job ID {0}'
- '\n\tConcurrent jobs = {1}'
- .format(str(job_item.id), concurrent_jobs))
+ del job_queue[job_queue.index(job_item)]
+ concurrent_jobs -= 1
+ logging.debug(log_name + ' :: RUN -> RESPONSE - Job ID {0}'
+ '\n\tConcurrent jobs = {1}'
+ .format(str(job_item.id), concurrent_jobs))
# Process request
# ---------------
- req_q = Queue()
- proc = Process(target=process_metrics, args=(req_q, req_item))
- proc.start()
+ if req_item:
+ req_q = Queue()
+ proc = Process(target=process_metrics, args=(req_q, req_item))
+ proc.start()
- job_item = job_item_type(job_id, proc, req_item, req_q)
- job_queue.append(job_item)
+ job_item = job_item_type(job_id, proc, req_item, req_q)
+ job_queue.append(job_item)
- concurrent_jobs += 1
- job_id += 1
+ concurrent_jobs += 1
+ job_id += 1
- logging.debug(log_name + ' :: WAIT -> RUN - Job ID {0}'
- '\n\tConcurrent jobs = {1}, REQ = {2}'
- .format(str(job_id), concurrent_jobs, req_item))
+ logging.debug(log_name + ' :: WAIT -> RUN - Job ID {0}'
+ '\n\tConcurrent jobs = {1}, REQ = {2}'
+ .format(str(job_id), concurrent_jobs, req_item))
logging.debug('{0} - FINISHING.'.format(log_name))
--
To view, visit https://gerrit.wikimedia.org/r/78614
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I1c891b8df0d21d4df91560c6a4d72ad6c4297790
Gerrit-PatchSet: 1
Gerrit-Project: analytics/user-metrics
Gerrit-Branch: master
Gerrit-Owner: Rfaulk <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits