Repository: incubator-ariatosca Updated Branches: refs/heads/wf-executor [created] be147d967
add multiprocessing executor Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/be147d96 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/be147d96 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/be147d96 Branch: refs/heads/wf-executor Commit: be147d96713bc3091c09b8b67b5c9241cf35e3b4 Parents: b53aa1f Author: Dan Kilman <dankil...@gmail.com> Authored: Thu Oct 13 19:39:01 2016 +0300 Committer: Dan Kilman <dankil...@gmail.com> Committed: Thu Oct 13 19:39:01 2016 +0300 ---------------------------------------------------------------------- aria/cli/commands.py | 4 +- aria/workflows/engine/executor.py | 74 +++++++++++++++++++++++++++++++++- 2 files changed, 75 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/be147d96/aria/cli/commands.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands.py b/aria/cli/commands.py index 9fa4911..601f4ca 100644 --- a/aria/cli/commands.py +++ b/aria/cli/commands.py @@ -27,7 +27,7 @@ from aria.storage import FileSystemModelDriver, FileSystemResourceDriver from aria.tools.application import StorageManager from aria.contexts import WorkflowContext from aria.workflows.engine.engine import Engine -from aria.workflows.engine.executor import LocalThreadExecutor +from aria.workflows.engine.executor import ThreadExecutor from .storage import ( local_resource_storage, @@ -225,7 +225,7 @@ class ExecuteCommand(BaseCommand): ) workflow_function = self._load_workflow_handler(workflow['operation']) tasks_graph = workflow_function(workflow_context, **workflow_context.parameters) - executor = LocalThreadExecutor() + executor = ThreadExecutor() workflow_engine = Engine(executor=executor, workflow_context=workflow_context, tasks_graph=tasks_graph) 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/be147d96/aria/workflows/engine/executor.py ---------------------------------------------------------------------- diff --git a/aria/workflows/engine/executor.py b/aria/workflows/engine/executor.py index dacfc15..a69e87a 100644 --- a/aria/workflows/engine/executor.py +++ b/aria/workflows/engine/executor.py @@ -14,8 +14,12 @@ # limitations under the License. import threading +import multiprocessing import Queue from importlib import import_module +from collections import namedtuple + +import jsonpickle from aria.events import ( start_task_signal, @@ -39,7 +43,7 @@ class Executor(object): on_success_task_signal.send(self, task_id=task_id) -class LocalThreadExecutor(Executor): +class ThreadExecutor(Executor): def __init__(self, pool_size=1): self.stopped = False @@ -85,3 +89,71 @@ class LocalThreadExecutor(Executor): except AttributeError: # TODO: handle raise + + +class MultiprocessExecutor(Executor): + + Action = namedtuple('Action', 'name task_id exception') + + def __init__(self, pool_size=1): + self.stopped = False + self.listener = threading.Thread(target=self._listener) + self.listener.daemon = True + self.listener.start() + self.queue = multiprocessing.Queue() + self.pool = multiprocessing.Pool(processes=pool_size) + + def execute(self, task): + self.pool.apply_async(self._process, args=( + self.queue, + task.id, + task.operation_context.operation_details, + task.operation_context.inputs)) + + def close(self): + self.pool.close() + self.stopped = True + + def _listener(self): + while not self.stopped: + try: + action = self.queue.get(timeout=1) + if action.name == 'task_started': + self.task_started(action.task_id) + elif action.name == 'task_succeeded': + self.task_succeeded(action.task_id) + elif action.name == 'task_failed': + self.task_failed(action.task_id, exception=jsonpickle.loads(action.exception)) + else: + # TODO: something + raise RuntimeError() + # Daemon threads + except: + pass + + 
def _process(self, queue, task_id, operation_details, operation_inputs):
    """Run a single task and report its lifecycle on ``queue``.

    A ``task_started`` action is put on the queue first. The callable named
    by ``operation_details['operation']`` is then loaded and invoked with
    ``operation_inputs`` as keyword arguments, after which either a
    ``task_succeeded`` or a ``task_failed`` action is put on the queue.

    :param queue: object exposing ``put``; receives ``Action`` tuples
    :param task_id: identifier echoed back in every reported action
    :param operation_details: mapping holding the dotted handler path under
        the ``'operation'`` key
    :param operation_inputs: mapping of keyword arguments for the handler
    """
    # NOTE(review): ``execute`` passes this bound method and the
    # ``multiprocessing.Queue`` as ``Pool.apply_async`` arguments; on
    # Python 2 neither is picklable by default -- confirm that tasks
    # actually reach the worker process.

    # ``Action`` is declared with the fields ``name task_id exception`` and
    # the listener dispatches on ``action.name``, so the first field must be
    # passed as ``name``; passing ``action=...`` raises TypeError.
    queue.put(self.Action(name='task_started',
                          task_id=task_id,
                          exception=None))
    try:
        task_func = self._load_task(operation_details['operation'])
        task_func(**operation_inputs)
        queue.put(self.Action(name='task_succeeded',
                              task_id=task_id,
                              exception=None))
    except BaseException as e:
        # Every failure, including a bad handler path, is reported to the
        # listener; the exception is serialized so it can cross the queue.
        queue.put(self.Action(name='task_failed',
                              task_id=task_id,
                              exception=jsonpickle.dumps(e)))


def _load_task(self, handler_path):
    """Resolve a dotted ``module.attribute`` path to the object it names.

    :param handler_path: dotted path; the text after the last dot is the
        attribute name, everything before it is the module to import
    :return: the attribute looked up on the imported module
    :raises ValueError: if ``handler_path`` has no module part
    :raises ImportError: if the module cannot be imported
    :raises AttributeError: if the module has no such attribute
    """
    module_name, _, spec_handler_name = handler_path.rpartition('.')
    if not module_name:
        raise ValueError(
            'handler path must look like "module.attribute", got {0!r}'
            .format(handler_path))
    # TODO: handle ImportError / AttributeError; for now both propagate.
    module = import_module(module_name)
    return getattr(module, spec_handler_name)