Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-163-Update-node-state-for-stub-tasks b7b9ab63b -> 0570dc5bd (forced update)
ARIA-163 Update node state for stub tasks Additional changes: * removed `for_node` and `for_relationship` from the api OperationTask. * api based OperationTask could also have an empty implementation. * each core task wields its own executor. * Reordered some of the helper functions for creating tasks. * introduced 2 new executors: StubTaskExecutor (for stub tasks) and EmptyOperationExecutor (for empty tasks) Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/b22c3464 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/b22c3464 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/b22c3464 Branch: refs/heads/ARIA-163-Update-node-state-for-stub-tasks Commit: b22c3464bd25622f248a5ec77491e29cfc88d4d4 Parents: 1f3e7ff Author: max-orlov <ma...@gigaspaces.com> Authored: Sun Apr 30 16:05:27 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Thu May 4 16:20:23 2017 +0300 ---------------------------------------------------------------------- aria/logger.py | 2 + aria/orchestrator/workflows/api/task.py | 185 ++++++++++--------- .../workflows/builtin/execute_operation.py | 28 +-- aria/orchestrator/workflows/builtin/heal.py | 4 +- aria/orchestrator/workflows/builtin/install.py | 9 +- aria/orchestrator/workflows/builtin/start.py | 4 +- aria/orchestrator/workflows/builtin/stop.py | 4 +- .../orchestrator/workflows/builtin/uninstall.py | 11 +- aria/orchestrator/workflows/builtin/utils.py | 138 -------------- .../orchestrator/workflows/builtin/workflows.py | 71 ++++--- aria/orchestrator/workflows/core/engine.py | 12 +- .../workflows/core/events_handler.py | 1 - aria/orchestrator/workflows/core/task.py | 40 ++-- aria/orchestrator/workflows/core/translation.py | 25 ++- aria/orchestrator/workflows/events_logging.py | 13 +- aria/orchestrator/workflows/executor/base.py | 11 ++ aria/orchestrator/workflows/executor/dry.py | 1 - 
.../profiles/tosca-simple-1.0/interfaces.yaml | 5 + tests/end2end/test_hello_world.py | 5 +- tests/end2end/testenv.py | 2 +- tests/orchestrator/context/test_operation.py | 32 ++-- tests/orchestrator/context/test_serialize.py | 2 +- tests/orchestrator/context/test_toolbelt.py | 8 +- .../orchestrator/execution_plugin/test_local.py | 4 +- tests/orchestrator/execution_plugin/test_ssh.py | 4 +- tests/orchestrator/workflows/api/test_task.py | 16 +- .../orchestrator/workflows/core/test_engine.py | 60 +++--- .../orchestrator/workflows/core/test_events.py | 4 +- tests/orchestrator/workflows/core/test_task.py | 8 +- .../test_task_graph_into_execution_graph.py | 25 ++- ...process_executor_concurrent_modifications.py | 18 +- .../executor/test_process_executor_extension.py | 9 +- .../test_process_executor_tracked_changes.py | 9 +- .../tosca-simple-1.0/node-cellar/workflows.py | 10 +- 34 files changed, 355 insertions(+), 425 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/aria/logger.py ---------------------------------------------------------------------- diff --git a/aria/logger.py b/aria/logger.py index 8e15f5b..97d3878 100644 --- a/aria/logger.py +++ b/aria/logger.py @@ -195,6 +195,8 @@ class _SQLAlchemyHandler(logging.Handler): except BaseException: self._session.rollback() raise + finally: + self._session.close() _default_file_formatter = logging.Formatter( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index 82c40c3..cb79eb3 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -21,6 +21,7 @@ from ... 
import context from ....modeling import models from ....modeling import utils as modeling_utils from ....utils.uuid import generate_uuid +from .. import exceptions class BaseTask(object): @@ -71,102 +72,44 @@ class OperationTask(BaseTask): Do not call this constructor directly. Instead, use :meth:`for_node` or :meth:`for_relationship`. """ - - actor_type = type(actor).__name__.lower() assert isinstance(actor, (models.Node, models.Relationship)) - assert actor_type in ('node', 'relationship') - assert interface_name and operation_name super(OperationTask, self).__init__() - self.actor = actor - self.max_attempts = (self.workflow_context._task_max_attempts - if max_attempts is None else max_attempts) - self.retry_interval = (self.workflow_context._task_retry_interval - if retry_interval is None else retry_interval) - self.ignore_failure = (self.workflow_context._task_ignore_failure - if ignore_failure is None else ignore_failure) self.interface_name = interface_name self.operation_name = operation_name + self.max_attempts = max_attempts or self.workflow_context._task_max_attempts + self.retry_interval = retry_interval or self.workflow_context._task_retry_interval + self.ignore_failure = \ + self.workflow_context._task_ignore_failure if ignore_failure is None else ignore_failure + self.name = OperationTask.NAME_FORMAT.format(type=type(actor).__name__.lower(), + name=actor.name, + interface=self.interface_name, + operation=self.operation_name) + # Creating OperationTask directly should raise an error when there is no + # interface/operation. 
+ + if not has_operation(self.actor, self.interface_name, self.operation_name): + raise exceptions.OperationNotFoundException( + 'Could not find operation "{self.operation_name}" on interface ' + '"{self.interface_name}" for {actor_type} "{actor.name}"'.format( + self=self, + actor_type=type(actor).__name__.lower(), + actor=actor) + ) operation = self.actor.interfaces[self.interface_name].operations[self.operation_name] self.plugin = operation.plugin self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs) self.implementation = operation.implementation - self.name = OperationTask.NAME_FORMAT.format(type=actor_type, - name=actor.name, - interface=self.interface_name, - operation=self.operation_name) def __repr__(self): return self.name - @classmethod - def for_node(cls, - node, - interface_name, - operation_name, - max_attempts=None, - retry_interval=None, - ignore_failure=None, - inputs=None): - """ - Creates an operation on a node. - - :param node: The node on which to run the operation - :param interface_name: The interface name - :param operation_name: The operation name within the interface - :param max_attempts: The maximum number of attempts in case the operation fails - (if not specified the defaults it taken from the workflow context) - :param retry_interval: The interval in seconds between attempts when the operation fails - (if not specified the defaults it taken from the workflow context) - :param ignore_failure: Whether to ignore failures - (if not specified the defaults it taken from the workflow context) - :param inputs: Additional operation inputs - """ - - assert isinstance(node, models.Node) - return cls( - actor=node, - interface_name=interface_name, - operation_name=operation_name, - max_attempts=max_attempts, - retry_interval=retry_interval, - ignore_failure=ignore_failure, - inputs=inputs) - - @classmethod - def for_relationship(cls, - relationship, - interface_name, - operation_name, - max_attempts=None, - 
retry_interval=None, - ignore_failure=None, - inputs=None): - """ - Creates an operation on a relationship edge. - - :param relationship: The relationship on which to run the operation - :param interface_name: The interface name - :param operation_name: The operation name within the interface - :param max_attempts: The maximum number of attempts in case the operation fails - (if not specified the defaults it taken from the workflow context) - :param retry_interval: The interval in seconds between attempts when the operation fails - (if not specified the defaults it taken from the workflow context) - :param ignore_failure: Whether to ignore failures - (if not specified the defaults it taken from the workflow context) - :param inputs: Additional operation inputs - """ - assert isinstance(relationship, models.Relationship) - return cls( - actor=relationship, - interface_name=interface_name, - operation_name=operation_name, - max_attempts=max_attempts, - retry_interval=retry_interval, - ignore_failure=ignore_failure, - inputs=inputs) +class StubTask(BaseTask): + """ + Enables creating empty tasks. + """ class WorkflowTask(BaseTask): @@ -199,7 +142,83 @@ class WorkflowTask(BaseTask): return super(WorkflowTask, self).__getattribute__(item) -class StubTask(BaseTask): +def create_task(actor, interface_name, operation_name, **kwargs): """ - Enables creating empty tasks. + This helper function enables safe creation of OperationTask, if the supplied interface or + operation do not exist, None is returned. 
+ :param actor: the actor for this task + :param interface_name: the name of the interface + :param operation_name: the name of the operation + :param kwargs: any additional kwargs to be passed to the task OperationTask + :return: and OperationTask or None (if the interface/operation does not exists) + """ + try: + return OperationTask( + actor, + interface_name=interface_name, + operation_name=operation_name, + **kwargs + ) + except exceptions.OperationNotFoundException: + return None + + +def create_relationships_tasks( + node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs): + """ + Creates a relationship task (source and target) for all of a node_instance relationships. + :param basestring source_operation_name: the relationship operation name. + :param basestring interface_name: the name of the interface. + :param source_operation_name: + :param target_operation_name: + :param NodeInstance node: the source_node + :return: + """ + sub_tasks = [] + for relationship in node.outbound_relationships: + relationship_operations = create_relationship_tasks( + relationship, + interface_name, + source_operation_name=source_operation_name, + target_operation_name=target_operation_name, + **kwargs) + sub_tasks.append(relationship_operations) + return sub_tasks + + +def create_relationship_tasks(relationship, interface_name, source_operation_name=None, + target_operation_name=None, **kwargs): + """ + Creates a relationship task source and target. 
+ :param Relationship relationship: the relationship instance itself + :param source_operation_name: + :param target_operation_name: + + :return: """ + operations = [] + if source_operation_name: + operations.append( + create_task( + relationship, + interface_name=interface_name, + operation_name=source_operation_name, + **kwargs + ) + ) + if target_operation_name: + operations.append( + create_task( + relationship, + interface_name=interface_name, + operation_name=target_operation_name, + **kwargs + ) + ) + + return [o for o in operations if o] + + +def has_operation(actor, interface_name, operation_name): + interface = actor.interfaces.get(interface_name, None) + return interface and interface.operations.get(operation_name, False) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/aria/orchestrator/workflows/builtin/execute_operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/execute_operation.py b/aria/orchestrator/workflows/builtin/execute_operation.py index 16504ec..02a654a 100644 --- a/aria/orchestrator/workflows/builtin/execute_operation.py +++ b/aria/orchestrator/workflows/builtin/execute_operation.py @@ -17,8 +17,8 @@ Builtin execute_operation workflow """ -from . import utils from ... 
import workflow +from ..api import task @workflow @@ -65,11 +65,11 @@ def execute_operation( # registering actual tasks to sequences for node in filtered_nodes: graph.add_tasks( - _create_node_task( - node=node, + task.OperationTask( + node, interface_name=interface_name, operation_name=operation_name, - operation_kwargs=operation_kwargs + inputs=operation_kwargs ) ) @@ -99,23 +99,3 @@ def _filter_nodes(context, node_template_ids=(), node_ids=(), type_names=()): _is_node_by_id(node.id), _is_node_by_type(node.node_template.type))): yield node - - -def _create_node_task( - node, - interface_name, - operation_name, - operation_kwargs): - """ - A workflow which executes a single operation - :param node: the node instance to install - :param basestring operation: the operation name - :param dict operation_kwargs: - :return: - """ - - return utils.create_node_task( - node=node, - interface_name=interface_name, - operation_name=operation_name, - inputs=operation_kwargs) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/aria/orchestrator/workflows/builtin/heal.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/heal.py b/aria/orchestrator/workflows/builtin/heal.py index 92b96ea..ca382e8 100644 --- a/aria/orchestrator/workflows/builtin/heal.py +++ b/aria/orchestrator/workflows/builtin/heal.py @@ -103,7 +103,7 @@ def heal_uninstall(ctx, graph, failing_nodes, targeted_nodes): graph.add_dependency(target_node_subgraph, node_sub_workflow) if target_node in failing_nodes: - dependency = relationship_tasks( + dependency = task.create_relationship_tasks( relationship=relationship, operation_name='aria.interfaces.relationship_lifecycle.unlink') graph.add_tasks(*dependency) @@ -157,7 +157,7 @@ def heal_install(ctx, graph, failing_nodes, targeted_nodes): graph.add_dependency(node_sub_workflow, target_node_subworkflow) if target_node in failing_nodes: - dependent = relationship_tasks( + 
dependent = task.create_relationship_tasks( relationship=relationship, operation_name='aria.interfaces.relationship_lifecycle.establish') graph.add_tasks(*dependent) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/aria/orchestrator/workflows/builtin/install.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/install.py b/aria/orchestrator/workflows/builtin/install.py index 2b9ec66..821b190 100644 --- a/aria/orchestrator/workflows/builtin/install.py +++ b/aria/orchestrator/workflows/builtin/install.py @@ -17,16 +17,15 @@ Builtin install workflow """ -from .workflows import install_node -from .utils import create_node_task_dependencies -from ..api.task import WorkflowTask from ... import workflow +from ..api import task as api_task +from . import workflows @workflow def install(ctx, graph): tasks_and_nodes = [] for node in ctx.nodes: - tasks_and_nodes.append((WorkflowTask(install_node, node=node), node)) + tasks_and_nodes.append((api_task.WorkflowTask(workflows.install_node, node=node), node)) graph.add_tasks([task for task, _ in tasks_and_nodes]) - create_node_task_dependencies(graph, tasks_and_nodes) + workflows.create_node_task_dependencies(graph, tasks_and_nodes) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/aria/orchestrator/workflows/builtin/start.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/start.py b/aria/orchestrator/workflows/builtin/start.py index ad67554..1946143 100644 --- a/aria/orchestrator/workflows/builtin/start.py +++ b/aria/orchestrator/workflows/builtin/start.py @@ -18,11 +18,11 @@ Builtin start workflow """ from .workflows import start_node -from ..api.task import WorkflowTask from ... 
import workflow +from ..api import task as api_task @workflow def start(ctx, graph): for node in ctx.model.node.iter(): - graph.add_tasks(WorkflowTask(start_node, node=node)) + graph.add_tasks(api_task.WorkflowTask(start_node, node=node)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/aria/orchestrator/workflows/builtin/stop.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/stop.py b/aria/orchestrator/workflows/builtin/stop.py index 23ac366..c1b60ae 100644 --- a/aria/orchestrator/workflows/builtin/stop.py +++ b/aria/orchestrator/workflows/builtin/stop.py @@ -18,11 +18,11 @@ Builtin stop workflow """ from .workflows import stop_node -from ..api.task import WorkflowTask +from ..api import task as api_task from ... import workflow @workflow def stop(ctx, graph): for node in ctx.model.node.iter(): - graph.add_tasks(WorkflowTask(stop_node, node=node)) + graph.add_tasks(api_task.WorkflowTask(stop_node, node=node)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/aria/orchestrator/workflows/builtin/uninstall.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/uninstall.py b/aria/orchestrator/workflows/builtin/uninstall.py index e4afcd9..c35117e 100644 --- a/aria/orchestrator/workflows/builtin/uninstall.py +++ b/aria/orchestrator/workflows/builtin/uninstall.py @@ -17,18 +17,15 @@ Builtin uninstall workflow """ -from .workflows import uninstall_node -from .utils import create_node_task_dependencies -from ..api.task import WorkflowTask from ... import workflow +from ..api import task as api_task +from . 
import workflows @workflow def uninstall(ctx, graph): tasks_and_nodes = [] for node in ctx.nodes: - tasks_and_nodes.append(( - WorkflowTask(uninstall_node, node=node), - node)) + tasks_and_nodes.append((api_task.WorkflowTask(workflows.uninstall_node, node=node), node)) graph.add_tasks([task for task, _ in tasks_and_nodes]) - create_node_task_dependencies(graph, tasks_and_nodes, reverse=True) + workflows.create_node_task_dependencies(graph, tasks_and_nodes, reverse=True) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/aria/orchestrator/workflows/builtin/utils.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/utils.py b/aria/orchestrator/workflows/builtin/utils.py deleted file mode 100644 index 2254d13..0000000 --- a/aria/orchestrator/workflows/builtin/utils.py +++ /dev/null @@ -1,138 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ..api.task import OperationTask, StubTask -from .. import exceptions - - -def create_node_task(node, interface_name, operation_name, **kwargs): - """ - Returns a new operation task if the operation exists in the node, otherwise returns None. 
- """ - - try: - if _is_empty_task(node, interface_name, operation_name): - return StubTask() - - return OperationTask.for_node(node=node, - interface_name=interface_name, - operation_name=operation_name, - **kwargs) - except exceptions.OperationNotFoundException: - # We will skip nodes which do not have the operation - return None - - -def create_relationships_tasks( - node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs): - """ - Creates a relationship task (source and target) for all of a node_instance relationships. - :param basestring source_operation_name: the relationship operation name. - :param basestring interface_name: the name of the interface. - :param source_operation_name: - :param target_operation_name: - :param NodeInstance node: the source_node - :return: - """ - sub_tasks = [] - for relationship in node.outbound_relationships: - relationship_operations = relationship_tasks( - relationship, - interface_name, - source_operation_name=source_operation_name, - target_operation_name=target_operation_name, - **kwargs) - sub_tasks.append(relationship_operations) - return sub_tasks - - -def relationship_tasks(relationship, interface_name, source_operation_name=None, - target_operation_name=None, **kwargs): - """ - Creates a relationship task source and target. 
- :param Relationship relationship: the relationship instance itself - :param source_operation_name: - :param target_operation_name: - - :return: - """ - operations = [] - if source_operation_name: - try: - if _is_empty_task(relationship, interface_name, source_operation_name): - operations.append(StubTask()) - else: - operations.append( - OperationTask.for_relationship(relationship=relationship, - interface_name=interface_name, - operation_name=source_operation_name, - **kwargs) - ) - except exceptions.OperationNotFoundException: - # We will skip relationships which do not have the operation - pass - if target_operation_name: - try: - if _is_empty_task(relationship, interface_name, target_operation_name): - operations.append(StubTask()) - else: - operations.append( - OperationTask.for_relationship(relationship=relationship, - interface_name=interface_name, - operation_name=target_operation_name, - **kwargs) - ) - except exceptions.OperationNotFoundException: - # We will skip relationships which do not have the operation - pass - - return operations - - -def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False): - """ - Creates dependencies between tasks if there is a relationship (outbound) between their nodes. 
- """ - - def get_task(node_name): - for task, node in tasks_and_nodes: - if node.name == node_name: - return task - return None - - for task, node in tasks_and_nodes: - dependencies = [] - for relationship in node.outbound_relationships: - dependency = get_task(relationship.target_node.name) - if dependency: - dependencies.append(dependency) - if dependencies: - if reverse: - for dependency in dependencies: - graph.add_dependency(dependency, task) - else: - graph.add_dependency(task, dependencies) - - -def _is_empty_task(actor, interface_name, operation_name): - interface = actor.interfaces.get(interface_name) - if interface: - operation = interface.operations.get(operation_name) - if operation: - return operation.implementation is None - - raise exceptions.OperationNotFoundException( - 'Could not find operation "{0}" on interface "{1}" for {2} "{3}"' - .format(operation_name, interface_name, type(actor).__name__.lower(), actor.name)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/aria/orchestrator/workflows/builtin/workflows.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/workflows.py b/aria/orchestrator/workflows/builtin/workflows.py index 60f14ed..b286e98 100644 --- a/aria/orchestrator/workflows/builtin/workflows.py +++ b/aria/orchestrator/workflows/builtin/workflows.py @@ -18,10 +18,7 @@ TSOCA normative lifecycle workflows. """ from ... 
import workflow -from .utils import ( - create_node_task, - create_relationships_tasks -) +from ..api import task NORMATIVE_STANDARD_INTERFACE = 'Standard' # 'tosca.interfaces.node.lifecycle.Standard' @@ -72,19 +69,18 @@ __all__ = ( @workflow(suffix_template='{node.name}') def install_node(graph, node, **kwargs): # Create - sequence = [create_node_task(node, - NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE)] + sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE)] # Configure - sequence += create_relationships_tasks(node, - NORMATIVE_CONFIGURE_INTERFACE, - NORMATIVE_PRE_CONFIGURE_SOURCE, - NORMATIVE_PRE_CONFIGURE_TARGET) - sequence.append(create_node_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE)) - sequence += create_relationships_tasks(node, - NORMATIVE_CONFIGURE_INTERFACE, - NORMATIVE_POST_CONFIGURE_SOURCE, - NORMATIVE_POST_CONFIGURE_TARGET) + sequence += task.create_relationships_tasks(node, + NORMATIVE_CONFIGURE_INTERFACE, + NORMATIVE_PRE_CONFIGURE_SOURCE, + NORMATIVE_PRE_CONFIGURE_TARGET) + sequence.append(task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE)) + sequence += task.create_relationships_tasks(node, + NORMATIVE_CONFIGURE_INTERFACE, + NORMATIVE_POST_CONFIGURE_SOURCE, + NORMATIVE_POST_CONFIGURE_TARGET) # Start sequence += _create_start_tasks(node) @@ -97,9 +93,7 @@ def uninstall_node(graph, node, **kwargs): sequence = _create_stop_tasks(node) # Delete - sequence.append(create_node_task(node, - NORMATIVE_STANDARD_INTERFACE, - NORMATIVE_DELETE)) + sequence.append(task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_DELETE)) graph.sequence(*sequence) @@ -115,16 +109,41 @@ def stop_node(graph, node, **kwargs): def _create_start_tasks(node): - sequence = [create_node_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START)] - sequence += create_relationships_tasks(node, - NORMATIVE_CONFIGURE_INTERFACE, - NORMATIVE_ADD_SOURCE, NORMATIVE_ADD_TARGET) + sequence = 
[task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START)] + sequence += task.create_relationships_tasks(node, + NORMATIVE_CONFIGURE_INTERFACE, + NORMATIVE_ADD_SOURCE, NORMATIVE_ADD_TARGET) return sequence def _create_stop_tasks(node): - sequence = [create_node_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP)] - sequence += create_relationships_tasks(node, - NORMATIVE_CONFIGURE_INTERFACE, - NORMATIVE_REMOVE_SOURCE, NORMATIVE_REMOVE_TARGET) + sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP)] + sequence += task.create_relationships_tasks(node, + NORMATIVE_CONFIGURE_INTERFACE, + NORMATIVE_REMOVE_SOURCE, NORMATIVE_REMOVE_TARGET) return sequence + + +def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False): + """ + Creates dependencies between tasks if there is a relationship (outbound) between their nodes. + """ + + def get_task(node_name): + for api_task, task_node in tasks_and_nodes: + if task_node.name == node_name: + return api_task + return None + + for api_task, node in tasks_and_nodes: + dependencies = [] + for relationship in node.outbound_relationships: + dependency = get_task(relationship.target_node.name) + if dependency: + dependencies.append(dependency) + if dependencies: + if reverse: + for dependency in dependencies: + graph.add_dependency(dependency, api_task) + else: + graph.add_dependency(api_task, dependencies) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 155d0ee..fd0dd6d 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -44,7 +44,8 @@ class Engine(logger.LoggerMixin): self._execution_graph = networkx.DiGraph() self._executor = executor 
translation.build_execution_graph(task_graph=tasks_graph, - execution_graph=self._execution_graph) + execution_graph=self._execution_graph, + default_executor=self._executor) def execute(self): """ @@ -109,12 +110,11 @@ class Engine(logger.LoggerMixin): self._workflow_context.model.task.refresh(task.model_task) yield task - def _handle_executable_task(self, task): - if isinstance(task, engine_task.StubTask): - task.status = models.Task.SUCCESS - else: + @staticmethod + def _handle_executable_task(task): + if isinstance(task, engine_task.OperationTask): events.sent_task_signal.send(task) - self._executor.execute(task) + task.execute() def _handle_ended_tasks(self, task): if task.status == models.Task.FAILED and not task.ignore_failure: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index b43082b..4d24bb7 100644 --- a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -40,7 +40,6 @@ def _task_started(task, *args, **kwargs): with task._update(): task.started_at = datetime.utcnow() task.status = task.STARTED - _update_node_state_if_necessary(task, is_transitional=True) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index 2b26152..cee4bf4 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -27,6 +27,7 @@ from functools import ( from ....modeling import models from ...context import operation as operation_context +from ..executor import base from .. 
import exceptions @@ -47,9 +48,13 @@ class BaseTask(object): Base class for Task objects """ - def __init__(self, id, *args, **kwargs): + def __init__(self, id, executor, *args, **kwargs): super(BaseTask, self).__init__(*args, **kwargs) self._id = id + self._executor = executor + + def execute(self): + return self._executor.execute(self) @property def id(self): @@ -61,19 +66,22 @@ class BaseTask(object): class StubTask(BaseTask): """ - Base stub task for all tasks that don't actually run anything + Base stub task for marker user tasks that only mark the start/end of a workflow + or sub-workflow """ + STARTED = models.Task.STARTED + SUCCESS = models.Task.SUCCESS def __init__(self, *args, **kwargs): - super(StubTask, self).__init__(*args, **kwargs) + super(StubTask, self).__init__(executor=base.StubTaskExecutor(), *args, **kwargs) self.status = models.Task.PENDING self.due_at = datetime.utcnow() def has_ended(self): - return self.status in (models.Task.SUCCESS, models.Task.FAILED) + return self.status == self.SUCCESS def is_waiting(self): - return self.status in (models.Task.PENDING, models.Task.RETRYING) + return not self.has_ended() class StartWorkflowTask(StubTask): @@ -108,14 +116,16 @@ class OperationTask(BaseTask): """ Operation task """ + def __init__(self, api_task, executor=None, *args, **kwargs): + # If no executor is provided, we infer that this is an empty task which does not need to be + # executed. 
+ super(OperationTask, self).__init__( + id=api_task.id, executor=executor or base.EmptyOperationExecutor(), *args, **kwargs) - def __init__(self, api_task, *args, **kwargs): - super(OperationTask, self).__init__(id=api_task.id, **kwargs) self._workflow_context = api_task._workflow_context self.interface_name = api_task.interface_name self.operation_name = api_task.operation_name model_storage = api_task._workflow_context.model - plugin = api_task.plugin base_task_model = model_storage.task.model_cls if isinstance(api_task.actor, models.Node): @@ -130,15 +140,18 @@ class OperationTask(BaseTask): task_model = create_task_model( name=api_task.name, - implementation=api_task.implementation, actor=api_task.actor, - inputs=api_task.inputs, status=base_task_model.PENDING, max_attempts=api_task.max_attempts, retry_interval=api_task.retry_interval, ignore_failure=api_task.ignore_failure, - plugin=plugin, - execution=self._workflow_context.execution + execution=self._workflow_context.execution, + + # Only non-stub tasks have these fields + plugin=api_task.plugin, + implementation=api_task.implementation, + inputs=api_task.inputs + ) self._workflow_context.model.task.put(task_model) @@ -153,6 +166,9 @@ class OperationTask(BaseTask): self._task_id = task_model.id self._update_fields = None + def execute(self): + super(OperationTask, self).execute() + @contextmanager def _update(self): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/aria/orchestrator/workflows/core/translation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py index b6cbdad..b31ea8a 100644 --- a/aria/orchestrator/workflows/core/translation.py +++ b/aria/orchestrator/workflows/core/translation.py @@ -24,6 +24,7 @@ from . 
import task as core_task def build_execution_graph( task_graph, execution_graph, + default_executor, start_cls=core_task.StartWorkflowTask, end_cls=core_task.EndWorkflowTask, depends_on=()): @@ -43,19 +44,20 @@ def build_execution_graph( for api_task in task_graph.topological_order(reverse=True): dependencies = task_graph.get_dependencies(api_task) operation_dependencies = _get_tasks_from_dependencies( - execution_graph, - dependencies, - default=[start_task]) + execution_graph, dependencies, default=[start_task]) if isinstance(api_task, api.task.OperationTask): - # Add the task an the dependencies - operation_task = core_task.OperationTask(api_task) + if api_task.implementation: + operation_task = core_task.OperationTask(api_task, executor=default_executor) + else: + operation_task = core_task.OperationTask(api_task) _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies) elif isinstance(api_task, api.task.WorkflowTask): # Build the graph recursively while adding start and end markers build_execution_graph( task_graph=api_task, execution_graph=execution_graph, + default_executor=default_executor, start_cls=core_task.StartSubWorkflowTask, end_cls=core_task.EndSubWorkflowTask, depends_on=operation_dependencies @@ -85,11 +87,14 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()): """ Returns task list from dependencies. 
""" - return [execution_graph.node[dependency.id - if isinstance(dependency, (api.task.OperationTask, - api.task.StubTask)) - else _end_graph_suffix(dependency.id)]['task'] - for dependency in dependencies] or default + tasks = [] + for dependency in dependencies: + if isinstance(dependency, (api.task.OperationTask, api.task.StubTask)): + dependency_id = dependency.id + else: + dependency_id = _end_graph_suffix(dependency.id) + tasks.append(execution_graph.node[dependency_id]['task']) + return tasks or default def _start_graph_suffix(id): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/aria/orchestrator/workflows/events_logging.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py index 3ffe18b..236a55f 100644 --- a/aria/orchestrator/workflows/events_logging.py +++ b/aria/orchestrator/workflows/events_logging.py @@ -35,12 +35,21 @@ def _get_task_name(task): @events.start_task_signal.connect def _start_task_handler(task, **kwargs): - task.context.logger.info('{name} {task.interface_name}.{task.operation_name} started...' - .format(name=_get_task_name(task), task=task)) + # If the task has no implementation this is an empty task. + if task.implementation: + suffix = 'started...' 
+ logger = task.context.logger.info + else: + suffix = 'has no implementation' + logger = task.context.logger.debug + logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format( + name=_get_task_name(task), task=task, suffix=suffix)) @events.on_success_task_signal.connect def _success_task_handler(task, **kwargs): + if not task.implementation: + return task.context.logger.info('{name} {task.interface_name}.{task.operation_name} successful' .format(name=_get_task_name(task), task=task)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index 39becef..a225837 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -50,3 +50,14 @@ class BaseExecutor(logger.LoggerMixin): @staticmethod def _task_succeeded(task): events.on_success_task_signal.send(task) + + +class StubTaskExecutor(BaseExecutor): + def execute(self, task): + task.status = task.SUCCESS + + +class EmptyOperationExecutor(BaseExecutor): + def execute(self, task): + events.start_task_signal.send(task) + events.on_success_task_signal.send(task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/aria/orchestrator/workflows/executor/dry.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py index e1261bb..eb70a41 100644 --- a/aria/orchestrator/workflows/executor/dry.py +++ b/aria/orchestrator/workflows/executor/dry.py @@ -16,7 +16,6 @@ """ Dry executor """ - from datetime import datetime from .base import BaseExecutor 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml ---------------------------------------------------------------------- diff --git a/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml b/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml index ff6ba6c..1e83ef9 100644 --- a/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml +++ b/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml @@ -100,3 +100,8 @@ interface_types: Operation to remove a target node. _extensions: relationship_edge: source + remove_source: + description: >- + Operation to remove the source node. + _extensions: + relationship_edge: target http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/tests/end2end/test_hello_world.py ---------------------------------------------------------------------- diff --git a/tests/end2end/test_hello_world.py b/tests/end2end/test_hello_world.py index 09e5d06..fc5f631 100644 --- a/tests/end2end/test_hello_world.py +++ b/tests/end2end/test_hello_world.py @@ -29,8 +29,7 @@ def test_hello_world(testenv): finally: # Even if some assertions failed, attempt to execute uninstall so the # webserver process doesn't stay up once the test is finished - # TODO: remove force_service_delete=True - testenv.uninstall_service(force_service_delete=True) + testenv.uninstall_service() _verify_webserver_down('http://localhost:9090') testenv.verify_clean_storage() @@ -57,5 +56,5 @@ def _verify_deployed_service_in_storage(service_name, model_storage): assert service.name == service_name assert len(service.executions) == 1 assert len(service.nodes) == 2 - # TODO: validate node states + assert all(node.state == node.STARTED for node in service.nodes.values()) assert len(service.executions[0].logs) > 0 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/tests/end2end/testenv.py ---------------------------------------------------------------------- diff --git a/tests/end2end/testenv.py b/tests/end2end/testenv.py index 3950b20..85714e5 100644 --- a/tests/end2end/testenv.py +++ b/tests/end2end/testenv.py @@ -70,7 +70,7 @@ class TestEnvironment(object): assert len(self.model_storage.log.list()) == 0 def _get_cli(self): - cli = sh.aria.bake(_out=sys.stdout.write, _err=sys.stderr.write) + cli = sh.aria.bake('-vvv', _out=sys.stdout.write, _err=sys.stderr.write) # the `sh` library supports underscore-dash auto-replacement for commands and option flags # yet not for subcommands (e.g. `aria service-templates`); The following class fixes this. http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/tests/orchestrator/context/test_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py index c399474..971e0db 100644 --- a/tests/orchestrator/context/test_operation.py +++ b/tests/orchestrator/context/test_operation.py @@ -84,10 +84,10 @@ def test_node_operation_task_execution(ctx, thread_executor): @workflow def basic_workflow(graph, **_): graph.add_tasks( - api.task.OperationTask.for_node( + api.task.OperationTask( + node, interface_name=interface_name, operation_name=operation_name, - node=node, inputs=inputs ) ) @@ -141,8 +141,8 @@ def test_relationship_operation_task_execution(ctx, thread_executor): @workflow def basic_workflow(graph, **_): graph.add_tasks( - api.task.OperationTask.for_relationship( - relationship=relationship, + api.task.OperationTask( + relationship, interface_name=interface_name, operation_name=operation_name, inputs=inputs @@ -209,9 +209,10 @@ def test_invalid_task_operation_id(ctx, thread_executor): @workflow def basic_workflow(graph, **_): graph.add_tasks( - 
api.task.OperationTask.for_node(node=node, - interface_name=interface_name, - operation_name=operation_name) + api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name) ) execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor) @@ -250,10 +251,11 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir): @workflow def basic_workflow(graph, **_): - graph.add_tasks(api.task.OperationTask.for_node(node=node, - interface_name=interface_name, - operation_name=operation_name, - inputs=inputs)) + graph.add_tasks(api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + inputs=inputs)) execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor) expected_file = tmpdir.join('workdir', 'plugins', str(ctx.service.id), @@ -298,10 +300,10 @@ def test_node_operation_logging(ctx, executor): @workflow def basic_workflow(graph, **_): graph.add_tasks( - api.task.OperationTask.for_node( + api.task.OperationTask( + node, interface_name=interface_name, operation_name=operation_name, - node=node, inputs=inputs ) ) @@ -331,10 +333,10 @@ def test_relationship_operation_logging(ctx, executor): @workflow def basic_workflow(graph, **_): graph.add_tasks( - api.task.OperationTask.for_relationship( + api.task.OperationTask( + relationship, interface_name=interface_name, operation_name=operation_name, - relationship=relationship, inputs=inputs ) ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/tests/orchestrator/context/test_serialize.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py index f4acc36..8a5db6f 100644 --- a/tests/orchestrator/context/test_serialize.py +++ b/tests/orchestrator/context/test_serialize.py @@ -51,7 +51,7 @@ def _mock_workflow(ctx, graph): plugin=plugin) ) node.interfaces[interface.name] 
= interface - task = api.task.OperationTask.for_node(node=node, interface_name='test', operation_name='op') + task = api.task.OperationTask(node, interface_name='test', operation_name='op') graph.add_tasks(task) return graph http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/tests/orchestrator/context/test_toolbelt.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_toolbelt.py b/tests/orchestrator/context/test_toolbelt.py index 213d964..ecc3ac2 100644 --- a/tests/orchestrator/context/test_toolbelt.py +++ b/tests/orchestrator/context/test_toolbelt.py @@ -90,8 +90,8 @@ def test_host_ip(workflow_context, executor): @workflow def basic_workflow(graph, **_): graph.add_tasks( - api.task.OperationTask.for_node( - node=dependency_node, + api.task.OperationTask( + dependency_node, interface_name=interface_name, operation_name=operation_name, inputs=inputs @@ -121,8 +121,8 @@ def test_relationship_tool_belt(workflow_context, executor): @workflow def basic_workflow(graph, **_): graph.add_tasks( - api.task.OperationTask.for_relationship( - relationship=relationship, + api.task.OperationTask( + relationship, interface_name=interface_name, operation_name=operation_name, inputs=inputs http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index 58506ba..09d0499 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -489,8 +489,8 @@ if __name__ == '__main__': inputs=inputs) ) node.interfaces[interface.name] = interface - graph.add_tasks(api.task.OperationTask.for_node( - node=node, + graph.add_tasks(api.task.OperationTask( + node, interface_name='test', operation_name='op', 
inputs=inputs)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/tests/orchestrator/execution_plugin/test_ssh.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py index a75d59a..a9dc5e8 100644 --- a/tests/orchestrator/execution_plugin/test_ssh.py +++ b/tests/orchestrator/execution_plugin/test_ssh.py @@ -245,8 +245,8 @@ class TestWithActualSSHServer(object): for test_operation in test_operations: op_inputs = inputs.copy() op_inputs['test_operation'] = test_operation - ops.append(api.task.OperationTask.for_node( - node=node, + ops.append(api.task.OperationTask( + node, interface_name='test', operation_name='op', inputs=op_inputs)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/tests/orchestrator/workflows/api/test_task.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/api/test_task.py b/tests/orchestrator/workflows/api/test_task.py index ab62361..642c785 100644 --- a/tests/orchestrator/workflows/api/test_task.py +++ b/tests/orchestrator/workflows/api/test_task.py @@ -62,8 +62,8 @@ class TestOperationTask(object): ignore_failure = True with context.workflow.current.push(ctx): - api_task = api.task.OperationTask.for_node( - node=node, + api_task = api.task.OperationTask( + node, interface_name=interface_name, operation_name=operation_name, inputs=inputs, @@ -109,8 +109,8 @@ class TestOperationTask(object): retry_interval = 10 with context.workflow.current.push(ctx): - api_task = api.task.OperationTask.for_relationship( - relationship=relationship, + api_task = api.task.OperationTask( + relationship, interface_name=interface_name, operation_name=operation_name, inputs=inputs, @@ -154,8 +154,8 @@ class TestOperationTask(object): retry_interval = 10 with context.workflow.current.push(ctx): - api_task = 
api.task.OperationTask.for_relationship( - relationship=relationship, + api_task = api.task.OperationTask( + relationship, interface_name=interface_name, operation_name=operation_name, inputs=inputs, @@ -193,8 +193,8 @@ class TestOperationTask(object): dependency_node.interfaces[interface_name] = interface with context.workflow.current.push(ctx): - task = api.task.OperationTask.for_node( - node=dependency_node, + task = api.task.OperationTask( + dependency_node, interface_name=interface_name, operation_name=operation_name) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py index af9af17..8c0705b 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -55,34 +55,32 @@ class BaseTest(object): tasks_graph=graph) @staticmethod - def _op(func, ctx, + def _op(ctx, + func, inputs=None, max_attempts=None, retry_interval=None, ignore_failure=None): node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) - + interface_name = 'aria.interfaces.lifecycle' operation_kwargs = dict(implementation='{name}.{func.__name__}'.format( name=__name__, func=func)) if inputs: # the operation has to declare the inputs before those may be passed operation_kwargs['inputs'] = inputs - - interface = mock.models.create_interface( - node.service, - 'aria.interfaces.lifecycle', - 'create', - operation_kwargs=operation_kwargs - ) + operation_name = 'create' + interface = mock.models.create_interface(node.service, interface_name, operation_name, + operation_kwargs=operation_kwargs) node.interfaces[interface.name] = interface - return api.task.OperationTask.for_node( - node=node, + + return api.task.OperationTask( + node, interface_name='aria.interfaces.lifecycle', - 
operation_name='create', - inputs=inputs, + operation_name=operation_name, + inputs=inputs or {}, max_attempts=max_attempts, retry_interval=retry_interval, - ignore_failure=ignore_failure + ignore_failure=ignore_failure, ) @pytest.fixture(autouse=True) @@ -162,7 +160,7 @@ class TestEngine(BaseTest): def test_single_task_successful_execution(self, workflow_context, executor): @workflow def mock_workflow(ctx, graph): - graph.add_tasks(self._op(mock_success_task, ctx)) + graph.add_tasks(self._op(ctx, func=mock_success_task)) self._execute( workflow_func=mock_workflow, workflow_context=workflow_context, @@ -174,7 +172,7 @@ class TestEngine(BaseTest): def test_single_task_failed_execution(self, workflow_context, executor): @workflow def mock_workflow(ctx, graph): - graph.add_tasks(self._op(mock_failed_task, ctx)) + graph.add_tasks(self._op(ctx, func=mock_failed_task)) with pytest.raises(exceptions.ExecutorException): self._execute( workflow_func=mock_workflow, @@ -191,8 +189,8 @@ class TestEngine(BaseTest): def test_two_tasks_execution_order(self, workflow_context, executor): @workflow def mock_workflow(ctx, graph): - op1 = self._op(mock_ordered_task, ctx, inputs={'counter': 1}) - op2 = self._op(mock_ordered_task, ctx, inputs={'counter': 2}) + op1 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 1}) + op2 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 2}) graph.sequence(op1, op2) self._execute( workflow_func=mock_workflow, @@ -206,9 +204,9 @@ class TestEngine(BaseTest): def test_stub_and_subworkflow_execution(self, workflow_context, executor): @workflow def sub_workflow(ctx, graph): - op1 = self._op(mock_ordered_task, ctx, inputs={'counter': 1}) + op1 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 1}) op2 = api.task.StubTask() - op3 = self._op(mock_ordered_task, ctx, inputs={'counter': 2}) + op3 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 2}) graph.sequence(op1, op2, op3) @workflow @@ -231,7 +229,7 @@ class 
TestCancel(BaseTest): @workflow def mock_workflow(ctx, graph): operations = ( - self._op(mock_sleep_task, ctx, inputs=dict(seconds=0.1)) + self._op(ctx, func=mock_sleep_task, inputs=dict(seconds=0.1)) for _ in range(number_of_tasks) ) return graph.sequence(*operations) @@ -271,7 +269,7 @@ class TestRetries(BaseTest): def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor): @workflow def mock_workflow(ctx, graph): - op = self._op(mock_conditional_failure_task, ctx, + op = self._op(ctx, func=mock_conditional_failure_task, inputs={'failure_count': 1}, max_attempts=2) graph.add_tasks(op) @@ -287,7 +285,7 @@ class TestRetries(BaseTest): def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor): @workflow def mock_workflow(ctx, graph): - op = self._op(mock_conditional_failure_task, ctx, + op = self._op(ctx, func=mock_conditional_failure_task, inputs={'failure_count': 2}, max_attempts=2) graph.add_tasks(op) @@ -304,7 +302,7 @@ class TestRetries(BaseTest): def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor): @workflow def mock_workflow(ctx, graph): - op = self._op(mock_conditional_failure_task, ctx, + op = self._op(ctx, func=mock_conditional_failure_task, inputs={'failure_count': 1}, max_attempts=3) graph.add_tasks(op) @@ -320,7 +318,7 @@ class TestRetries(BaseTest): def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor): @workflow def mock_workflow(ctx, graph): - op = self._op(mock_conditional_failure_task, ctx, + op = self._op(ctx, func=mock_conditional_failure_task, inputs={'failure_count': 2}, max_attempts=3) graph.add_tasks(op) @@ -336,7 +334,7 @@ class TestRetries(BaseTest): def test_infinite_retries(self, workflow_context, executor): @workflow def mock_workflow(ctx, graph): - op = self._op(mock_conditional_failure_task, ctx, + op = self._op(ctx, func=mock_conditional_failure_task, inputs={'failure_count': 1}, max_attempts=-1) graph.add_tasks(op) @@ 
-362,7 +360,7 @@ class TestRetries(BaseTest): def _test_retry_interval(self, retry_interval, workflow_context, executor): @workflow def mock_workflow(ctx, graph): - op = self._op(mock_conditional_failure_task, ctx, + op = self._op(ctx, func=mock_conditional_failure_task, inputs={'failure_count': 1}, max_attempts=2, retry_interval=retry_interval) @@ -382,7 +380,7 @@ class TestRetries(BaseTest): def test_ignore_failure(self, workflow_context, executor): @workflow def mock_workflow(ctx, graph): - op = self._op(mock_conditional_failure_task, ctx, + op = self._op(ctx, func=mock_conditional_failure_task, ignore_failure=True, inputs={'failure_count': 100}, max_attempts=100) @@ -406,7 +404,7 @@ class TestTaskRetryAndAbort(BaseTest): @workflow def mock_workflow(ctx, graph): - op = self._op(mock_task_retry, ctx, + op = self._op(ctx, func=mock_task_retry, inputs={'message': self.message}, retry_interval=default_retry_interval, max_attempts=2) @@ -430,7 +428,7 @@ class TestTaskRetryAndAbort(BaseTest): @workflow def mock_workflow(ctx, graph): - op = self._op(mock_task_retry, ctx, + op = self._op(ctx, func=mock_task_retry, inputs={'message': self.message, 'retry_interval': custom_retry_interval}, retry_interval=default_retry_interval, @@ -453,7 +451,7 @@ class TestTaskRetryAndAbort(BaseTest): def test_task_abort(self, workflow_context, executor): @workflow def mock_workflow(ctx, graph): - op = self._op(mock_task_abort, ctx, + op = self._op(ctx, func=mock_task_abort, inputs={'message': self.message}, retry_interval=100, max_attempts=100) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/tests/orchestrator/workflows/core/test_events.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py index b9bff77..184071d 100644 --- a/tests/orchestrator/workflows/core/test_events.py +++ b/tests/orchestrator/workflows/core/test_events.py @@ 
-136,8 +136,8 @@ def _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node, @workflow def single_operation_workflow(graph, node, interface_name, op_name, **_): - graph.add_tasks(api.task.OperationTask.for_node( - node=node, + graph.add_tasks(api.task.OperationTask( + node, interface_name=interface_name, operation_name=op_name)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/tests/orchestrator/workflows/core/test_task.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py index 8dda209..ee302f4 100644 --- a/tests/orchestrator/workflows/core/test_task.py +++ b/tests/orchestrator/workflows/core/test_task.py @@ -66,8 +66,8 @@ class TestOperationTask(object): def _create_node_operation_task(self, ctx, node): with workflow_context.current.push(ctx): - api_task = api.task.OperationTask.for_node( - node=node, + api_task = api.task.OperationTask( + node, interface_name=NODE_INTERFACE_NAME, operation_name=NODE_OPERATION_NAME) core_task = core.task.OperationTask(api_task=api_task) @@ -75,8 +75,8 @@ class TestOperationTask(object): def _create_relationship_operation_task(self, ctx, relationship): with workflow_context.current.push(ctx): - api_task = api.task.OperationTask.for_relationship( - relationship=relationship, + api_task = api.task.OperationTask( + relationship, interface_name=RELATIONSHIP_INTERFACE_NAME, operation_name=RELATIONSHIP_OPERATION_NAME) core_task = core.task.OperationTask(api_task=api_task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py index 
514bce9..2a96d01 100644 --- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py +++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py @@ -17,6 +17,7 @@ from networkx import topological_sort, DiGraph from aria.orchestrator import context from aria.orchestrator.workflows import api, core +from aria.orchestrator.workflows.executor import base from tests import mock from tests import storage @@ -41,17 +42,20 @@ def test_task_graph_into_execution_graph(tmpdir): with context.workflow.current.push(task_context): test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph') - simple_before_task = api.task.OperationTask.for_node(node=node, - interface_name=interface_name, - operation_name=operation_name) - simple_after_task = api.task.OperationTask.for_node(node=node, - interface_name=interface_name, - operation_name=operation_name) + simple_before_task = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name) + simple_after_task = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name) inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph') - inner_task = api.task.OperationTask.for_node(node=node, - interface_name=interface_name, - operation_name=operation_name) + inner_task = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name) inner_task_graph.add_tasks(inner_task) test_task_graph.add_tasks(simple_before_task) @@ -63,7 +67,8 @@ def test_task_graph_into_execution_graph(tmpdir): # Direct check execution_graph = DiGraph() core.translation.build_execution_graph(task_graph=test_task_graph, - execution_graph=execution_graph) + execution_graph=execution_graph, + default_executor=base.StubTaskExecutor()) execution_tasks = topological_sort(execution_graph) assert len(execution_tasks) == 7 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py index 88e7ae0..1dbfae1 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py @@ -99,14 +99,16 @@ def _test(context, executor, lock_files, func, expected_failure): @workflow def mock_workflow(graph, **_): graph.add_tasks( - api.task.OperationTask.for_node(node=node, - interface_name=interface_name, - operation_name=operation_name, - inputs=inputs), - api.task.OperationTask.for_node(node=node, - interface_name=interface_name, - operation_name=operation_name, - inputs=inputs) + api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + inputs=inputs), + api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + inputs=inputs) ) signal = events.on_failure_task_signal http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/tests/orchestrator/workflows/executor/test_process_executor_extension.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py index 7ae337d..878ac24 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py @@ -46,10 +46,11 @@ def test_decorate_extension(context, executor): inputs=inputs) ) 
node.interfaces[interface.name] = interface - task = api.task.OperationTask.for_node(node=node, - interface_name=interface_name, - operation_name=operation_name, - inputs=inputs) + task = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + inputs=inputs) graph.add_tasks(task) return graph graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py index 3a8c54b..4fbe9c1 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py @@ -99,10 +99,11 @@ def _run_workflow(context, executor, op_func, inputs=None): inputs=wf_inputs) ) node.interfaces[interface.name] = interface - task = api.task.OperationTask.for_node(node=node, - interface_name=interface_name, - operation_name=operation_name, - inputs=wf_inputs) + task = api.task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + inputs=wf_inputs) graph.add_tasks(task) return graph graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b22c3464/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py ---------------------------------------------------------------------- diff --git a/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py b/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py index abe1ee2..06e4f9e 100644 --- 
a/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py +++ b/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py @@ -1,5 +1,5 @@ from aria import workflow -from aria.orchestrator.workflows.builtin import utils +from aria.orchestrator.workflows.api import task from aria.orchestrator.workflows.exceptions import TaskException @@ -16,9 +16,9 @@ def maintenance(ctx, graph, enabled): for node in ctx.model.node.iter(): try: - graph.add_tasks(utils.create_node_task(node=node, - interface_name=INTERFACE_NAME, - operation_name=ENABLE_OPERATION_NAME if enabled - else DISABLE_OPERATION_NAME)) + graph.add_tasks(task.OperationTask(node, + interface_name=INTERFACE_NAME, + operation_name=ENABLE_OPERATION_NAME if enabled + else DISABLE_OPERATION_NAME)) except TaskException: pass