Repository: incubator-ariatosca
Updated Branches:
  refs/heads/wf-executor be147d967 -> a9120175e

some order


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/a9120175
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/a9120175
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/a9120175

Branch: refs/heads/wf-executor
Commit: a9120175efb9f62301a6bb544b4aacc0d86fb094
Parents: be147d9
Author: Dan Kilman <dankil...@gmail.com>
Authored: Tue Oct 18 15:44:21 2016 +0300
Committer: Dan Kilman <dankil...@gmail.com>
Committed: Tue Oct 18 15:44:21 2016 +0300

----------------------------------------------------------------------
 aria/events/__init__.py                      |  12 +-
 aria/events/builtin_event_handlers.py        |  72 ++++++++++-
 aria/events/workflow_engine_event_handler.py |  70 +++++------
 aria/tools/module.py                         |  29 +++++
 aria/workflows/engine/engine.py              | 145 +++-------------------
 aria/workflows/engine/executor.py            | 134 +++++++++-----------
 6 files changed, 205 insertions(+), 257 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a9120175/aria/events/__init__.py
----------------------------------------------------------------------
diff --git a/aria/events/__init__.py b/aria/events/__init__.py
index 70e7e03..c9d7b20 100644
--- a/aria/events/__init__.py
+++ b/aria/events/__init__.py
@@ -20,25 +20,15 @@ from blinker import signal
 from ..tools.plugin import plugin_installer
 
-# workflow engine default signals:
+# workflow engine task signals:
 start_task_signal = signal('start_task_signal')
-end_task_signal = signal('end_task_signal')
 on_success_task_signal = signal('success_task_signal')
 on_failure_task_signal = signal('failure_task_signal')
 
 # workflow engine workflow signals:
 start_workflow_signal = signal('start_workflow_signal')
-end_workflow_signal = signal('end_workflow_signal')
 on_success_workflow_signal = signal('on_success_workflow_signal')
 on_failure_workflow_signal = signal('on_failure_workflow_signal')
-start_sub_workflow_signal = signal('start_sub_workflow_signal')
-end_sub_workflow_signal = signal('end_sub_workflow_signal')
-
-# workflow engine operation signals:
-start_operation_signal = signal('start_operation_signal')
-end_operation_signal = signal('end_operation_signal')
-on_success_operation_signal = signal('on_success_operation_signal')
-on_failure_operation_signal = signal('on_failure_operation_signal')
 
 plugin_installer(
     path=os.path.dirname(os.path.realpath(__file__)),
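
For reference, these are plain blinker named signals; a minimal standalone
sketch of the connect/send pattern they support (illustration only, not code
from this commit):

    from blinker import signal

    start_task_signal = signal('start_task_signal')

    @start_task_signal.connect
    def on_start(sender, **kwargs):
        # blinker passes the send() argument as the positional sender
        print('task started: %r' % (sender,))

    start_task_signal.send('some-task')  # invokes on_start('some-task')
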
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a9120175/aria/events/builtin_event_handlers.py
----------------------------------------------------------------------
diff --git a/aria/events/builtin_event_handlers.py b/aria/events/builtin_event_handlers.py
index 59f59c1..6be39c3 100644
--- a/aria/events/builtin_event_handlers.py
+++ b/aria/events/builtin_event_handlers.py
@@ -13,8 +13,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from datetime import datetime
+
 from ..storage.models import NodeInstance
-from . import start_operation_signal
+from . import (
+    start_workflow_signal,
+    on_success_workflow_signal,
+    on_failure_workflow_signal,
+    start_task_signal,
+    on_success_task_signal,
+    on_failure_task_signal,
+)
 
 
 class _OperationToNodeInstanceState(dict):
@@ -33,7 +42,7 @@ _operation_to_node_instance_state = _OperationToNodeInstanceState({
 })
 
 
-@start_operation_signal.connect
+@start_task_signal.connect
 def _update_node_instance_state(sender, **kwargs):
     try:
         next_state = _operation_to_node_instance_state[sender.task_name]
@@ -42,3 +51,62 @@ def _update_node_instance_state(sender, **kwargs):
     node_instance = sender.context.node_instance
     node_instance.state = next_state
     sender.context.storage.node_instance.store(node_instance)
+
+
+@start_task_signal.connect
+def _task_started(task, *args, **kwargs):
+    operation_context = task.operation_context
+    operation = operation_context.operation
+    operation.started_at = datetime.utcnow()
+    operation.status = operation.STARTED
+    operation_context.operation = operation
+
+
+@on_failure_task_signal.connect
+def _task_failed(task, *args, **kwargs):
+    operation_context = task.operation_context
+    operation = operation_context.operation
+    operation.ended_at = datetime.utcnow()
+    operation.status = operation.FAILED
+    operation_context.operation = operation
+
+
+@on_success_task_signal.connect
+def _task_succeeded(task, *args, **kwargs):
+    operation_context = task.operation_context
+    operation = operation_context.operation
+    operation.ended_at = datetime.utcnow()
+    operation.status = operation.SUCCESS
+    operation_context.operation = operation
+
+
+@start_workflow_signal.connect
+def _workflow_started(workflow_context, *args, **kwargs):
+    Execution = workflow_context.storage.execution.model_cls
+    execution = Execution(
+        id=workflow_context.execution_id,
+        deployment_id=workflow_context.deployment_id,
+        workflow_id=workflow_context.workflow_id,
+        blueprint_id=workflow_context.blueprint_id,
+        status=Execution.PENDING,
+        created_at=datetime.utcnow(),
+        error='',
+        parameters=workflow_context.parameters,
+        is_system_workflow=False
+    )
+    workflow_context.execution = execution
+
+
+@on_failure_workflow_signal.connect
+def _workflow_failed(workflow_context, exception, *args, **kwargs):
+    execution = workflow_context.execution
+    execution.error = str(exception)
+    execution.status = execution.FAILED
+    workflow_context.execution = execution
+
+
+@on_success_workflow_signal.connect
+def _workflow_succeeded(workflow_context, *args, **kwargs):
+    execution = workflow_context.execution
+    execution.status = execution.TERMINATED
+    workflow_context.execution = execution
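
Note that these handlers rely on blinker passing the task object itself as
the sender, so state lands directly on task.operation_context. A toy sketch
of that pattern (the classes below are stand-ins, not the real aria models):

    from datetime import datetime
    from blinker import signal

    on_success_task_signal = signal('success_task_signal')

    class _Operation(object):
        SUCCESS = 'success'
        status = ended_at = None

    class _Task(object):
        def __init__(self):
            self.operation = _Operation()

    @on_success_task_signal.connect
    def _on_success(task, *args, **kwargs):
        # the sender is the task, so its operation can be mutated in place
        task.operation.ended_at = datetime.utcnow()
        task.operation.status = task.operation.SUCCESS

    task = _Task()
    on_success_task_signal.send(task)
    assert task.operation.status == 'success'
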
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a9120175/aria/events/workflow_engine_event_handler.py
----------------------------------------------------------------------
diff --git a/aria/events/workflow_engine_event_handler.py b/aria/events/workflow_engine_event_handler.py
index 59bed99..44759c5 100644
--- a/aria/events/workflow_engine_event_handler.py
+++ b/aria/events/workflow_engine_event_handler.py
@@ -14,61 +14,47 @@
 # limitations under the License.
 
 from . import (
-    start_operation_signal,
-    end_operation_signal,
-    on_success_operation_signal,
-    on_failure_operation_signal,
+    start_task_signal,
+    on_success_task_signal,
+    on_failure_task_signal,
     start_workflow_signal,
-    end_workflow_signal,
-    start_sub_workflow_signal,
-    end_sub_workflow_signal,
+    on_success_workflow_signal,
+    on_failure_workflow_signal
 )
 
 
-@start_operation_signal.connect
-def start_operation_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - starting operation: {sender.task_name}'.format(sender=sender))
+@start_task_signal.connect
+def start_task_handler(task, **kwargs):
+    task.context.logger.debug(
+        'Event: Starting task: {task.context.name}'.format(task=task))
 
 
-@end_operation_signal.connect
-def end_operation_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - finished operation: {sender.task_name}'.format(sender=sender))
+@on_success_task_signal.connect
+def success_task_handler(task, **kwargs):
+    task.context.logger.debug(
+        'Event: Task success: {task.context.name}'.format(task=task))
 
 
-@on_success_operation_signal.connect
-def success_operation_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - operation success: {sender.task_name}'.format(sender=sender))
-
-
-@on_failure_operation_signal.connect
-def failure_operation_handler(sender, **kwargs):
-    sender.context.logger.error(
-        'Event - operation failure: {sender.task_name}'.format(sender=sender),
+@on_failure_task_signal.connect
+def failure_task_handler(task, **kwargs):
+    task.context.logger.error(
+        'Event: Task failure: {task.context.id}'.format(task=task),
         exc_info=kwargs.get('exception', True))
 
 
 @start_workflow_signal.connect
-def start_workflow_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - starting workflow: {sender.task_name}'.format(sender=sender))
-
-
-@end_workflow_signal.connect
-def end_workflow_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - finished workflow: {sender.task_name}'.format(sender=sender))
+def start_workflow_handler(context, **kwargs):
+    context.logger.debug(
+        'Event: Starting workflow: {context.name}'.format(context=context))
 
 
-@start_sub_workflow_signal.connect
-def start_sub_workflow_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - starting sub workflow: {sender.task_name}'.format(sender=sender))
+@on_failure_workflow_signal.connect
+def failure_workflow_handler(context, **kwargs):
+    context.logger.debug(
+        'Event: Workflow failure: {context.name}'.format(context=context))
 
 
-@end_sub_workflow_signal.connect
-def end_sub_workflow_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - finished sub workflow: {sender.task_name}'.format(sender=sender))
+@on_success_workflow_signal.connect
+def success_workflow_handler(context, **kwargs):
+    context.logger.debug(
+        'Event: Workflow success: {context.name}'.format(context=context))
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a9120175/aria/tools/module.py
----------------------------------------------------------------------
diff --git a/aria/tools/module.py b/aria/tools/module.py
new file mode 100644
index 0000000..535f7aa
--- /dev/null
+++ b/aria/tools/module.py
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import importlib
+
+
+def load_attribute(attribute_path):
+    module_name, attribute_name = attribute_path.rsplit('.', 1)
+    try:
+        module = importlib.import_module(module_name)
+        return getattr(module, attribute_name)
+    except ImportError:
+        # TODO: handle
+        raise
+    except AttributeError:
+        # TODO: handle
+        raise
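
load_attribute is what both executors below now use to resolve the dotted
paths stored in operation_details['operation']. A quick usage sketch against
the standard library (assuming aria itself is importable):

    from aria.tools.module import load_attribute

    dumps = load_attribute('json.dumps')  # equivalent to: from json import dumps
    assert dumps({'ok': True}) == '{"ok": true}'
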
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a9120175/aria/workflows/engine/engine.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/engine.py b/aria/workflows/engine/engine.py
index 7b86fb5..2b0507f 100644
--- a/aria/workflows/engine/engine.py
+++ b/aria/workflows/engine/engine.py
@@ -14,29 +14,20 @@
 # limitations under the License.
 
 import time
-from datetime import datetime
-from contextlib import contextmanager
 
-from networkx import DiGraph
+import networkx
 
-from aria.events import (
-    start_workflow_signal,
-    on_success_workflow_signal,
-    on_failure_workflow_signal,
-    start_task_signal,
-    on_success_task_signal,
-    on_failure_task_signal,
-)
-from aria.logger import LoggerMixin
+from aria import events
+from aria import logger
 
 
-class Engine(LoggerMixin):
+class Engine(logger.LoggerMixin):
 
     def __init__(self, executor, workflow_context, tasks_graph, **kwargs):
         super(Engine, self).__init__(**kwargs)
         self._workflow_context = workflow_context
         self._tasks_graph = tasks_graph
-        self._execution_graph = DiGraph()
+        self._execution_graph = networkx.DiGraph()
         self._executor = executor
         self._build_execution_graph(self._workflow_context, self._tasks_graph)
 
@@ -44,23 +35,21 @@ class Engine(LoggerMixin):
         pass
 
     def execute(self):
-        execution_id = self._workflow_context.execution_id
-        with self._connect_signals():
-            try:
-                start_workflow_signal.send(self, execution_id=execution_id)
-                while True:
-                    for task in self._ended_tasks():
-                        self._handle_ended_tasks(task)
-                    for task in self._executable_tasks():
-                        self._handle_executable_task(task)
-                    if self._all_tasks_consumed():
-                        break
-                    else:
-                        time.sleep(0.1)
-                on_success_workflow_signal.send(self, execution_id=execution_id)
-            except BaseException as e:
-                on_failure_workflow_signal.send(self, execution_id=execution_id, exception=e)
-                raise
+        try:
+            events.start_workflow_signal.send(self._workflow_context)
+            while True:
+                for task in self._ended_tasks():
+                    self._handle_ended_tasks(task)
+                for task in self._executable_tasks():
+                    self._handle_executable_task(task)
+                if self._all_tasks_consumed():
+                    break
+                else:
+                    time.sleep(0.1)
+            events.on_success_workflow_signal.send(self._workflow_context)
+        except BaseException as e:
+            events.on_failure_workflow_signal.send(self._workflow_context, exception=e)
+            raise
 
     def _executable_tasks(self):
         now = time.time()
@@ -81,9 +70,6 @@ class Engine(LoggerMixin):
     def _tasks_iter(self):
         return (data['task'] for _, data in self._execution_graph.nodes_iter(data=True))
 
-    def _get_task(self, task_id):
-        return self._execution_graph.node[task_id]['task']
-
     def _handle_executable_task(self, task):
         self._executor.execute(task)
 
@@ -92,94 +78,3 @@ class Engine(LoggerMixin):
             raise RuntimeError('Workflow failed')
         else:
             self._execution_graph.remove_node(task.id)
-
-    def _task_started_receiver(self, task_id, *args, **kwargs):
-        task = self._get_task(task_id)
-        operation_context = task.operation_context
-        operation = operation_context.operation
-        operation.started_at = datetime.utcnow()
-        operation.status = operation.STARTED
-        operation_context.operation = operation
-
-    def _task_failed_receiver(self, task_id, *args, **kwargs):
-        task = self._get_task(task_id)
-        operation_context = task.operation_context
-        operation = operation_context.operation
-        operation.ended_at = datetime.utcnow()
-        operation.status = operation.FAILED
-        operation_context.operation = operation
-
-    def _task_succeeded_receiver(self, task_id, *args, **kwargs):
-        task = self._get_task(task_id)
-        operation_context = task.operation_context
-        operation = operation_context.operation
-        operation.ended_at = datetime.utcnow()
-        operation.status = operation.SUCCESS
-        operation_context.operation = operation
-
-    def _start_workflow_receiver(self, *args, **kwargs):
-        Execution = self._workflow_context.storage.execution.model_cls
-        execution = Execution(
-            id=self._workflow_context.execution_id,
-            deployment_id=self._workflow_context.deployment_id,
-            workflow_id=self._workflow_context.workflow_id,
-            blueprint_id=self._workflow_context.blueprint_id,
-            status=Execution.PENDING,
-            created_at=datetime.utcnow(),
-            error='',
-            parameters=self._workflow_context.parameters,
-            is_system_workflow=False
-        )
-        self._workflow_context.execution = execution
-
-    def _workflow_failed_receiver(self, exception, *args, **kwargs):
-        execution = self._workflow_context.execution
-        execution.error = str(exception)
-        execution.status = execution.FAILED
-        self._workflow_context.execution = execution
-
-    def _workflow_succeeded_receiver(self, *args, **kwargs):
-        execution = self._workflow_context.execution
-        execution.status = execution.TERMINATED
-        self._workflow_context.execution = execution
-
-    @contextmanager
-    def _connect_signals(self):
-        start_workflow_signal.connect(self._start_workflow_receiver)
-        on_success_workflow_signal.connect(self._workflow_succeeded_receiver)
-        on_failure_workflow_signal.connect(self._workflow_failed_receiver)
-        start_task_signal.connect(self._task_started_receiver)
-        on_success_task_signal.connect(self._task_succeeded_receiver)
-        on_failure_task_signal.connect(self._task_failed_receiver)
-        try:
-            yield
-        finally:
-            start_workflow_signal.disconnect(self._start_workflow_receiver)
-            on_success_workflow_signal.disconnect(self._workflow_succeeded_receiver)
-            on_failure_workflow_signal.disconnect(self._workflow_failed_receiver)
-            start_task_signal.disconnect(self._task_started_receiver)
-            on_success_task_signal.disconnect(self._task_succeeded_receiver)
-            on_failure_task_signal.disconnect(self._task_failed_receiver)
-
-
-class Task(object):
-
-    def __init__(self, operation_context):
-        self.operation_context = operation_context
-        self._create_operation_in_storage()
-
-    def _create_operation_in_storage(self):
-        Operation = self.operation_context.storage.operation.model_cls
-        operation = Operation(
-            id=self.operation_context.id,
-            execution_id=self.operation_context.execution_id,
-            max_retries=self.operation_context.parameters.get('max_retries', 1),
-            status=Operation.PENDING,
-        )
-        self.operation_context.operation = operation
-
-    def __getattr__(self, attr):
-        try:
-            return getattr(self.operation_context, attr)
-        except AttributeError:
-            return super(Task, self).__getattribute__(attr)
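
The engine keeps its state in a networkx DiGraph whose node data carries the
task objects, which is what _tasks_iter walks. A small sketch of that storage
scheme (networkx 1.x API, matching nodes_iter above; the string payloads are
placeholders for real task objects):

    import networkx

    graph = networkx.DiGraph()
    graph.add_node('task-1', task='<task one>')
    graph.add_node('task-2', task='<task two>')
    graph.add_edge('task-1', 'task-2')  # task-2 depends on task-1

    # same iteration pattern as Engine._tasks_iter
    tasks = [data['task'] for _, data in graph.nodes_iter(data=True)]
    assert sorted(tasks) == ['<task one>', '<task two>']
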
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a9120175/aria/workflows/engine/executor.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/executor.py b/aria/workflows/engine/executor.py
index a69e87a..13ccd07 100644
--- a/aria/workflows/engine/executor.py
+++ b/aria/workflows/engine/executor.py
@@ -16,16 +16,12 @@
 import threading
 import multiprocessing
 import Queue
-from importlib import import_module
-from collections import namedtuple
+import collections
 
 import jsonpickle
 
-from aria.events import (
-    start_task_signal,
-    on_success_task_signal,
-    on_failure_task_signal,
-)
+from aria import events
+from aria.tools import module
 
 
 class Executor(object):
@@ -33,97 +29,90 @@
     def execute(self, task):
        raise NotImplementedError
 
-    def task_started(self, task_id):
-        start_task_signal.send(self, task_id=task_id)
+    def _task_started(self, task):
+        events.start_task_signal.send(task)
 
-    def task_failed(self, task_id, exception):
-        on_failure_task_signal.send(self, task_id=task_id, exception=exception)
+    def _task_failed(self, task, exception):
+        events.on_failure_task_signal.send(task, exception=exception)
 
-    def task_succeeded(self, task_id):
-        on_success_task_signal.send(self, task_id=task_id)
+    def _task_succeeded(self, task):
+        events.on_success_task_signal.send(task)
 
 
 class ThreadExecutor(Executor):
 
     def __init__(self, pool_size=1):
-        self.stopped = False
-        self.queue = Queue.Queue()
-        self.pool = []
+        self._stopped = False
+        self._queue = Queue.Queue()
+        self._pool = []
         for i in range(pool_size):
             name = 'LocalThreadExecutor-{index}'.format(index=i+1)
             thread = threading.Thread(target=self._processor, name=name)
             thread.daemon = True
             thread.start()
-            self.pool.append(thread)
+            self._pool.append(thread)
 
     def execute(self, task):
-        self.queue.put(task)
+        self._queue.put(task)
 
     def close(self):
-        self.stopped = True
+        self._stopped = True
 
     def _processor(self):
-        while not self.stopped:
+        while not self._stopped:
             try:
-                task = self.queue.get(timeout=1)
-                self.task_started(task.id)
+                task = self._queue.get(timeout=1)
+                self._task_started(task)
                 try:
                     operation_context = task.operation_context
-                    task_func = self._load_task(operation_context.operation_details['operation'])
+                    task_func = module.load_attribute(
+                        operation_context.operation_details['operation'])
                     task_func(**operation_context.inputs)
-                    self.task_succeeded(task.id)
+                    self._task_succeeded(task)
                 except BaseException as e:
-                    self.task_failed(task.id, exception=e)
+                    self._task_failed(task, exception=e)
             # Daemon threads
             except:
                 pass
 
-    def _load_task(self, handler_path):
-        module_name, spec_handler_name = handler_path.rsplit('.', 1)
-        try:
-            module = import_module(module_name)
-            return getattr(module, spec_handler_name)
-        except ImportError:
-            # TODO: handle
-            raise
-        except AttributeError:
-            # TODO: handle
-            raise
-
 
 class MultiprocessExecutor(Executor):
 
-    Action = namedtuple('Action', 'name task_id exception')
+    _Message = collections.namedtuple('Message', 'type task_id exception')
 
     def __init__(self, pool_size=1):
-        self.stopped = False
-        self.listener = threading.Thread(target=self._listener)
-        self.listener.daemon = True
-        self.listener.start()
-        self.queue = multiprocessing.Queue()
-        self.pool = multiprocessing.Pool(processes=pool_size)
+        self._stopped = False
+        self._listener = threading.Thread(target=self._listener)
+        self._listener.daemon = True
+        self._listener.start()
+        self._queue = multiprocessing.Queue()
+        self._pool = multiprocessing.Pool(processes=pool_size,
+                                          maxtasksperchild=1)
+        self._tasks = {}
 
     def execute(self, task):
-        self.pool.apply_async(self._process, args=(
-            self.queue,
+        self._tasks[task.id] = task
+        self._pool.apply_async(self._process, args=(
+            self._queue,
             task.id,
             task.operation_context.operation_details,
             task.operation_context.inputs))
 
     def close(self):
-        self.pool.close()
-        self.stopped = True
+        self._pool.close()
+        self._stopped = True
 
     def _listener(self):
-        while not self.stopped:
+        while not self._stopped:
             try:
-                action = self.queue.get(timeout=1)
-                if action.name == 'task_started':
-                    self.task_started(action.task_id)
-                elif action.name == 'task_succeeded':
-                    self.task_succeeded(action.task_id)
-                elif action.name == 'task_failed':
-                    self.task_failed(action.task_id, exception=jsonpickle.loads(action.exception))
+                message = self._queue.get(timeout=1)
+                if message.type == 'task_started':
+                    self._task_started(self._tasks[message.task_id])
+                elif message.type == 'task_succeeded':
+                    self._task_succeeded(self._remove_task(message.task_id))
+                elif message.type == 'task_failed':
+                    self._task_failed(self._remove_task(message.task_id),
+                                      exception=jsonpickle.loads(message.exception))
                 else:
                     # TODO: something
                     raise RuntimeError()
@@ -132,28 +121,19 @@ class MultiprocessExecutor(Executor):
                 pass
 
     def _process(self, queue, task_id, operation_details, operation_inputs):
-        queue.put(self.Action(action='task_started',
-                              task_id=task_id,
-                              exception=None))
+        queue.put(self._Message(type='task_started',
+                                task_id=task_id,
+                                exception=None))
        try:
-            task_func = self._load_task(operation_details['operation'])
+            task_func = module.load_attribute(operation_details['operation'])
             task_func(**operation_inputs)
-            queue.put(self.Action(action='task_succeeded',
-                                  task_id=task_id,
-                                  exception=None))
+            queue.put(self._Message(type='task_succeeded',
+                                    task_id=task_id,
+                                    exception=None))
         except BaseException as e:
-            queue.put(self.Action(action='task_failed',
-                                  task_id=task_id,
-                                  exception=jsonpickle.dumps(e)))
+            queue.put(self._Message(type='task_failed',
+                                    task_id=task_id,
+                                    exception=jsonpickle.dumps(e)))
 
-    def _load_task(self, handler_path):
-        module_name, spec_handler_name = handler_path.rsplit('.', 1)
-        try:
-            module = import_module(module_name)
-            return getattr(module, spec_handler_name)
-        except ImportError:
-            # TODO: handle
-            raise
-        except AttributeError:
-            # TODO: handle
-            raise
+    def _remove_task(self, task_id):
+        return self._tasks.pop(task_id)
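
The child process serializes exceptions with jsonpickle before putting them
on the queue, and the listener thread revives them. A minimal round-trip
sketch of that handoff (assuming jsonpickle can reconstruct the exception
class, which holds for simple built-in exceptions):

    import jsonpickle

    try:
        raise RuntimeError('boom')
    except RuntimeError as e:
        wire = jsonpickle.dumps(e)   # done in the child process (_process)
    exc = jsonpickle.loads(wire)     # done in the listener thread (_listener)
    assert isinstance(exc, RuntimeError)
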