Rfaulk has uploaded a new change for review.
https://gerrit.wikimedia.org/r/77673
Change subject: mod. response handler to utilize file broker context.
......................................................................
mod. response handler to utilize file broker context.
Change-Id: Ic29888846f7552e8fb74049c5856925230620539
---
M user_metrics/api/engine/response_handler.py
1 file changed, 7 insertions(+), 42 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/user-metrics
refs/changes/73/77673/1
diff --git a/user_metrics/api/engine/response_handler.py
b/user_metrics/api/engine/response_handler.py
index d7a4738..4b9eda5 100644
--- a/user_metrics/api/engine/response_handler.py
+++ b/user_metrics/api/engine/response_handler.py
@@ -8,12 +8,10 @@
__date__ = "2013-03-14"
__license__ = "GPL (version 2 or later)"
+from user_metrics.api import RESPONSE_BROKER_TARGET, umapi_broker_context
from user_metrics.config import logging
-from user_metrics.api import REQ_NCB_LOCK
from user_metrics.api.engine.request_meta import rebuild_unpacked_request
from user_metrics.api.engine.data import set_data, build_key_signature
-from Queue import Empty
-from flask import escape
# Timeout in seconds to wait for data on the queue. This should be long
# enough to ensure that the full response can be received
@@ -24,54 +22,21 @@
# ####################
-def process_responses(response_queue, msg_in):
+def process_response():
""" Pulls responses off of the queue. """
- log_name = '{0} :: {1}'.format(__name__, process_responses.__name__)
+ log_name = '{0} :: {1}'.format(__name__, process_response.__name__)
logging.debug(log_name + ' - STARTING...')
while 1:
- stream = ''
- # Block on the response queue
- try:
- res = response_queue.get(True)
- request_meta = rebuild_unpacked_request(res)
- except Exception:
- logging.error(log_name + ' - Could not get request meta')
- continue
-
- data = response_queue.get(True)
- while data:
- stream += data
- try:
- data = response_queue.get(True, timeout=1)
- except Empty:
- break
-
- try:
- data = eval(stream)
- except Exception as e:
-
- # Report a fraction of the failed response data directly in the
- # logger
- if len(unicode(stream)) > 2000:
- excerpt = stream[:1000] + ' ... ' + stream[-1000:]
- else:
- excerpt = stream
-
- logging.error(log_name + ' - Request failed. {0}\n\n' \
- 'data excerpt: {1}'.format(e.message,
excerpt))
-
- # Format a response that will report on the failed request
- stream = "OrderedDict([('status', 'Request failed.'), " \
- "('exception', '" + escape(unicode(e.message)) + "')," \
- "('request', '" + escape(unicode(request_meta)) + "'), " \
- "('data', '" + escape(unicode(stream)) + "')])"
-
+ # Read request from the broker target
+ res_item = umapi_broker_context.pop(RESPONSE_BROKER_TARGET)
+ request_meta = rebuild_unpacked_request(res_item)
key_sig = build_key_signature(request_meta, hash_result=True)
# Add result to cache once completed
+ # TODO - umapi_broker_context.add(target, key_sig, res_item)
logging.debug(log_name + ' - Setting data for {0}'.format(
str(request_meta)))
--
To view, visit https://gerrit.wikimedia.org/r/77673
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic29888846f7552e8fb74049c5856925230620539
Gerrit-PatchSet: 1
Gerrit-Project: analytics/user-metrics
Gerrit-Branch: master
Gerrit-Owner: Rfaulk <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits