Repository: incubator-ariatosca Updated Branches: refs/heads/wf-executor a0ac985a1 -> 5f27cb51f (forced update)
Add translation mechanism from task_graph into execution_graph Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/f2f41313 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/f2f41313 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/f2f41313 Branch: refs/heads/wf-executor Commit: f2f4131369b7c98af889195e4f5ba7eeae5f63a9 Parents: b53aa1f Author: mxmrlv <mxm...@gmail.com> Authored: Thu Oct 13 19:52:35 2016 +0300 Committer: mxmrlv <mxm...@gmail.com> Committed: Tue Oct 18 16:45:37 2016 +0300 ---------------------------------------------------------------------- aria/workflows/engine/engine.py | 35 ++------- aria/workflows/engine/tasks.py | 61 ++++++++++++++ aria/workflows/engine/translation.py | 83 ++++++++++++++++++++ tests/requirements.txt | 1 + tests/workflows/__init__.py | 14 ++++ .../test_task_graph_into_exececution_graph.py | 73 +++++++++++++++++ 6 files changed, 240 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f2f41313/aria/workflows/engine/engine.py ---------------------------------------------------------------------- diff --git a/aria/workflows/engine/engine.py b/aria/workflows/engine/engine.py index 7b86fb5..508ae3b 100644 --- a/aria/workflows/engine/engine.py +++ b/aria/workflows/engine/engine.py @@ -29,6 +29,11 @@ from aria.events import ( ) from aria.logger import LoggerMixin +from .translation import build_execution_graph + + +from ...storage import Model + class Engine(LoggerMixin): @@ -38,10 +43,9 @@ class Engine(LoggerMixin): self._tasks_graph = tasks_graph self._execution_graph = DiGraph() self._executor = executor - self._build_execution_graph(self._workflow_context, self._tasks_graph) - - def _build_execution_graph(self, workflow_context, graph): - pass + 
build_execution_graph(task_graph=self._tasks_graph, + workflow_context=workflow_context, + execution_graph=self._execution_graph) def execute(self): execution_id = self._workflow_context.execution_id @@ -160,26 +164,3 @@ class Engine(LoggerMixin): start_task_signal.disconnect(self._task_started_receiver) on_success_task_signal.disconnect(self._task_succeeded_receiver) on_failure_task_signal.disconnect(self._task_failed_receiver) - - -class Task(object): - - def __init__(self, operation_context): - self.operation_context = operation_context - self._create_operation_in_storage() - - def _create_operation_in_storage(self): - Operation = self.operation_context.storage.operation.model_cls - operation = Operation( - id=self.operation_context.id, - execution_id=self.operation_context.execution_id, - max_retries=self.operation_context.parameters.get('max_retries', 1), - status=Operation.PENDING, - ) - self.operation_context.operation = operation - - def __getattr__(self, attr): - try: - return getattr(self.operation_context, attr) - except AttributeError: - return super(Task, self).__getattribute__(attr) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f2f41313/aria/workflows/engine/tasks.py ---------------------------------------------------------------------- diff --git a/aria/workflows/engine/tasks.py b/aria/workflows/engine/tasks.py new file mode 100644 index 0000000..83b4263 --- /dev/null +++ b/aria/workflows/engine/tasks.py @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class BaseTask(object): + + def __init__(self, id, name, context): + self.id = id + self.name = name + self.context = context + + +class StartWorkflowTask(BaseTask): + pass + + +class EndWorkflowTask(BaseTask): + pass + + +class StartSubWorkflowTask(BaseTask): + pass + + +class EndSubWorkflowTask(BaseTask): + pass + + +class OperationTask(BaseTask): + def __init__(self, *args, **kwargs): + super(OperationTask, self).__init__(*args, **kwargs) + self._create_operation_in_storage() + + def _create_operation_in_storage(self): + Operation = self.context.storage.operation.model_cls + operation = Operation( + id=self.context.id, + execution_id=self.context.execution_id, + max_retries=self.context.parameters.get('max_retries', 1), + status=Operation.PENDING, + ) + self.context.operation = operation + + def __getattr__(self, attr): + try: + return getattr(self.context, attr) + except AttributeError: + return super(OperationTask, self).__getattribute__(attr) + http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f2f41313/aria/workflows/engine/translation.py ---------------------------------------------------------------------- diff --git a/aria/workflows/engine/translation.py b/aria/workflows/engine/translation.py new file mode 100644 index 0000000..71d7bcd --- /dev/null +++ b/aria/workflows/engine/translation.py @@ -0,0 +1,83 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from aria import contexts + +from . import tasks + + +def build_execution_graph( + task_graph, + workflow_context, + execution_graph, + start_cls=tasks.StartWorkflowTask, + end_cls=tasks.EndWorkflowTask, + depends_on=()): + # Insert start marker + start_task = start_cls(id=_start_graph_suffix(task_graph.id), + name=_start_graph_suffix(task_graph.name), + context=workflow_context) + _add_task_and_dependencies(execution_graph, start_task, depends_on) + + for operation_or_workflow, dependencies in task_graph.task_tree(reverse=True): + operation_dependencies = _get_tasks_from_dependencies(execution_graph, dependencies, default=[start_task]) + + if _is_operation(operation_or_workflow): + # Add the task and the dependencies + operation_task = tasks.OperationTask(id=operation_or_workflow.id, + name=operation_or_workflow.name, + context=operation_or_workflow) + _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies) + else: + # Build the graph recursively while adding start and end markers + build_execution_graph( + task_graph=operation_or_workflow, + workflow_context=workflow_context, + execution_graph=execution_graph, + start_cls=tasks.StartSubWorkflowTask, + end_cls=tasks.EndSubWorkflowTask, + depends_on=operation_dependencies + ) + + # Insert end marker + workflow_dependencies = _get_tasks_from_dependencies(execution_graph, task_graph.leaf_tasks, default=[start_task]) + 
end_task = end_cls(id=_end_graph_suffix(task_graph.id), name=_end_graph_suffix(task_graph.name), context=workflow_context) + _add_task_and_dependencies(execution_graph, end_task, workflow_dependencies) + + +def _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies=()): + execution_graph.add_node(operation_task.id, task=operation_task) + for dependency in operation_dependencies: + execution_graph.add_edge(dependency.id, operation_task.id) + + +def _get_tasks_from_dependencies(execution_graph, dependencies, default=()): + """ + Returns task list from dependencies. + """ + return [execution_graph.node[dependency.id if _is_operation(dependency) else _end_graph_suffix(dependency.id)] + ['task'] for dependency in dependencies] or default + + +def _is_operation(task): + return isinstance(task, contexts.OperationContext) + + +def _start_graph_suffix(id): + return '{0}-Start'.format(id) + + +def _end_graph_suffix(id): + return '{0}-End'.format(id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f2f41313/tests/requirements.txt ---------------------------------------------------------------------- diff --git a/tests/requirements.txt b/tests/requirements.txt index 07d82a6..f3443d1 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -4,3 +4,4 @@ tox==1.6.1 pylint==1.5.5 pytest==3.0.2 pytest-cov==2.3.1 +pytest-mock==1.2 http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f2f41313/tests/workflows/__init__.py ---------------------------------------------------------------------- diff --git a/tests/workflows/__init__.py b/tests/workflows/__init__.py new file mode 100644 index 0000000..ae1e83e --- /dev/null +++ b/tests/workflows/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f2f41313/tests/workflows/test_task_graph_into_exececution_graph.py ---------------------------------------------------------------------- diff --git a/tests/workflows/test_task_graph_into_exececution_graph.py b/tests/workflows/test_task_graph_into_exececution_graph.py new file mode 100644 index 0000000..28f31a0 --- /dev/null +++ b/tests/workflows/test_task_graph_into_exececution_graph.py @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest +from networkx import topological_sort, DiGraph + +from aria import contexts +from aria.workflows.api import tasks_graph +from aria.workflows.engine import tasks, translation + + +@pytest.fixture(autouse=True) +def no_storage(monkeypatch): + monkeypatch.setattr(tasks.OperationTask, '_create_operation_in_storage', value=lambda *args, **kwargs: None) + + +def test_task_graph_into_execution_graph(): + task_graph = tasks_graph.TaskGraph('test_task_graph') + simple_before_task = contexts.OperationContext('test_simple_before_task', {}, {}, None) + simple_after_task = contexts.OperationContext('test_simple_after_task', {}, {}, None) + + inner_task_graph = tasks_graph.TaskGraph('test_inner_task_graph') + inner_task = contexts.OperationContext('test_inner_task', {}, {}, None) + inner_task_graph.add_task(inner_task) + + task_graph.add_task(simple_before_task) + task_graph.add_task(simple_after_task) + task_graph.add_task(inner_task_graph) + task_graph.dependency(inner_task_graph, [simple_before_task]) + task_graph.dependency(simple_after_task, [inner_task_graph]) + + # Direct check + execution_graph = DiGraph() + translation.build_execution_graph(task_graph=task_graph, workflow_context=None, execution_graph=execution_graph) + execution_tasks = topological_sort(execution_graph) + + assert len(execution_tasks) == 7 + + expected_tasks_names = [ + '{0}-Start'.format(task_graph.id), + simple_before_task.id, + '{0}-Start'.format(inner_task_graph.id), + inner_task.id, + '{0}-End'.format(inner_task_graph.id), + simple_after_task.id, + '{0}-End'.format(task_graph.id) + ] + + assert expected_tasks_names == execution_tasks + + assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph), tasks.StartWorkflowTask) + assert simple_before_task == _get_task_by_name(execution_tasks[1], execution_graph).context + assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph), tasks.StartSubWorkflowTask) + assert inner_task == 
_get_task_by_name(execution_tasks[3], execution_graph).context + assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph), tasks.EndSubWorkflowTask) + assert simple_after_task == _get_task_by_name(execution_tasks[5], execution_graph).context + assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph), tasks.EndWorkflowTask) + + +def _get_task_by_name(task_name, graph): + return graph.node[task_name]['task']