Rfaulk has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/77663


Change subject: merge. repair_runtime branch.
......................................................................

merge. repair_runtime branch.

Change-Id: Id27c627db0b22dda687754198bca623c378a9daa
---
A user_metrics/api/broker.py
A user_metrics/api/run_handlers.py
2 files changed, 218 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/user-metrics 
refs/changes/63/77663/1

diff --git a/user_metrics/api/broker.py b/user_metrics/api/broker.py
new file mode 100644
index 0000000..eb30d39
--- /dev/null
+++ b/user_metrics/api/broker.py
@@ -0,0 +1,161 @@
+"""
+This module defines the interface between API modules.
+"""
+
+
+__author__ = {
+    "ryan faulkner": "[email protected]"
+}
+__date__ = "2013-07-06"
+__license__ = "GPL (version 2 or later)"
+
+
+import json
+import os
+from user_metrics.config import logging
+
+
+class Broker(object):
+    """
+    Abstract broker interface.  Every method here raises
+    NotImplementedError; concrete brokers override them.
+    """
+
+    def compose(self):
+        """ Not implemented by the base class. """
+        raise NotImplementedError
+
+    def add(self, target, key, value):
+        """
+        Store ``value`` under ``key`` in ``target``.
+        """
+        raise NotImplementedError
+
+    def remove(self, target, key):
+        """
+        Delete the entry stored under ``key`` from ``target``.
+        """
+        raise NotImplementedError
+
+    def update(self, target, key, value):
+        """
+        Replace the value stored under ``key`` in ``target``.
+        """
+        raise NotImplementedError
+
+    def get(self, target, key):
+        """ Look up the value stored under ``key`` in ``target``. """
+        raise NotImplementedError
+
+    def pop(self, target):
+        """
+        Take the first item off the queue held in ``target``.
+        """
+        raise NotImplementedError
+
+
+class FileBroker(Broker):
+    """
+    Implements a broker that uses a flat file as its store.
+
+    ``target`` is the path of that file.  ``add`` writes each record as one
+    line holding a single-pair JSON object, ``{key: value}``.  JSON object
+    keys are strings, so lookups should use string keys as well.
+    """
+
+    def __init__(self, **kwargs):
+        super(FileBroker, self).__init__(**kwargs)
+
+    def compose(self):
+        pass
+
+    @staticmethod
+    def _read_lines(target):
+        """
+        Return the non-blank lines of ``target``.  If the file cannot be
+        read it is created empty and an empty list is returned.
+        """
+        try:
+            with open(target, 'r') as f:
+                # Skipping blank lines drops the '' that split() yields after
+                # the final newline, which json.loads would reject.
+                return [ln for ln in f.read().split('\n') if ln.strip()]
+        except IOError:
+            with open(target, 'w'):
+                pass
+            return []
+
+    @staticmethod
+    def _write_lines(target, lines):
+        """
+        Rewrite ``target`` with one newline-terminated record per line.
+        """
+        with open(target, 'w') as f:
+            f.writelines(line + '\n' for line in lines)
+
+    @staticmethod
+    def _find(lines, key):
+        """
+        Return ``(index, value)`` for the first record stored under ``key``,
+        or ``(None, None)``.  Raises ValueError on a line that is not JSON.
+        """
+        for idx, line in enumerate(lines):
+            item = json.loads(line)
+            # Records hold one pair, so only the first key is inspected.
+            if list(item)[:1] == [key]:
+                return idx, item[key]
+        return None, None
+
+    def add(self, target, key, value):
+        """
+        Adds key/value pair as a new line at the end of ``target``
+        """
+        # Mode 'a' creates the file when it does not exist yet.
+        with open(target, 'a') as f:
+            f.write(json.dumps({key: value}) + '\n')
+
+    def remove(self, target, key):
+        """
+        Remove the first element with the given key, if there is one
+        """
+        lines = self._read_lines(target)
+        idx, _ = self._find(lines, key)
+        if idx is not None:
+            del lines[idx]
+            self._write_lines(target, lines)
+
+    def update(self, target, key, value):
+        """
+        Update the first element with the given key, if there is one
+        """
+        lines = self._read_lines(target)
+        idx, _ = self._find(lines, key)
+        if idx is not None:
+            # No newline appended here: _write_lines terminates every record.
+            lines[idx] = json.dumps({key: value})
+            self._write_lines(target, lines)
+
+    def get(self, target, key):
+        """
+        Retrieve the value with the given key, or None when it is absent
+        """
+        return self._find(self._read_lines(target), key)[1]
+
+    def pop(self, target):
+        """
+        Pop the top value from the list; None if it is empty or unparsable
+        """
+        lines = self._read_lines(target)
+        if not lines:
+            return None
+        try:
+            item = json.loads(lines[0])
+            key = list(item)[0]
+            value = item[key]
+        except (IndexError, KeyError, TypeError, ValueError):
+            logging.error(__name__ + ' :: FileBroker.pop - '
+                                     'Could not parse key.')
+            return None
+        # Persist the queue without its head record before returning it.
+        self._write_lines(target, lines[1:])
+        return value
diff --git a/user_metrics/api/run_handlers.py b/user_metrics/api/run_handlers.py
new file mode 100644
index 0000000..7beba89
--- /dev/null
+++ b/user_metrics/api/run_handlers.py
@@ -0,0 +1,57 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+
+"""
+This module handles the execution of the job handler components of the User
+metrics API.  This piece consists of three components:
+
+    Job Controller:
+        Schedules incoming requests for execution.
+
+    Response Handler:
+        Synthesizes responses from completed jobs.
+
+    Request Notification Callback:
+        Handles sending notifications on job status.
+"""
+
+__author__ = {
+    "ryan faulkner": "[email protected]"
+}
+__date__ = "2013-07-05"
+__license__ = "GPL (version 2 or later)"
+
+import multiprocessing as mp
+from user_metrics.api.engine.response_handler import process_response
+from user_metrics.api.engine.request_manager import job_control
+from user_metrics.utils import terminate_process_with_checks
+from user_metrics.config import logging
+
+job_controller_proc = None
+response_controller_proc = None
+
+
+def setup_controller():
+    """ Create and start the job-control and response-handler processes. """
+    # Rebind the module globals (not locals) so teardown() can reach them.
+    global job_controller_proc, response_controller_proc
+    job_controller_proc = mp.Process(target=job_control)
+    response_controller_proc = mp.Process(target=process_response)
+    job_controller_proc.start()
+    response_controller_proc.start()
+
+
+def teardown():
+    """ Shut down the job controller and response handler processes.
+        Both are attempted even if the first termination fails. """
+    for label, proc in (('job controller', job_controller_proc),
+                        ('response handler', response_controller_proc)):
+        try:
+            terminate_process_with_checks(proc)
+        except Exception as e:
+            logging.error(__name__ + ' :: Could not shut down callbacks.'
+                          + ' (%s: %s)' % (label, e))
+
+
+if __name__ == '__main__':
+    setup_controller()

-- 
To view, visit https://gerrit.wikimedia.org/r/77663
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Id27c627db0b22dda687754198bca623c378a9daa
Gerrit-PatchSet: 1
Gerrit-Project: analytics/user-metrics
Gerrit-Branch: master
Gerrit-Owner: Rfaulk <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to