Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-278-Remove-core-tasks 5fce85b12 -> e758e86ad (forced update)
shited stuff around Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/e758e86a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/e758e86a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/e758e86a Branch: refs/heads/ARIA-278-Remove-core-tasks Commit: e758e86ade7741b654740b073938d30cf063985e Parents: d1cfd26 Author: max-orlov <ma...@gigaspaces.com> Authored: Thu Jun 15 11:03:15 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Thu Jun 15 11:09:57 2017 +0300 ---------------------------------------------------------------------- aria/modeling/orchestration.py | 3 +- aria/orchestrator/context/operation.py | 8 ++++ aria/orchestrator/workflow_runner.py | 28 +++++------ aria/orchestrator/workflows/executor/base.py | 48 +++++++++---------- aria/orchestrator/workflows/executor/dry.py | 51 ++++++++++----------- tests/orchestrator/workflows/core/test_task.py | 5 +- 6 files changed, 72 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e758e86a/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index 6f69483..11a4684 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -452,7 +452,8 @@ class TaskBase(mixins.ModelMixin): 'api_id': api_task.id, '_context_cls': context_cls, '_executor': executor, - }) + } + ) instantiation_kwargs.update(**kwargs) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e758e86a/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index 7477912..7591d70 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -18,6 +18,7 @@ Workflow and operation contexts """ import threading +from contextlib import contextmanager import aria from aria.utils import file @@ -109,6 +110,13 @@ class BaseOperationContext(common.BaseContext): self.model.log._session.remove() self.model.log._engine.dispose() + @property + @contextmanager + def track_task(self): + self.model.task.update(self.task) + yield + self.model.task.update(self.task) + class NodeOperationContext(BaseOperationContext): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e758e86a/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index f0a48ad..422066c 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -29,7 +29,7 @@ from .workflows import builtin from .workflows.core.engine import Engine from .workflows.executor.process import ProcessExecutor from .workflows.executor.base import StubTaskExecutor -from .workflows.api import task +from .workflows.api import task as api_task from ..modeling import models from ..modeling import utils as modeling_utils from ..utils.imports import import_fullname @@ -180,9 +180,9 @@ class WorkflowRunner(object): def get_execution_graph(execution): graph = DiGraph() - for t in execution.tasks: - for dependency in t.dependencies: - graph.add_edge(dependency, t) + for task in execution.tasks: + for dependency in task.dependencies: + graph.add_edge(dependency, task) return graph @@ -210,28 +210,28 @@ def construct_execution_tasks(execution, stub_type=start_stub_type, dependencies=depends_on) - for api_task in task_graph.topological_order(reverse=True): + for task in task_graph.topological_order(reverse=True): operation_dependencies = _get_tasks_from_dependencies( - execution, task_graph.get_dependencies(api_task), [start_task]) + execution, task_graph.get_dependencies(task), [start_task]) - if isinstance(api_task, task.OperationTask): - models.Task.from_api_task(api_task=api_task, + if isinstance(task, api_task.OperationTask): + models.Task.from_api_task(api_task=task, executor=default_executor, dependencies=operation_dependencies) - elif isinstance(api_task, task.WorkflowTask): + elif isinstance(task, api_task.WorkflowTask): # Build the graph recursively while adding start and end markers construct_execution_tasks( execution=execution, - task_graph=api_task, + task_graph=task, default_executor=default_executor, stub_executor=stub_executor, start_stub_type=models.Task.START_SUBWROFKLOW, end_stub_type=models.Task.END_SUBWORKFLOW, depends_on=operation_dependencies ) - elif isinstance(api_task, task.StubTask): - models.Task(api_id=api_task.id, + elif isinstance(task, api_task.StubTask): + models.Task(api_id=task.id, _executor=stub_executor, execution=execution, stub_type=models.Task.STUB, @@ -257,8 +257,8 @@ def _end_graph_suffix(api_id): def _get_non_dependent_tasks(execution): dependency_tasks = set() - for t in execution.tasks: - dependency_tasks.update(t.dependencies) + for task in execution.tasks: + dependency_tasks.update(task.dependencies) return list(set(execution.tasks) - set(dependency_tasks)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e758e86a/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index a727e5c..a1cfe4b 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -21,15 +21,6 @@ from aria import logger from aria.orchestrator import events -def update_ctx(func): - def _wrapper(self, ctx, *args, **kwargs): - ctx.update_task() - func(self, ctx, *args, **kwargs) - ctx.update_task() - - return _wrapper - - class BaseExecutor(logger.LoggerMixin): """ Base class for executors for running tasks @@ -37,20 +28,20 @@ class BaseExecutor(logger.LoggerMixin): def _execute(self, task): raise NotImplementedError - @update_ctx def execute(self, ctx): """ Execute a task :param task: task to execute """ - if ctx.task.function: - self._execute(ctx) - else: - # In this case the task is missing a function. This task still gets to an - # executor, but since there is nothing to run, we by default simply skip the execution - # itself. - self._task_started(ctx) - self._task_succeeded(ctx) + with ctx.track_task: + if ctx.task.function: + self._execute(ctx) + else: + # In this case the task is missing a function. This task still gets to an + # executor, but since there is nothing to run, we by default simply skip the + # execution itself. + self._task_started(ctx) + self._task_succeeded(ctx) def close(self): """ @@ -58,17 +49,20 @@ class BaseExecutor(logger.LoggerMixin): """ pass - @update_ctx - def _task_started(self, ctx): - events.start_task_signal.send(ctx) + @staticmethod + def _task_started(ctx): + with ctx.track_task: + events.start_task_signal.send(ctx) - @update_ctx - def _task_failed(self, ctx, exception, traceback=None): - events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback) + @staticmethod + def _task_failed(ctx, exception, traceback=None): + with ctx.track_task: + events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback) - @update_ctx - def _task_succeeded(self, ctx): - events.on_success_task_signal.send(ctx) + @staticmethod + def _task_succeeded(ctx): + with ctx.track_task: + events.on_success_task_signal.send(ctx) class StubTaskExecutor(BaseExecutor): # pylint: disable=abstract-method http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e758e86a/aria/orchestrator/workflows/executor/dry.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py index c12ba7c..a5f8507 100644 --- a/aria/orchestrator/workflows/executor/dry.py +++ b/aria/orchestrator/workflows/executor/dry.py @@ -25,30 +25,29 @@ class DryExecutor(base.BaseExecutor): """ Executor which dry runs tasks - prints task information without causing any side effects """ - @base.update_ctx def execute(self, ctx): - # updating the task manually instead of calling self._task_started(task), - # to avoid any side effects raising that event might cause - ctx.task.started_at = datetime.utcnow() - ctx.task.status = ctx.task.STARTED - - dry_msg = '<dry> {name} {task.interface_name}.{task.operation_name} {suffix}' - logger = ctx.logger.info if ctx.task.function else ctx.logger.debug - - if hasattr(ctx.task.actor, 'source_node'): - name = '{source_node.name}->{target_node.name}'.format( - source_node=ctx.task.actor.source_node, target_node=ctx.task.actor.target_node) - else: - name = ctx.task.actor.name - - if ctx.task.function: - logger(dry_msg.format(name=name, task=ctx.task, suffix='started...')) - logger(dry_msg.format(name=name, task=ctx.task, suffix='successful')) - else: - logger(dry_msg.format(name=name, task=ctx.task, suffix='has no implementation')) - - # updating the task manually instead of calling self._task_succeeded(task), - # to avoid any side effects raising that event might cause - ctx.task.ended_at = datetime.utcnow() - ctx.task.status = ctx.task.SUCCESS - ctx.update_task() + with ctx.track_task: + # updating the task manually instead of calling self._task_started(task), + # to avoid any side effects raising that event might cause + ctx.task.started_at = datetime.utcnow() + ctx.task.status = ctx.task.STARTED + + dry_msg = '<dry> {name} {task.interface_name}.{task.operation_name} {suffix}' + logger = ctx.logger.info if ctx.task.function else ctx.logger.debug + + if hasattr(ctx.task.actor, 'source_node'): + name = '{source_node.name}->{target_node.name}'.format( + source_node=ctx.task.actor.source_node, target_node=ctx.task.actor.target_node) + else: + name = ctx.task.actor.name + + if ctx.task.function: + logger(dry_msg.format(name=name, task=ctx.task, suffix='started...')) + logger(dry_msg.format(name=name, task=ctx.task, suffix='successful')) + else: + logger(dry_msg.format(name=name, task=ctx.task, suffix='has no implementation')) + + # updating the task manually instead of calling self._task_succeeded(task), + # to avoid any side effects raising that event might cause + ctx.task.ended_at = datetime.utcnow() + ctx.task.status = ctx.task.SUCCESS http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e758e86a/tests/orchestrator/workflows/core/test_task.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py index f0f3a3b..2b3f7d7 100644 --- a/tests/orchestrator/workflows/core/test_task.py +++ b/tests/orchestrator/workflows/core/test_task.py @@ -22,7 +22,6 @@ import pytest from aria.orchestrator.context import workflow as workflow_context from aria.orchestrator.workflows import ( api, - core, exceptions, ) from aria.modeling import models @@ -107,9 +106,9 @@ class TestOperationTask(object): def test_relationship_operation_task_creation(self, ctx): relationship = ctx.model.relationship.list()[0] ctx.model.relationship.update(relationship) - _, model_Task = self._create_relationship_operation_task( + _, model_task = self._create_relationship_operation_task( ctx, relationship) - assert model_Task.actor == relationship + assert model_task.actor == relationship @pytest.mark.skip("Currently not supported for model tasks") def test_operation_task_edit_locked_attribute(self, ctx):