Rfaulk has uploaded a new change for review.
https://gerrit.wikimedia.org/r/77676
Change subject: Add timeouts between broker item polls.
......................................................................
Add timeouts between broker item polls.
Change-Id: Ida8f2cdebb18446df14d9589f68a98e3e7f7561f
---
M user_metrics/api/engine/request_manager.py
M user_metrics/api/engine/response_handler.py
2 files changed, 18 insertions(+), 2 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/user-metrics
refs/changes/76/77676/1
diff --git a/user_metrics/api/engine/request_manager.py
b/user_metrics/api/engine/request_manager.py
index f8624ca..ce8da09 100644
--- a/user_metrics/api/engine/request_manager.py
+++ b/user_metrics/api/engine/request_manager.py
@@ -94,6 +94,7 @@
from collections import namedtuple
from os import getpid
from sys import getsizeof
+import time
# API JOB HANDLER
@@ -107,7 +108,7 @@
MAX_BLOCK_SIZE = 5000
MAX_CONCURRENT_JOBS = 1
QUEUE_WAIT = 5
-
+RESQUEST_TIMEOUT = 1.0
# Defines the job item type used to temporarily store job progress
job_item_type = namedtuple('JobItem', 'id process request queue')
@@ -141,10 +142,15 @@
while 1:
+ time.sleep(RESQUEST_TIMEOUT)
+
# Request Queue Processing
# ------------------------
+ logging.debug(log_name + ' - POLLING REQUESTS...')
req_item = umapi_broker_context.pop(REQUEST_BROKER_TARGET)
+ if not req_item:
+ continue
logging.debug(log_name + ' :: PULLING item from request queue -> ' \
'\n\tCOHORT = {0} - METRIC = {1}'
diff --git a/user_metrics/api/engine/response_handler.py
b/user_metrics/api/engine/response_handler.py
index 4b9eda5..1afe605 100644
--- a/user_metrics/api/engine/response_handler.py
+++ b/user_metrics/api/engine/response_handler.py
@@ -13,9 +13,11 @@
from user_metrics.api.engine.request_meta import rebuild_unpacked_request
from user_metrics.api.engine.data import set_data, build_key_signature
+import time
+
# Timeout in seconds to wait for data on the queue. This should be long
# enough to ensure that the full response can be received
-RESPONSE_TIMEOUT = 0.1
+RESPONSE_TIMEOUT = 1.0
# API RESPONSE HANDLER
@@ -30,8 +32,14 @@
while 1:
+ time.sleep(RESPONSE_TIMEOUT)
+
# Read request from the broker target
+ logging.debug(log_name + ' - POLLING RESPONSES...')
res_item = umapi_broker_context.pop(RESPONSE_BROKER_TARGET)
+ if not res_item:
+ continue
+
request_meta = rebuild_unpacked_request(res_item)
key_sig = build_key_signature(request_meta, hash_result=True)
@@ -42,4 +50,6 @@
str(request_meta)))
set_data(stream, request_meta)
+
+
logging.debug(log_name + ' - SHUTTING DOWN...')
--
To view, visit https://gerrit.wikimedia.org/r/77676
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ida8f2cdebb18446df14d9589f68a98e3e7f7561f
Gerrit-PatchSet: 1
Gerrit-Project: analytics/user-metrics
Gerrit-Branch: master
Gerrit-Owner: Rfaulk <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits