Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-278-Remove-core-tasks 54d5d5095 -> b19ef4e77 (forced update)
fix 2 Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/b19ef4e7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/b19ef4e7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/b19ef4e7 Branch: refs/heads/ARIA-278-Remove-core-tasks Commit: b19ef4e7727813dc042edb0953e04a59d95fc74e Parents: 70ccc9f Author: max-orlov <ma...@gigaspaces.com> Authored: Tue Jun 20 18:34:40 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Tue Jun 20 18:50:22 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflow_runner.py | 5 +- aria/orchestrator/workflows/api/task.py | 3 +- aria/orchestrator/workflows/core/compile.py | 116 ++++++++++++++++++ aria/orchestrator/workflows/core/task.py | 119 ------------------- aria/orchestrator/workflows/executor/base.py | 17 ++- tests/orchestrator/context/__init__.py | 4 +- tests/orchestrator/context/test_serialize.py | 4 +- .../orchestrator/execution_plugin/test_local.py | 4 +- tests/orchestrator/execution_plugin/test_ssh.py | 5 +- .../orchestrator/workflows/core/test_engine.py | 4 +- .../orchestrator/workflows/core/test_events.py | 4 +- .../test_task_graph_into_execution_graph.py | 4 +- .../executor/test_process_executor_extension.py | 4 +- .../test_process_executor_tracked_changes.py | 4 +- 14 files changed, 148 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b19ef4e7/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index a57a34e..9e6b3ad 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -24,7 +24,7 @@ from datetime import datetime from . import exceptions from .context.workflow import WorkflowContext from .workflows import builtin -from .workflows.core import engine, task +from .workflows.core import engine, compile from .workflows.executor.process import ProcessExecutor from ..modeling import models from ..modeling import utils as modeling_utils @@ -87,7 +87,8 @@ class WorkflowRunner(object): execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values()) self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict) - task.create_execution_tasks(self._workflow_context, self._tasks_graph, executor.__class__) + compile.create_execution_tasks( + self._workflow_context, self._tasks_graph, executor.__class__) self._engine = engine.Engine(executors={executor.__class__: executor}) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b19ef4e7/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index ce34005..f7d2c66 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -145,7 +145,8 @@ class OperationTask(BaseTask): elif getattr(self.actor, 'source_node', None) is not None: self._context_cls = context.operation.RelationshipOperationContext else: - self._context_cls = context.operation.BaseOperationContext + raise exceptions.TaskCreationException('Could not locate valid context for ' + '{actor.__class__}'.format(actor=self.actor)) def __repr__(self): return self.name http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b19ef4e7/aria/orchestrator/workflows/core/compile.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/compile.py b/aria/orchestrator/workflows/core/compile.py new file mode 100644 index 0000000..932268a --- /dev/null +++ b/aria/orchestrator/workflows/core/compile.py @@ -0,0 +1,116 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ....modeling import models +from .. import executor, api + + +def create_execution_tasks(ctx, task_graph, default_executor): + execution = ctx.execution + _construct_execution_tasks(execution, task_graph, default_executor) + ctx.model.execution.update(execution) + return execution.tasks + + +def _construct_execution_tasks(execution, + task_graph, + default_executor, + stub_executor=executor.base.StubTaskExecutor, + start_stub_type=models.Task.START_WORKFLOW, + end_stub_type=models.Task.END_WORKFLOW, + depends_on=()): + """ + Translates the user graph to the execution graph + :param task_graph: The user's graph + :param start_stub_type: internal use + :param end_stub_type: internal use + :param depends_on: internal use + """ + depends_on = list(depends_on) + + # Insert start marker + start_task = models.Task(execution=execution, + dependencies=depends_on, + _api_id=_start_graph_suffix(task_graph.id), + _stub_type=start_stub_type, + _executor=stub_executor) + + for task in task_graph.topological_order(reverse=True): + operation_dependencies = _get_tasks_from_dependencies( + execution, task_graph.get_dependencies(task), [start_task]) + + if isinstance(task, api.task.OperationTask): + models.Task.from_api_task(api_task=task, + executor=default_executor, + dependencies=operation_dependencies) + + elif isinstance(task, api.task.WorkflowTask): + # Build the graph recursively while adding start and end markers + _construct_execution_tasks( + execution=execution, + task_graph=task, + default_executor=default_executor, + stub_executor=stub_executor, + start_stub_type=models.Task.START_SUBWROFKLOW, + end_stub_type=models.Task.END_SUBWORKFLOW, + depends_on=operation_dependencies + ) + elif isinstance(task, api.task.StubTask): + models.Task(execution=execution, + dependencies=operation_dependencies, + _api_id=task.id, + _executor=stub_executor, + _stub_type=models.Task.STUB, + ) + else: + raise RuntimeError('Undefined state') + + # Insert end marker + models.Task(dependencies=_get_non_dependent_tasks(execution) or [start_task], + execution=execution, + _api_id=_end_graph_suffix(task_graph.id), + _executor=stub_executor, + _stub_type=end_stub_type) + + +def _start_graph_suffix(api_id): + return '{0}-Start'.format(api_id) + + +def _end_graph_suffix(api_id): + return '{0}-End'.format(api_id) + + +def _get_non_dependent_tasks(execution): + tasks_with_dependencies = set() + for task in execution.tasks: + tasks_with_dependencies.update(task.dependencies) + return list(set(execution.tasks) - set(tasks_with_dependencies)) + + +def _get_tasks_from_dependencies(execution, dependencies, default=()): + """ + Returns task list from dependencies. + """ + tasks = [] + for dependency in dependencies: + if getattr(dependency, 'actor', False): + # This is + dependency_name = dependency.id + else: + dependency_name = _end_graph_suffix(dependency.id) + tasks.extend(task for task in execution.tasks if task._api_id == dependency_name) + return tasks or default http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b19ef4e7/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py deleted file mode 100644 index a9f8d18..0000000 --- a/aria/orchestrator/workflows/core/task.py +++ /dev/null @@ -1,119 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -The workflow engine. Executes workflows -""" - -from ....modeling import models -from .. import executor, api - - -def create_execution_tasks(ctx, task_graph, default_executor): - execution = ctx.execution - _construct_execution_tasks(execution, task_graph, default_executor) - ctx.model.execution.update(execution) - return execution.tasks - - -def _construct_execution_tasks(execution, - task_graph, - default_executor, - stub_executor=executor.base.StubTaskExecutor, - start_stub_type=models.Task.START_WORKFLOW, - end_stub_type=models.Task.END_WORKFLOW, - depends_on=()): - """ - Translates the user graph to the execution graph - :param task_graph: The user's graph - :param start_stub_type: internal use - :param end_stub_type: internal use - :param depends_on: internal use - """ - depends_on = list(depends_on) - - # Insert start marker - start_task = models.Task(execution=execution, - dependencies=depends_on, - _api_id=_start_graph_suffix(task_graph.id), - _stub_type=start_stub_type, - _executor=stub_executor) - - for task in task_graph.topological_order(reverse=True): - operation_dependencies = _get_tasks_from_dependencies( - execution, task_graph.get_dependencies(task), [start_task]) - - if isinstance(task, api.task.OperationTask): - models.Task.from_api_task(api_task=task, - executor=default_executor, - dependencies=operation_dependencies) - - elif isinstance(task, api.task.WorkflowTask): - # Build the graph recursively while adding start and end markers - _construct_execution_tasks( - execution=execution, - task_graph=task, - default_executor=default_executor, - stub_executor=stub_executor, - start_stub_type=models.Task.START_SUBWROFKLOW, - end_stub_type=models.Task.END_SUBWORKFLOW, - depends_on=operation_dependencies - ) - elif isinstance(task, api.task.StubTask): - models.Task(execution=execution, - dependencies=operation_dependencies, - _api_id=task.id, - _executor=stub_executor, - _stub_type=models.Task.STUB, - ) - else: - raise RuntimeError('Undefined state') - - # Insert end marker - models.Task(dependencies=_get_non_dependent_tasks(execution) or [start_task], - execution=execution, - _api_id=_end_graph_suffix(task_graph.id), - _executor=stub_executor, - _stub_type=end_stub_type) - - -def _start_graph_suffix(api_id): - return '{0}-Start'.format(api_id) - - -def _end_graph_suffix(api_id): - return '{0}-End'.format(api_id) - - -def _get_non_dependent_tasks(execution): - tasks_with_dependencies = set() - for task in execution.tasks: - tasks_with_dependencies.update(task.dependencies) - return list(set(execution.tasks) - set(tasks_with_dependencies)) - - -def _get_tasks_from_dependencies(execution, dependencies, default=()): - """ - Returns task list from dependencies. - """ - tasks = [] - for dependency in dependencies: - if getattr(dependency, 'actor', False): - # This is - dependency_name = dependency.id - else: - dependency_name = _end_graph_suffix(dependency.id) - tasks.extend(task for task in execution.tasks if task._api_id == dependency_name) - return tasks or default http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b19ef4e7/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index 257d12c..6a3c9d2 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -33,15 +33,14 @@ class BaseExecutor(logger.LoggerMixin): Execute a task :param task: task to execute """ - with ctx.persist_changes: - if ctx.task.function: - self._execute(ctx) - else: - # In this case the task is missing a function. This task still gets to an - # executor, but since there is nothing to run, we by default simply skip the - # execution itself. - self._task_started(ctx) - self._task_succeeded(ctx) + if ctx.task.function: + self._execute(ctx) + else: + # In this case the task is missing a function. This task still gets to an + # executor, but since there is nothing to run, we by default simply skip the + # execution itself. + self._task_started(ctx) + self._task_succeeded(ctx) def close(self): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b19ef4e7/tests/orchestrator/context/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py index 777c051..086a066 100644 --- a/tests/orchestrator/context/__init__.py +++ b/tests/orchestrator/context/__init__.py @@ -15,7 +15,7 @@ import sys -from aria.orchestrator.workflows.core import engine, task +from aria.orchestrator.workflows.core import engine, compile def op_path(func, module_path=None): @@ -26,7 +26,7 @@ def op_path(func, module_path=None): def execute(workflow_func, workflow_context, executor): graph = workflow_func(ctx=workflow_context) - task.create_execution_tasks(workflow_context, graph, executor.__class__) + compile.create_execution_tasks(workflow_context, graph, executor.__class__) eng = engine.Engine(executors={executor.__class__: executor}) eng.execute(workflow_context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b19ef4e7/tests/orchestrator/context/test_serialize.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py index c9227e6..5db5b63 100644 --- a/tests/orchestrator/context/test_serialize.py +++ b/tests/orchestrator/context/test_serialize.py @@ -16,7 +16,7 @@ import pytest from aria.orchestrator.workflows import api -from aria.orchestrator.workflows.core import engine, task +from aria.orchestrator.workflows.core import engine, compile from aria.orchestrator.workflows.executor import process from aria.orchestrator import workflow, operation import tests @@ -48,7 +48,7 @@ def test_serialize_operation_context(context, executor, tmpdir): context.model.node.update(node) graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter - task.create_execution_tasks(context, graph, executor.__class__) + compile.create_execution_tasks(context, graph, executor.__class__) eng = engine.Engine({executor.__class__: executor}) eng.execute(context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b19ef4e7/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index 361ddab..1695320 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -28,7 +28,7 @@ from aria.orchestrator.execution_plugin.exceptions import ProcessException from aria.orchestrator.execution_plugin import local from aria.orchestrator.execution_plugin import constants from aria.orchestrator.workflows.executor import process -from aria.orchestrator.workflows.core import engine, task +from aria.orchestrator.workflows.core import engine, compile from tests import mock from tests import storage @@ -500,7 +500,7 @@ if __name__ == '__main__': arguments=arguments)) return graph tasks_graph = mock_workflow(ctx=workflow_context) # pylint: disable=no-value-for-parameter - task.create_execution_tasks(workflow_context, tasks_graph, executor.__class__) + compile.create_execution_tasks(workflow_context, tasks_graph, executor.__class__) eng = engine.Engine({executor.__class__: executor}) eng.execute(workflow_context) return workflow_context.model.node.get_by_name( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b19ef4e7/tests/orchestrator/execution_plugin/test_ssh.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py index 1498cd1..fb1dc09 100644 --- a/tests/orchestrator/execution_plugin/test_ssh.py +++ b/tests/orchestrator/execution_plugin/test_ssh.py @@ -29,7 +29,7 @@ from aria.orchestrator import events from aria.orchestrator import workflow from aria.orchestrator.workflows import api from aria.orchestrator.workflows.executor import process -from aria.orchestrator.workflows.core import engine, task +from aria.orchestrator.workflows.core import engine, compile from aria.orchestrator.workflows.exceptions import ExecutorException from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException from aria.orchestrator.execution_plugin import operations @@ -254,7 +254,8 @@ class TestWithActualSSHServer(object): graph.sequence(*ops) return graph tasks_graph = mock_workflow(ctx=self._workflow_context) # pylint: disable=no-value-for-parameter - task.create_execution_tasks(self._workflow_context, tasks_graph, self._executor.__class__) + compile.create_execution_tasks( + self._workflow_context, tasks_graph, self._executor.__class__) eng = engine.Engine({self._executor.__class__: self._executor}) eng.execute(self._workflow_context) return self._workflow_context.model.node.get_by_name( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b19ef4e7/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py index 44ec1da..b77d284 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -28,7 +28,7 @@ from aria.orchestrator.workflows import ( api, exceptions, ) -from aria.orchestrator.workflows.core import engine, task +from aria.orchestrator.workflows.core import engine, compile from aria.orchestrator.workflows.executor import thread from tests import mock, storage @@ -50,7 +50,7 @@ class BaseTest(object): @staticmethod def _engine(workflow_func, workflow_context, executor): graph = workflow_func(ctx=workflow_context) - task.create_execution_tasks(workflow_context, graph, executor.__class__) + compile.create_execution_tasks(workflow_context, graph, executor.__class__) return engine.Engine(executors={executor.__class__: executor}) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b19ef4e7/tests/orchestrator/workflows/core/test_events.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py index 5f4868a..2b82443 100644 --- a/tests/orchestrator/workflows/core/test_events.py +++ b/tests/orchestrator/workflows/core/test_events.py @@ -16,7 +16,7 @@ import pytest from aria.orchestrator.decorators import operation, workflow -from aria.orchestrator.workflows.core import engine, task +from aria.orchestrator.workflows.core import engine, compile from aria.orchestrator.workflows.executor.thread import ThreadExecutor from aria.orchestrator.workflows import api from aria.modeling.service_instance import NodeBase @@ -113,7 +113,7 @@ def run_operation_on_node(ctx, op_name, interface_name): operation_name=op_name, operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func))) node.interfaces[interface.name] = interface - task.create_execution_tasks( + compile.create_execution_tasks( ctx, single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name), ThreadExecutor) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b19ef4e7/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py index 044d498..f5fb17a 100644 --- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py +++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py @@ -18,7 +18,7 @@ from networkx import topological_sort from aria.modeling import models from aria.orchestrator import context from aria.orchestrator.workflows import api -from aria.orchestrator.workflows.core import task +from aria.orchestrator.workflows.core import compile from aria.orchestrator.workflows.executor import base from tests import mock from tests import storage @@ -65,7 +65,7 @@ def test_task_graph_into_execution_graph(tmpdir): test_task_graph.add_dependency(inner_task_graph, simple_before_task) test_task_graph.add_dependency(simple_after_task, inner_task_graph) - task.create_execution_tasks(workflow_context, test_task_graph, base.StubTaskExecutor) + compile.create_execution_tasks(workflow_context, test_task_graph, base.StubTaskExecutor) execution_tasks = topological_sort(workflow_context._graph) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b19ef4e7/tests/orchestrator/workflows/executor/test_process_executor_extension.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py index 8cd8123..ba98c4f 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py @@ -17,7 +17,7 @@ import pytest from aria import extension from aria.orchestrator.workflows import api -from aria.orchestrator.workflows.core import engine, task +from aria.orchestrator.workflows.core import engine, compile from aria.orchestrator.workflows.executor import process from aria.orchestrator import workflow, operation @@ -57,7 +57,7 @@ def test_decorate_extension(context, executor): graph.add_tasks(task) return graph graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter - task.create_execution_tasks(context, graph, executor.__class__) + compile.create_execution_tasks(context, graph, executor.__class__) eng = engine.Engine({executor.__class__: executor}) eng.execute(context) out = get_node(context).attributes.get('out').value http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b19ef4e7/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py index f0451d1..2f1c325 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py @@ -18,7 +18,7 @@ import copy import pytest from aria.orchestrator.workflows import api -from aria.orchestrator.workflows.core import engine, task +from aria.orchestrator.workflows.core import engine, compile from aria.orchestrator.workflows.executor import process from aria.orchestrator import workflow, operation from aria.orchestrator.workflows import exceptions @@ -107,7 +107,7 @@ def _run_workflow(context, executor, op_func, arguments=None): graph.add_tasks(task) return graph graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter - task.create_execution_tasks(context, graph, executor.__class__) + compile.create_execution_tasks(context, graph, executor.__class__) eng = engine.Engine({executor.__class__: executor}) eng.execute(context) out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')