Rfaulk has uploaded a new change for review.
https://gerrit.wikimedia.org/r/77671
Change subject: rm. Request notification handler to maintain job queue.
......................................................................
rm. Request notification handler to maintain job queue.
Change-Id: I7b0764d42ad85afaa98533ae0ff71a1c5c9508fe
---
M user_metrics/api/engine/request_manager.py
M user_metrics/api/engine/response_handler.py
2 files changed, 1 insertion(+), 173 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/user-metrics refs/changes/71/77671/1
diff --git a/user_metrics/api/engine/request_manager.py b/user_metrics/api/engine/request_manager.py
index 3a5f814..dd33191 100644
--- a/user_metrics/api/engine/request_manager.py
+++ b/user_metrics/api/engine/request_manager.py
@@ -204,12 +204,6 @@
.format(rm.cohort_expr, rm.metric))
wait_queue.append(rm)
- # Communicate with request notification callback about new job
- key_sig = build_key_signature(rm, hash_result=True)
- url = get_url_from_keys(build_key_signature(rm), REQUEST_PATH)
- req_cb_add_req(key_sig, url, REQ_NCB_LOCK)
-
-
logging.debug('{0} - FINISHING.'.format(log_name))
@@ -476,166 +470,3 @@
results['data'][m[0]] = m[1:]
return results
-
-
-# REQUEST NOTIFICATIONS
-# #####################
-
-from collections import OrderedDict
-
-req_notification_queue_in = Queue()
-req_notification_queue_out = Queue()
-
-request_msg_type = namedtuple('RequestMessage', 'type hash url is_alive')
-
-
-def requests_notification_callback(msg_queue_in, msg_queue_out):
- """
- Asynchronous callback. Tracks status of requests and new requests.
- This callback utilizes ``msg_queue_in`` & ``msg_queue_out`` to
- manage request status.
- """
- log_name = '{0} :: {1}'.format(__name__,
- requests_notification_callback.__name__)
- logging.debug('{0} - STARTING...'.format(log_name))
-
- # TODO - potentially extend with an in-memory cache
- job_list = OrderedDict()
- while 1:
-
- try:
- msg = msg_queue_in.get(True)
- except IOError as e:
- logging.error(__name__ + ' :: Could not block '
- 'on in queue: "{0}"'.format(e.message))
- sleep(1)
- continue
-
- try:
- type = msg[0]
- except (KeyError, ValueError):
- logging.error(log_name + ' - No valid type ' \
- '{0}'.format(str(msg)))
- continue
-
- # Init request
- if type == 0:
- try:
- job_list[msg[1]] = [True, msg[2]]
- logging.debug(log_name + ' - Initialize Request: ' \
- '{0}.'.format(str(msg)))
- except Exception:
- logging.error(log_name + ' - Initialize Request' \
- ' failed: {0}'.format(str(msg)))
-
- # Flag request complete - leave on queue
- elif type == 1:
- try:
- job_list[msg[1]][0] = False
- logging.debug(log_name + ' - Set request finished: ' \
- '{0}.\n'.format(str(msg)))
- except Exception:
- logging.error(log_name + ' - Set request finished failed: ' \
- '{0}\n'.format(str(msg)))
-
- # Is the key in the cache and running?
- elif type == 2:
- try:
- if msg[1] in job_list:
- msg_queue_out.put([job_list[msg[1]][0]], True)
- else:
- msg_queue_out.put([False], True)
- logging.debug(log_name + ' - Get request alive: ' \
- '{0}.'.format(str(msg)))
- except (KeyError, ValueError):
- logging.error(log_name + ' - Get request alive failed: ' \
- '{0}'.format(str(msg)))
-
- # Get keys
- elif type == 3:
- msg_queue_out.put(job_list.keys(), True)
-
- # Get url
- elif type == 4:
- try:
- if msg[1] in job_list:
- msg_queue_out.put([job_list[msg[1]][1]], True)
- else:
- logging.error(log_name + ' - Get URL failed: {0}'.
- format(str(msg)))
- except (KeyError, ValueError):
- logging.error(log_name + ' - Get URL failed: {0}'.format(str(msg)))
- else:
- logging.error(log_name + ' - Bad message: {0}'.format(str(msg)))
-
- logging.debug('{0} - SHUTTING DOWN...'.format(log_name))
-
-
-# Wrapper Methods for working with Request Notifications
-# Use locks to enforce atomicity
-
-BLOCK_TIMEOUT = 1
-
-
-def req_cb_get_url(key, lock):
- lock.acquire()
- req_notification_queue_in.put([4, key], block=True)
- try:
- val = req_notification_queue_out.get(True, timeout=BLOCK_TIMEOUT)[0]
- except Empty:
- logging.error(__name__ + ' :: req_cb_get_url -'
- ' Block time expired.')
- val = ''
- finally:
- lock.release()
- return val
-
-
-def req_cb_get_cache_keys(lock):
- lock.acquire()
- req_notification_queue_in.put([3], block=True)
- try:
- val = req_notification_queue_out.get(block=True,
- timeout=BLOCK_TIMEOUT)
- except Empty:
- logging.error(__name__ + ' :: req_cb_get_cache_keys -'
- ' Block time expired.')
- val = []
- finally:
- lock.release()
- return val
-
-
-def req_cb_get_is_running(key, lock):
- lock.acquire()
- req_notification_queue_in.put([2, key], True)
- try:
- val = req_notification_queue_out.get(block=True,
- timeout=BLOCK_TIMEOUT)[0]
- except Empty:
- logging.error(__name__ + ' :: req_cb_get_is_running -'
- ' Block time expired.')
- val = False
- finally:
- lock.release()
- return val
-
-
-def req_cb_add_req(key, url, lock):
- lock.acquire()
- try:
- req_notification_queue_in.put([0, key, url])
- except Empty:
- pass
- finally:
- lock.release()
-
-
-def req_cb_flag_job_complete(key, lock):
- lock.acquire()
- try:
- req_notification_queue_in.put([1, key], True)
- except Empty:
- pass
- finally:
- lock.release()
diff --git a/user_metrics/api/engine/response_handler.py
b/user_metrics/api/engine/response_handler.py
index 5406d7e..d7a4738 100644
--- a/user_metrics/api/engine/response_handler.py
+++ b/user_metrics/api/engine/response_handler.py
@@ -8,12 +8,10 @@
__date__ = "2013-03-14"
__license__ = "GPL (version 2 or later)"
-from collections import OrderedDict
from user_metrics.config import logging
from user_metrics.api import REQ_NCB_LOCK
from user_metrics.api.engine.request_meta import rebuild_unpacked_request
from user_metrics.api.engine.data import set_data, build_key_signature
-from user_metrics.api.engine.request_manager import req_cb_flag_job_complete
from Queue import Empty
from flask import escape
@@ -73,8 +71,7 @@
key_sig = build_key_signature(request_meta, hash_result=True)
- # Set request in list to "not alive"
- req_cb_flag_job_complete(key_sig, REQ_NCB_LOCK)
+ # Add result to cache once completed
logging.debug(log_name + ' - Setting data for {0}'.format(
str(request_meta)))
--
To view, visit https://gerrit.wikimedia.org/r/77671
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I7b0764d42ad85afaa98533ae0ff71a1c5c9508fe
Gerrit-PatchSet: 1
Gerrit-Project: analytics/user-metrics
Gerrit-Branch: master
Gerrit-Owner: Rfaulk <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits