Repository: incubator-ariatosca
Updated Branches:
  refs/heads/wf-executor [created] be147d967


add multiprocessing executor
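
A minimal usage sketch for orientation. The MultiprocessExecutor name, the
pool_size parameter, executor.close(), and the Engine wiring all come from
the diff below (see aria/cli/commands.py); the engine.execute() call is an
assumption, as the entry point is not shown in this commit:

    from aria.workflows.engine.engine import Engine
    from aria.workflows.engine.executor import MultiprocessExecutor

    # pool_size bounds how many worker processes run tasks concurrently
    executor = MultiprocessExecutor(pool_size=4)
    engine = Engine(executor=executor,
                    workflow_context=workflow_context,
                    tasks_graph=tasks_graph)
    engine.execute()  # assumed entry point, not shown in this diff
    executor.close()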


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/be147d96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/be147d96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/be147d96

Branch: refs/heads/wf-executor
Commit: be147d96713bc3091c09b8b67b5c9241cf35e3b4
Parents: b53aa1f
Author: Dan Kilman <dankil...@gmail.com>
Authored: Thu Oct 13 19:39:01 2016 +0300
Committer: Dan Kilman <dankil...@gmail.com>
Committed: Thu Oct 13 19:39:01 2016 +0300

----------------------------------------------------------------------
 aria/cli/commands.py              |  4 +-
 aria/workflows/engine/executor.py | 87 ++++++++++++++++++++++++++++++++++-
 2 files changed, 88 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/be147d96/aria/cli/commands.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands.py b/aria/cli/commands.py
index 9fa4911..601f4ca 100644
--- a/aria/cli/commands.py
+++ b/aria/cli/commands.py
@@ -27,7 +27,7 @@ from aria.storage import FileSystemModelDriver, FileSystemResourceDriver
 from aria.tools.application import StorageManager
 from aria.contexts import WorkflowContext
 from aria.workflows.engine.engine import Engine
-from aria.workflows.engine.executor import LocalThreadExecutor
+from aria.workflows.engine.executor import ThreadExecutor
 
 from .storage import (
     local_resource_storage,
@@ -225,7 +225,7 @@ class ExecuteCommand(BaseCommand):
         )
         workflow_function = self._load_workflow_handler(workflow['operation'])
         tasks_graph = workflow_function(workflow_context, **workflow_context.parameters)
-        executor = LocalThreadExecutor()
+        executor = ThreadExecutor()
         workflow_engine = Engine(executor=executor,
                                  workflow_context=workflow_context,
                                  tasks_graph=tasks_graph)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/be147d96/aria/workflows/engine/executor.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/executor.py b/aria/workflows/engine/executor.py
index dacfc15..a69e87a 100644
--- a/aria/workflows/engine/executor.py
+++ b/aria/workflows/engine/executor.py
@@ -14,8 +14,12 @@
 # limitations under the License.
 
 import threading
+import multiprocessing
 import Queue
 from importlib import import_module
+from collections import namedtuple
+
+import jsonpickle
 
 from aria.events import (
     start_task_signal,
@@ -39,7 +43,7 @@ class Executor(object):
         on_success_task_signal.send(self, task_id=task_id)
 
 
-class LocalThreadExecutor(Executor):
+class ThreadExecutor(Executor):
 
     def __init__(self, pool_size=1):
         self.stopped = False
@@ -85,3 +89,84 @@
         except AttributeError:
             # TODO: handle
             raise
+
+
+# Module-level so that instances can be pickled across process boundaries
+_Action = namedtuple('_Action', 'name task_id exception')
+
+
+class MultiprocessExecutor(Executor):
+
+    def __init__(self, pool_size=1):
+        self.stopped = False
+        # A manager queue, unlike a plain multiprocessing.Queue, can be
+        # pickled into apply_async arguments and shared with pool workers
+        self.manager = multiprocessing.Manager()
+        self.queue = self.manager.Queue()
+        self.pool = multiprocessing.Pool(processes=pool_size)
+        # Start the listener thread only after the queue it polls exists
+        self.listener = threading.Thread(target=self._listener)
+        self.listener.daemon = True
+        self.listener.start()
+
+    def execute(self, task):
+        # _process lives at module level because bound methods cannot be
+        # pickled (Python 2) for dispatch to a worker process
+        self.pool.apply_async(_process, args=(
+            self.queue,
+            task.id,
+            task.operation_context.operation_details,
+            task.operation_context.inputs))
+
+    def close(self):
+        self.pool.close()
+        self.stopped = True
+
+    def _listener(self):
+        while not self.stopped:
+            try:
+                action = self.queue.get(timeout=1)
+            except Queue.Empty:
+                continue
+            if action.name == 'task_started':
+                self.task_started(action.task_id)
+            elif action.name == 'task_succeeded':
+                self.task_succeeded(action.task_id)
+            elif action.name == 'task_failed':
+                self.task_failed(action.task_id,
+                                 exception=jsonpickle.loads(action.exception))
+            else:
+                # TODO: something
+                raise RuntimeError('unknown action: {0}'.format(action.name))
+
+
+def _process(queue, task_id, operation_details, operation_inputs):
+    # Runs in a pool worker process; progress is reported back to the
+    # parent process over the shared queue
+    queue.put(_Action(name='task_started',
+                      task_id=task_id,
+                      exception=None))
+    try:
+        task_func = _load_task(operation_details['operation'])
+        task_func(**operation_inputs)
+        queue.put(_Action(name='task_succeeded',
+                          task_id=task_id,
+                          exception=None))
+    except BaseException as e:
+        # jsonpickle lets arbitrary exception objects cross the process
+        # boundary as plain strings
+        queue.put(_Action(name='task_failed',
+                          task_id=task_id,
+                          exception=jsonpickle.dumps(e)))
+
+
+def _load_task(handler_path):
+    module_name, spec_handler_name = handler_path.rsplit('.', 1)
+    try:
+        module = import_module(module_name)
+        return getattr(module, spec_handler_name)
+    except ImportError:
+        # TODO: handle
+        raise
+    except AttributeError:
+        # TODO: handle
+        raise
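
For reference, the jsonpickle round trip used above to carry exceptions from
a worker process back to the listener thread (an illustrative snippet, not
part of the commit; jsonpickle's dumps/loads are its aliases for
encode/decode):

    import jsonpickle

    try:
        raise ValueError('task blew up')
    except ValueError as e:
        payload = jsonpickle.dumps(e)  # a JSON string, safe to put on a queue
        restored = jsonpickle.loads(payload)
        assert isinstance(restored, ValueError)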
