Repository: incubator-ariatosca Updated Branches: refs/heads/operation_graph ebb5cc8b9 -> 2d589ed72 (forced update)
code review 2 Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/2d589ed7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/2d589ed7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/2d589ed7 Branch: refs/heads/operation_graph Commit: 2d589ed7292040f81adfa4c507eb65e225be8aa5 Parents: 4a053b2 Author: mxmrlv <mxm...@gmail.com> Authored: Tue Oct 18 16:39:19 2016 +0300 Committer: mxmrlv <mxm...@gmail.com> Committed: Tue Oct 18 16:41:24 2016 +0300 ---------------------------------------------------------------------- aria/events/__init__.py | 6 +- aria/workflows/engine/engine.py | 2 +- aria/workflows/engine/tasks.py | 12 +-- aria/workflows/engine/transcription.py | 83 -------------------- aria/workflows/engine/translation.py | 83 ++++++++++++++++++++ .../test_task_graph_into_exececution_graph.py | 4 +- 6 files changed, 95 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2d589ed7/aria/events/__init__.py ---------------------------------------------------------------------- diff --git a/aria/events/__init__.py b/aria/events/__init__.py index b815c7a..70e7e03 100644 --- a/aria/events/__init__.py +++ b/aria/events/__init__.py @@ -20,13 +20,13 @@ from blinker import signal from ..tools.plugin import plugin_installer -# workflow workflows default signals: +# workflow engine default signals: start_task_signal = signal('start_task_signal') end_task_signal = signal('end_task_signal') on_success_task_signal = signal('success_task_signal') on_failure_task_signal = signal('failure_task_signal') -# workflow workflows workflow signals: +# workflow engine workflow signals: start_workflow_signal = signal('start_workflow_signal') end_workflow_signal = signal('end_workflow_signal') on_success_workflow_signal = signal('on_success_workflow_signal') @@ -34,7 +34,7 @@ on_failure_workflow_signal = signal('on_failure_workflow_signal') start_sub_workflow_signal = signal('start_sub_workflow_signal') end_sub_workflow_signal = signal('end_sub_workflow_signal') -# workflow workflows operation signals: +# workflow engine operation signals: start_operation_signal = signal('start_operation_signal') end_operation_signal = signal('end_operation_signal') on_success_operation_signal = signal('on_success_operation_signal') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2d589ed7/aria/workflows/engine/engine.py ---------------------------------------------------------------------- diff --git a/aria/workflows/engine/engine.py b/aria/workflows/engine/engine.py index 2a5afce..508ae3b 100644 --- a/aria/workflows/engine/engine.py +++ b/aria/workflows/engine/engine.py @@ -29,7 +29,7 @@ from aria.events import ( ) from aria.logger import LoggerMixin -from .transcription import build_execution_graph +from .translation import build_execution_graph from ...storage import Model http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2d589ed7/aria/workflows/engine/tasks.py ---------------------------------------------------------------------- diff --git a/aria/workflows/engine/tasks.py b/aria/workflows/engine/tasks.py index e120bdd..83b4263 100644 --- a/aria/workflows/engine/tasks.py +++ b/aria/workflows/engine/tasks.py @@ -44,18 +44,18 @@ class OperationTask(BaseTask): self._create_operation_in_storage() def _create_operation_in_storage(self): - Operation = self.operation_context.storage.operation.model_cls + Operation = self.context.storage.operation.model_cls operation = Operation( - id=self.operation_context.id, - execution_id=self.operation_context.execution_id, - max_retries=self.operation_context.parameters.get('max_retries', 1), + id=self.context.id, + execution_id=self.context.execution_id, + max_retries=self.context.parameters.get('max_retries', 1), status=Operation.PENDING, ) - self.operation_context.operation = operation + self.context.operation = operation def __getattr__(self, attr): try: - return getattr(self.operation_context, attr) + return getattr(self.context, attr) except AttributeError: return super(OperationTask, self).__getattribute__(attr) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2d589ed7/aria/workflows/engine/transcription.py ---------------------------------------------------------------------- diff --git a/aria/workflows/engine/transcription.py b/aria/workflows/engine/transcription.py deleted file mode 100644 index fc6138b..0000000 --- a/aria/workflows/engine/transcription.py +++ /dev/null @@ -1,83 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from aria import contexts - -from . import tasks - - -def build_execution_graph( - task_graph, - workflow_context, - execution_graph, - start_cls=tasks.StartWorkflowTask, - end_cls=tasks.EndWorkflowTask, - depends_on=()): - # Insert start marker - start_task = start_cls(id=_start_graph_id(task_graph.id), - name=_start_graph_id(task_graph.name), - context=workflow_context) - _add_task_and_dependencies(execution_graph, start_task, depends_on) - - for operation_or_workflow, dependencies in task_graph.task_tree(reverse=True): - operation_dependencies = _get_tasks_from_dependencies(execution_graph, dependencies, default=[start_task]) - - if _is_operation(operation_or_workflow): - # Add the task an the dependencies - operation_task = tasks.OperationTask(id=operation_or_workflow.id, - name=operation_or_workflow.name, - context=operation_or_workflow) - _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies) - else: - # Built the graph recursively while adding start and end markers - build_execution_graph( - task_graph=operation_or_workflow, - workflow_context=workflow_context, - execution_graph=execution_graph, - start_cls=tasks.StartSubWorkflowTask, - end_cls=tasks.EndSubWorkflowTask, - depends_on=operation_dependencies - ) - - # Insert end marker - workflow_dependencies = _get_tasks_from_dependencies(execution_graph, task_graph.leaf_tasks, default=[start_task]) - end_task = end_cls(id=_end_graph_id(task_graph.id), name=_end_graph_id(task_graph.name), context=workflow_context) - _add_task_and_dependencies(execution_graph, end_task, workflow_dependencies) - - -def _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies=()): - execution_graph.add_node(operation_task.id, task=operation_task) - for dependency in operation_dependencies: - execution_graph.add_edge(dependency.id, operation_task.id) - - -def _get_tasks_from_dependencies(execution_graph, dependencies, default=()): - """ - Returns task list from dependencies. - """ - return [execution_graph.node[dependency.id if _is_operation(dependency) else _end_graph_id(dependency.id)] - ['task'] for dependency in dependencies] or default - - -def _is_operation(task): - return isinstance(task, contexts.OperationContext) - - -def _start_graph_id(id): - return '{0}-Start'.format(id) - - -def _end_graph_id(id): - return '{0}-End'.format(id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2d589ed7/aria/workflows/engine/translation.py ---------------------------------------------------------------------- diff --git a/aria/workflows/engine/translation.py b/aria/workflows/engine/translation.py new file mode 100644 index 0000000..71d7bcd --- /dev/null +++ b/aria/workflows/engine/translation.py @@ -0,0 +1,83 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from aria import contexts + +from . import tasks + + +def build_execution_graph( + task_graph, + workflow_context, + execution_graph, + start_cls=tasks.StartWorkflowTask, + end_cls=tasks.EndWorkflowTask, + depends_on=()): + # Insert start marker + start_task = start_cls(id=_start_graph_suffix(task_graph.id), + name=_start_graph_suffix(task_graph.name), + context=workflow_context) + _add_task_and_dependencies(execution_graph, start_task, depends_on) + + for operation_or_workflow, dependencies in task_graph.task_tree(reverse=True): + operation_dependencies = _get_tasks_from_dependencies(execution_graph, dependencies, default=[start_task]) + + if _is_operation(operation_or_workflow): + # Add the task an the dependencies + operation_task = tasks.OperationTask(id=operation_or_workflow.id, + name=operation_or_workflow.name, + context=operation_or_workflow) + _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies) + else: + # Built the graph recursively while adding start and end markers + build_execution_graph( + task_graph=operation_or_workflow, + workflow_context=workflow_context, + execution_graph=execution_graph, + start_cls=tasks.StartSubWorkflowTask, + end_cls=tasks.EndSubWorkflowTask, + depends_on=operation_dependencies + ) + + # Insert end marker + workflow_dependencies = _get_tasks_from_dependencies(execution_graph, task_graph.leaf_tasks, default=[start_task]) + end_task = end_cls(id=_end_graph_suffix(task_graph.id), name=_end_graph_suffix(task_graph.name), context=workflow_context) + _add_task_and_dependencies(execution_graph, end_task, workflow_dependencies) + + +def _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies=()): + execution_graph.add_node(operation_task.id, task=operation_task) + for dependency in operation_dependencies: + execution_graph.add_edge(dependency.id, operation_task.id) + + +def _get_tasks_from_dependencies(execution_graph, dependencies, default=()): + """ + Returns task list from dependencies. + """ + return [execution_graph.node[dependency.id if _is_operation(dependency) else _end_graph_suffix(dependency.id)] + ['task'] for dependency in dependencies] or default + + +def _is_operation(task): + return isinstance(task, contexts.OperationContext) + + +def _start_graph_suffix(id): + return '{0}-Start'.format(id) + + +def _end_graph_suffix(id): + return '{0}-End'.format(id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2d589ed7/tests/workflows/test_task_graph_into_exececution_graph.py ---------------------------------------------------------------------- diff --git a/tests/workflows/test_task_graph_into_exececution_graph.py b/tests/workflows/test_task_graph_into_exececution_graph.py index f7007ed..28f31a0 100644 --- a/tests/workflows/test_task_graph_into_exececution_graph.py +++ b/tests/workflows/test_task_graph_into_exececution_graph.py @@ -18,7 +18,7 @@ from networkx import topological_sort, DiGraph from aria import contexts from aria.workflows.api import tasks_graph -from aria.workflows.engine import tasks, transcription +from aria.workflows.engine import tasks, translation @pytest.fixture(autouse=True) @@ -43,7 +43,7 @@ def test_task_graph_into_execution_graph(): # Direct check execution_graph = DiGraph() - transcription.build_execution_graph(task_graph=task_graph, workflow_context=None, execution_graph=execution_graph) + translation.build_execution_graph(task_graph=task_graph, workflow_context=None, execution_graph=execution_graph) execution_tasks = topological_sort(execution_graph) assert len(execution_tasks) == 7