Rfaulk has submitted this change and it was merged.
Change subject: merge. repair_runtime branch.
......................................................................
merge. repair_runtime branch.
Change-Id: Id27c627db0b22dda687754198bca623c378a9daa
---
A user_metrics/api/broker.py
A user_metrics/api/run_handlers.py
2 files changed, 218 insertions(+), 0 deletions(-)
Approvals:
Rfaulk: Verified; Looks good to me, approved
diff --git a/user_metrics/api/broker.py b/user_metrics/api/broker.py
new file mode 100644
index 0000000..eb30d39
--- /dev/null
+++ b/user_metrics/api/broker.py
@@ -0,0 +1,161 @@
+"""
+This module defines the interface between API modules.
+"""
+
+
+__author__ = {
+ "ryan faulkner": "[email protected]"
+}
+__date__ = "2013-07-06"
+__license__ = "GPL (version 2 or later)"
+
+
+import json
+import os
+from user_metrics.config import logging
+
+
+class Broker(object):
+ """
+ Base class for broker
+ """
+
+ def compose(self):
+ raise NotImplementedError()
+
+ def add(self, target, key, value):
+ """
+ Add a key/value pair to the broker
+ """
+ raise NotImplementedError()
+
+ def remove(self, target, key):
+ """
+ Remove a key/value pair from the broker
+ """
+ raise NotImplementedError()
+
+ def update(self, target, key, value):
+ """
+ Update a key/value pair in the broker
+ """
+ raise NotImplementedError()
+
+ def get(self, target, key):
+ """
+ Retrieve a key/value pair from the broker
+ """
+ raise NotImplementedError()
+
+ def pop(self, target):
+ """
+ Pop the first item off of the queue
+ """
+ raise NotImplementedError()
+
+
+class FileBroker(Broker):
+ """
+ Implements a broker that uses a flat file as its backing store
+ """
+
+ def __init__(self, **kwargs):
+ super(FileBroker, self).__init__(**kwargs)
+
+ def compose(self):
+ pass
+
+ def add(self, target, key, value):
+ """
+ Adds key/value pair
+ """
+ if os.path.isfile(target):
+ mode = 'a'
+ else:
+ mode = 'w'
+
+ with open(target, mode) as f:
+ f.write(json.dumps({key: value}) + '\n')
+
+ def remove(self, target, key):
+ """
+ Remove element with the given key
+ """
+ try:
+ with open(target, 'r') as f:
+ lines = f.read().split('\n')
+ for idx, line in enumerate(lines):
+ item = json.loads(line)
+ if item.keys()[0] == key:
+ del lines[idx]
+ break
+ except IOError:
+ lines = []
+ with open(target, 'w'):
+ pass
+
+ with open(target, 'w') as f:
+ for line in lines:
+ f.write(line)
+
+ def update(self, target, key, value):
+ """
+ Update element with the given key
+ """
+ try:
+ with open(target, 'r') as f:
+ lines = f.read().split('\n')
+ for idx, line in enumerate(lines):
+ item = json.loads(line)
+ if item.keys()[0] == key:
+ lines[idx] = json.dumps({key: value}) + '\n'
+ break
+ except IOError:
+ lines = []
+ with open(target, 'w'):
+ pass
+
+ with open(target, 'w') as f:
+ for line in lines:
+ f.write(line)
+
+ def get(self, target, key):
+ """
+ Retrieve a value with the given key
+ """
+ try:
+ with open(target, 'r') as f:
+ lines = f.read().split('\n')
+ for idx, line in enumerate(lines):
+ item = json.loads(line)
+ if item.keys()[0] == key:
+ return item[key]
+ except IOError:
+ with open(target, 'w'):
+ pass
+
+ return None
+
+ def pop(self, target):
+ """
+ Pop the top value from the list
+ """
+ try:
+ with open(target, 'r') as f:
+ contents = f.read()
+ if contents:
+ lines = contents.split('\n')
+ if len(lines):
+ try:
+ item = json.loads(lines[0])
+ key = item.keys()[0]
+ except (KeyError, ValueError):
+ logging.error(__name__ + ' :: FileBroker.pop - '
+ 'Could not parse key.')
+ return None
+ self.remove(target, key)
+ return item[key]
+ except IOError:
+ with open(target, 'w'):
+ pass
+ return None
diff --git a/user_metrics/api/run_handlers.py b/user_metrics/api/run_handlers.py
new file mode 100644
index 0000000..7beba89
--- /dev/null
+++ b/user_metrics/api/run_handlers.py
@@ -0,0 +1,57 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+"""
+This module handles the execution of the job handler components of the User
+metrics API. This piece consists of three components:
+
+ Job Controller:
+ Schedules incoming requests for execution.
+
+ Response Handler:
+ Synthesizes responses from completed jobs.
+
+ Request Notification Callback:
+ Handles sending notifications on job status.
+"""
+
+__author__ = {
+ "ryan faulkner": "[email protected]"
+}
+__date__ = "2013-07-05"
+__license__ = "GPL (version 2 or later)"
+
+import multiprocessing as mp
+from user_metrics.api.engine.response_handler import process_response
+from user_metrics.api.engine.request_manager import job_control
+from user_metrics.utils import terminate_process_with_checks
+from user_metrics.config import logging
+
+job_controller_proc = None
+response_controller_proc = None
+
+
+def setup_controller():
+ """
+ Sets up the process that handles API jobs
+ """
+ job_controller_proc = mp.Process(target=job_control)
+ response_controller_proc = mp.Process(target=process_response)
+ job_controller_proc.start()
+ response_controller_proc.start()
+
+
+def teardown():
+ """ When the instance is deleted store the pickled data and shutdown
+ the job controller """
+
+ # Shutdown API handlers gracefully
+ try:
+ terminate_process_with_checks(job_controller_proc)
+ terminate_process_with_checks(response_controller_proc)
+ except Exception:
+ logging.error(__name__ + ' :: Could not shut down callbacks.')
+
+
+if __name__ == '__main__':
+ setup_controller()
--
To view, visit https://gerrit.wikimedia.org/r/77663
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: merged
Gerrit-Change-Id: Id27c627db0b22dda687754198bca623c378a9daa
Gerrit-PatchSet: 1
Gerrit-Project: analytics/user-metrics
Gerrit-Branch: master
Gerrit-Owner: Rfaulk <[email protected]>
Gerrit-Reviewer: Rfaulk <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits