Repository: incubator-ariatosca

Updated Branches:
  refs/heads/ARIA-278-Remove-core-tasks 092b45f0d -> 58e212c7d

tasks no longer execute their executor


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/58e212c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/58e212c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/58e212c7

Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 58e212c7dbdea7aa9b532bba9fea4a5359a49019
Parents: 092b45f
Author: max-orlov <ma...@gigaspaces.com>
Authored: Wed Jun 14 17:17:16 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Wed Jun 14 17:17:16 2017 +0300

----------------------------------------------------------------------
 aria/modeling/orchestration.py                  |  36 ++----
 aria/orchestrator/context/operation.py          |   3 +
 aria/orchestrator/workflow_runner.py            | 123 +++++++++++++++++--
 aria/orchestrator/workflows/core/__init__.py    |   2 +-
 aria/orchestrator/workflows/core/engine.py      |  60 +++++----
 .../workflows/core/events_handler.py            |   2 +-
 aria/orchestrator/workflows/core/translation.py | 112 -----------------
 aria/orchestrator/workflows/executor/base.py    |  13 +-
 aria/orchestrator/workflows/executor/thread.py  |   2 -
 tests/orchestrator/context/__init__.py          |  11 +-
 tests/orchestrator/context/test_operation.py    |  20 ++-
 tests/orchestrator/context/test_serialize.py    |  15 ++-
 tests/orchestrator/context/test_toolbelt.py     |   7 +-
 .../orchestrator/execution_plugin/test_local.py |  15 ++-
 tests/orchestrator/test_workflow_runner.py      |   2 +-
 .../orchestrator/workflows/core/test_engine.py  |  21 ++--
 .../orchestrator/workflows/core/test_events.py  |  12 +-
 .../test_task_graph_into_execution_graph.py     |  48 +++-----
 .../workflows/executor/test_executor.py         |  39 +++---
 19 files changed, 271 insertions(+), 272 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 4a5771a..c0b7f04 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -24,6 +24,7 @@ classes:
 from contextlib import contextmanager
 from datetime import datetime
 
+from networkx import DiGraph
 from sqlalchemy import (
     Column,
     Integer,
@@ -309,7 +310,6 @@ class TaskBase(mixins.ModelMixin):
 
     api_id = Column(String)
     _executor = Column(PickleType)
-    _executor_kwargs = Column(modeling_types.Dict)
     _context_cls = Column(PickleType)
 
     @declared_attr
@@ -400,14 +400,14 @@ class TaskBase(mixins.ModelMixin):
     def retry(message=None, retry_interval=None):
         raise TaskRetryException(message, retry_interval=retry_interval)
 
-    # @declared_attr
-    # def dependency_fk(cls):
-    #     """For Type one-to-many to Type"""
-    #     return relationship.foreign_key('task', nullable=True)
+    @declared_attr
+    def dependency_fk(self):
+        return relationship.foreign_key('task', nullable=True)
 
     @declared_attr
-    def dependent_tasks(cls):
-        return relationship.many_to_many(cls, 'task', 'dependent', other_property='dependencies')
+    def dependencies(cls):
+        # symmetric relationship causes funky graphs
+        return relationship.one_to_many_self(cls, 'dependency_fk')
 
     def has_ended(self):
         if self.stub_type is not None:
@@ -422,7 +422,7 @@ class TaskBase(mixins.ModelMixin):
         return self.status in (self.PENDING, self.RETRYING)
 
     @classmethod
-    def from_api_task(cls, api_task, executor, executor_kwargs=None, **kwargs):
+    def from_api_task(cls, api_task, executor, **kwargs):
         from aria.orchestrator import context
         instantiation_kwargs = {}
 
@@ -454,32 +454,12 @@ class TaskBase(mixins.ModelMixin):
             'api_id': api_task.id,
             '_context_cls': context_cls,
             '_executor': executor,
-            '_executor_kwargs': executor_kwargs or {}
         })
 
         instantiation_kwargs.update(**kwargs)
 
         return cls(**instantiation_kwargs)
 
-    def execute(self, ctx, executor_kwargs=None):
-        from aria.orchestrator.context import operation
-        context_cls = self._context_cls or operation.BaseOperationContext
-        op_ctx = context_cls(
-            model_storage=ctx.model,
-            resource_storage=ctx.resource,
-            workdir=ctx._workdir,
-            task_id=self.id,
-            actor_id=self.actor.id if self.actor else None,
-            service_id=self.execution.service.id,
-            execution_id=self.execution.id,
-            name=self.name
-        )
-        executor = self._executor(**dict(self._executor_kwargs or {}, **(executor_kwargs or {})))
-        try:
-            return executor.execute(op_ctx)
-        except BaseException:
-            executor.close()
-
 
 class LogBase(mixins.ModelMixin):


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index af7220d..496739f 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -65,6 +65,9 @@ class BaseOperationContext(common.BaseContext):
             self._thread_local.task = self.model.task.get(self._task_id)
         return self._thread_local.task
 
+    def update_task(self):
+        self.model.task.update(self.task)
+
     @property
     def plugin_workdir(self):
         """
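A note on the orchestration.py change above: the many-to-many `dependent_tasks` relationship is replaced with a self-referential one-to-many, so each Task row carries a `dependency_fk` and `dependencies` collects the rows that point back at it. A minimal, self-contained sketch of the same adjacency-list mapping in plain SQLAlchemy (illustrative only; ARIA goes through its own `relationship` helper module):

    from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import relationship, sessionmaker

    Base = declarative_base()

    class Task(Base):
        __tablename__ = 'task'
        id = Column(Integer, primary_key=True)
        api_id = Column(String)
        # adjacency-list column: set on each row that some other task depends on
        dependency_fk = Column(Integer, ForeignKey('task.id'), nullable=True)
        # self-referential one-to-many: the tasks this task depends on
        dependencies = relationship('Task')

    engine = create_engine('sqlite://')
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()
    first = Task(api_id='first')
    second = Task(api_id='second', dependencies=[first])  # second depends on first
    session.add(second)
    session.commit()
    assert [t.api_id for t in session.query(Task)
            .filter_by(api_id='second').one().dependencies] == ['first']

One consequence of this shape (likely what the "funky graphs" comment alludes to) is that a row holds a single `dependency_fk`, so a task can appear as a dependency of at most one other task.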
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 961796a..f09cb79 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -21,11 +21,15 @@ import os
 import sys
 from datetime import datetime
 
+from networkx import DiGraph
+
 from . import exceptions
 from .context.workflow import WorkflowContext
 from .workflows import builtin
 from .workflows.core.engine import Engine
 from .workflows.executor.process import ProcessExecutor
+from .workflows.executor.base import StubTaskExecutor
+from .workflows.api import task
 from ..modeling import models
 from ..modeling import utils as modeling_utils
 from ..utils.imports import import_fullname
@@ -39,7 +43,7 @@ class WorkflowRunner(object):
 
     def __init__(self, workflow_name, service_id, inputs,
                  model_storage, resource_storage, plugin_manager,
-                 executor=None, executor_kwargs=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
+                 executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
                  task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
         """
         Manages a single workflow execution on a given service.
@@ -80,20 +84,21 @@ class WorkflowRunner(object):
             task_max_attempts=task_max_attempts,
             task_retry_interval=task_retry_interval)
 
+        # Set default executor and kwargs
+        executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
+
         # transforming the execution inputs to dict, to pass them to the workflow function
         execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
+        self._tasks_graph = workflow_fn(ctx=workflow_context, **execution_inputs_dict)
+        construct_execution_tasks(self.execution, self._tasks_graph, executor.__class__)
 
-        # Set default executor and kwargs
-        if executor is None:
-            executor = ProcessExecutor
-            executor_kwargs = dict(plugin_manager=plugin_manager)
+        # Update the state
+        self._model_storage.execution.update(execution)
 
-        self._engine = Engine(
-            executor=executor,
-            executor_kwargs=executor_kwargs,
-            workflow_context=workflow_context,
-            tasks_graph=self._tasks_graph)
+        self._engine = Engine(executor=executor,
+                              workflow_context=workflow_context,
+                              execution_graph=get_execution_graph(self.execution))
 
     @property
     def execution_id(self):
@@ -171,3 +176,101 @@ class WorkflowRunner(object):
                 self._workflow_name, workflow.function))
 
         return workflow_fn
+
+
+def get_execution_graph(execution):
+    graph = DiGraph()
+    for task in execution.tasks:
+        for dependency in task.dependencies:
+            graph.add_edge(dependency, task)
+
+    return graph
+
+
+def construct_execution_tasks(execution,
+                              task_graph,
+                              default_executor,
+                              stub_executor=StubTaskExecutor,
+                              start_stub_type=models.Task.START_WORKFLOW,
+                              end_stub_type=models.Task.END_WORKFLOW,
+                              depends_on=()):
+    """
+    Translates the user graph to the execution graph
+    :param task_graph: The user's graph
+    :param start_stub_type: internal use
+    :param end_stub_type: internal use
+    :param depends_on: internal use
+    """
+    depends_on = list(depends_on)
+
+    # Insert start marker
+    start_task = models.Task(api_id=_start_graph_suffix(task_graph.id),
+                             _executor=stub_executor,
+                             execution=execution,
+                             stub_type=start_stub_type,
+                             dependencies=depends_on)
+
+    for api_task in task_graph.topological_order(reverse=True):
+        operation_dependencies = _get_tasks_from_dependencies(
+            execution, task_graph.get_dependencies(api_task), [start_task])
+
+        if isinstance(api_task, task.OperationTask):
+            models.Task.from_api_task(api_task=api_task,
+                                      executor=default_executor,
+                                      dependencies=operation_dependencies)
+
+        elif isinstance(api_task, task.WorkflowTask):
+            # Build the graph recursively while adding start and end markers
+            construct_execution_tasks(
+                execution=execution,
+                task_graph=api_task,
+                default_executor=default_executor,
+                stub_executor=stub_executor,
+                start_stub_type=models.Task.START_SUBWROFKLOW,
+                end_stub_type=models.Task.END_SUBWORKFLOW,
+                depends_on=operation_dependencies
+            )
+        elif isinstance(api_task, task.StubTask):
+            models.Task(api_id=api_task.id,
+                        _executor=stub_executor,
+                        execution=execution,
+                        stub_type=models.Task.STUB,
+                        dependencies=operation_dependencies)
+        else:
+            raise
+
+    # Insert end marker
+    models.Task(api_id=_end_graph_suffix(task_graph.id),
+                _executor=stub_executor,
+                execution=execution,
+                stub_type=end_stub_type,
+                dependencies=_get_non_dependent_tasks(execution) or [start_task])
+
+
+def _start_graph_suffix(api_id):
+    return '{0}-Start'.format(api_id)
+
+
+def _end_graph_suffix(api_id):
+    return '{0}-End'.format(api_id)
+
+
+def _get_non_dependent_tasks(execution):
+    dependency_tasks = set()
+    for task in execution.tasks:
+        dependency_tasks.update(task.dependencies)
+    return list(set(execution.tasks) - set(dependency_tasks))
+
+
+def _get_tasks_from_dependencies(execution, dependencies, default=()):
+    """
+    Returns task list from dependencies.
+    """
+    tasks = []
+    for dependency in dependencies:
+        if getattr(dependency, 'actor', False):
+            dependency_name = dependency.id
+        else:
+            dependency_name = _end_graph_suffix(dependency.id)
+        tasks.extend(task for task in execution.tasks if task.api_id == dependency_name)
+    return tasks or default
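A toy walk-through (names hypothetical, not from the commit) of what get_execution_graph() above produces: edges run from each dependency to the task that waits on it, so a topological sort of the resulting DiGraph is a valid execution order, with the '-Start'/'-End' stub tasks bracketing the real operations:

    from networkx import DiGraph, topological_sort

    class FakeTask(object):  # stand-in for models.Task
        def __init__(self, api_id, dependencies=()):
            self.api_id = api_id
            self.dependencies = list(dependencies)

    start = FakeTask('workflow-Start')
    op = FakeTask('op1', dependencies=[start])
    end = FakeTask('workflow-End', dependencies=[op])

    graph = DiGraph()
    for task_ in (start, op, end):
        for dependency in task_.dependencies:
            graph.add_edge(dependency, task_)

    print([t.api_id for t in topological_sort(graph)])
    # ['workflow-Start', 'op1', 'workflow-End']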
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflows/core/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/__init__.py b/aria/orchestrator/workflows/core/__init__.py
index 98938be..81db43f 100644
--- a/aria/orchestrator/workflows/core/__init__.py
+++ b/aria/orchestrator/workflows/core/__init__.py
@@ -17,4 +17,4 @@
 Core for the workflow execution mechanism
 """
 
-from . import translation, engine
+from . import engine


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 67e8c1d..e1b6412 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -20,14 +20,12 @@ The workflow engine. Executes workflows
 import time
 from datetime import datetime
 
-import networkx
-
 from aria import logger
 from aria.modeling import models
 from aria.orchestrator import events
+from aria.orchestrator.context import operation
 
 from .. import exceptions
-from . import translation
 # Import required so all signals are registered
 from . import events_handler  # pylint: disable=unused-import
 
@@ -37,18 +35,11 @@ class Engine(logger.LoggerMixin):
     The workflow engine. Executes workflows
     """
 
-    def __init__(self, executor, workflow_context, tasks_graph, executor_kwargs=None, **kwargs):
+    def __init__(self, executor, workflow_context, execution_graph, **kwargs):
         super(Engine, self).__init__(**kwargs)
         self._workflow_context = workflow_context
-        self._execution_graph = networkx.DiGraph()
-        self._executor_kwargs = executor_kwargs
-        translation.store_tasks(task_graph=tasks_graph,
-                                execution_graph=self._execution_graph,
-                                default_executor=executor,
-                                execution=workflow_context.execution)
-
-        # Flush changes
-        workflow_context.model.execution.update(workflow_context.execution)
+        self._executors = {executor.__class__: executor}
+        self._execution_graph = execution_graph
 
     def execute(self):
         """
@@ -83,38 +74,57 @@ class Engine(logger.LoggerMixin):
         will be modified to 'cancelled' directly.
         """
         events.on_cancelling_workflow_signal.send(self._workflow_context)
+        self._workflow_context.execution = self._workflow_context.execution
 
     def _is_cancel(self):
-        return self._workflow_context.execution.status in (models.Execution.CANCELLING,
-                                                           models.Execution.CANCELLED)
+        execution = self._workflow_context.model.execution.update(self._workflow_context.execution)
+        return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED)
 
     def _executable_tasks(self):
         now = datetime.utcnow()
-        return (task for task in self._tasks_iter()
-                if task.is_waiting() and
-                task.due_at <= now and
-                not self._task_has_dependencies(task))
+        return (
+            task for task in self._tasks_iter()
+            if task.is_waiting() and task.due_at <= now and not self._task_has_dependencies(task)
+        )
 
     def _ended_tasks(self):
-        return (task for task in self._tasks_iter() if task.has_ended())
+        return (task for task in self._tasks_iter()
+                if task.has_ended() and task in self._execution_graph)
 
     def _task_has_dependencies(self, task):
-        return len(self._execution_graph.pred.get(task.api_id, {})) > 0
+        return task.dependencies and all(d in self._execution_graph for d in task.dependencies)
 
     def _all_tasks_consumed(self):
         return len(self._execution_graph.node) == 0
 
     def _tasks_iter(self):
-        for _, data in self._execution_graph.nodes_iter(data=True):
-            yield self._workflow_context.model.task.get(data['task'].id)
+        for task in self._workflow_context.execution.tasks:
+            yield self._workflow_context.model.task.refresh(task)
 
     def _handle_executable_task(self, task):
         if not task.stub_type:
             events.sent_task_signal.send(task)
-        task.execute(self._workflow_context, self._executor_kwargs)
+
+        if task._executor not in self._executors:
+            self._executors[task._executor] = task._executor()
+        executor = self._executors[task._executor]
+
+        context_cls = task._context_cls or operation.BaseOperationContext
+        op_ctx = context_cls(
+            model_storage=self._workflow_context.model,
+            resource_storage=self._workflow_context.resource,
+            workdir=self._workflow_context._workdir,
+            task_id=task.id,
+            actor_id=task.actor.id if task.actor else None,
+            service_id=task.execution.service.id,
+            execution_id=task.execution.id,
+            name=task.name
+        )
+
+        executor.execute(op_ctx)
 
     def _handle_ended_tasks(self, task):
         if task.status == models.Task.FAILED and not task.ignore_failure:
             raise exceptions.ExecutorException('Workflow failed')
         else:
            self._execution_graph.remove_node(task)
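The notable shift in engine.py above: a task row now persists only its executor class (the pickled _executor column), and the engine owns the live instances, creating one per class on first use in _handle_executable_task. A condensed sketch of that caching idiom (hypothetical names, not shipped code):

    class ExecutorCache(object):
        """One live executor instance per executor class, created lazily."""

        def __init__(self, default_executor):
            self._executors = {default_executor.__class__: default_executor}

        def get(self, executor_cls):
            if executor_cls not in self._executors:
                self._executors[executor_cls] = executor_cls()
            return self._executors[executor_cls]

This is what lets TaskBase.execute() disappear from the model layer: the model no longer needs to know how to construct, invoke, or close executors.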
""" events.on_cancelling_workflow_signal.send(self._workflow_context) + self._workflow_context.execution = self._workflow_context.execution def _is_cancel(self): - return self._workflow_context.execution.status in (models.Execution.CANCELLING, - models.Execution.CANCELLED) + execution = self._workflow_context.model.execution.update(self._workflow_context.execution) + return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED) def _executable_tasks(self): now = datetime.utcnow() - return (task for task in self._tasks_iter() - if task.is_waiting() and - task.due_at <= now and - not self._task_has_dependencies(task)) + return ( + task for task in self._tasks_iter() + if task.is_waiting() and task.due_at <= now and not self._task_has_dependencies(task) + ) def _ended_tasks(self): - return (task for task in self._tasks_iter() if task.has_ended()) + return (task for task in self._tasks_iter() + if task.has_ended() and task in self._execution_graph) def _task_has_dependencies(self, task): - return len(self._execution_graph.pred.get(task.api_id, {})) > 0 + return task.dependencies and all(d in self._execution_graph for d in task.dependencies) def _all_tasks_consumed(self): return len(self._execution_graph.node) == 0 def _tasks_iter(self): - for _, data in self._execution_graph.nodes_iter(data=True): - yield self._workflow_context.model.task.get(data['task'].id) + for task in self._workflow_context.execution.tasks: + yield self._workflow_context.model.task.refresh(task) def _handle_executable_task(self, task): if not task.stub_type: events.sent_task_signal.send(task) - task.execute(self._workflow_context, self._executor_kwargs) + + if task._executor not in self._executors: + self._executors[task._executor] = task._executor() + executor = self._executors[task._executor] + + context_cls = task._context_cls or operation.BaseOperationContext + op_ctx = context_cls( + model_storage=self._workflow_context.model, + resource_storage=self._workflow_context.resource, + workdir=self._workflow_context._workdir, + task_id=task.id, + actor_id=task.actor.id if task.actor else None, + service_id=task.execution.service.id, + execution_id=task.execution.id, + name=task.name + ) + + executor.execute(op_ctx) def _handle_ended_tasks(self, task): if task.status == models.Task.FAILED and not task.ignore_failure: raise exceptions.ExecutorException('Workflow failed') else: - self._execution_graph.remove_node(task.api_id) + self._execution_graph.remove_node(task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index 5d979c4..8b217f5 100644 --- a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -47,7 +47,7 @@ def _task_failed(ctx, exception, *args, **kwargs): not isinstance(exception, exceptions.TaskAbortException), ctx.task.attempts_count < ctx.task.max_attempts or ctx.task.max_attempts == ctx.task.INFINITE_RETRIES, - # ignore_failure check here means the task will not be retries and it will be marked + # ignore_failure check here means the task will not be retried and it will be marked # as failed. The engine will also look at ignore_failure so it won't fail the # workflow. 
not ctx.task.ignore_failure http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflows/core/translation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py deleted file mode 100644 index 8da9bd7..0000000 --- a/aria/orchestrator/workflows/core/translation.py +++ /dev/null @@ -1,112 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Translation of user graph's API to the execution graph -""" - -from ....modeling import models -from .. import api -from ..executor import base - - -def store_tasks(ctx, task_graph, default_executor, execution, - start_stub_type=models.Task.START_WORKFLOW, - end_stub_type=models.Task.END_WORKFLOW, - depends_on=()): - """ - Translates the user graph to the execution graph - :param task_graph: The user's graph - :param workflow_context: The workflow - :param execution_graph: The execution graph that is being built - :param start_stub_type: internal use - :param end_stub_type: internal use - :param depends_on: internal use - """ - depends_on = list(depends_on) - - # Insert start marker - start_task = models.Task(api_id=_start_graph_suffix(task_graph.id), - _executor=base.StubTaskExecutor, - execution=execution, - stub_type=start_stub_type, - dependencies=depends_on) - - for api_task in task_graph.topological_order(reverse=True): - dependencies = task_graph.get_dependencies(api_task) - operation_dependencies = _get_tasks_from_dependencies(ctx, dependencies, [start_task]) - - if isinstance(api_task, api.task.OperationTask): - models.Task.from_api_task( - api_task=api_task, executor=default_executor, dependencies=operation_dependencies) - - elif isinstance(api_task, api.task.WorkflowTask): - # Build the graph recursively while adding start and end markers - store_tasks( - ctx=ctx, - task_graph=api_task, - default_executor=default_executor, - execution=execution, - start_stub_type=models.Task.START_SUBWROFKLOW, - end_stub_type=models.Task.END_SUBWORKFLOW, - depends_on=operation_dependencies - ) - elif isinstance(api_task, api.task.StubTask): - models.Task(api_id=api_task.id, - _executor=base.StubTaskExecutor, - execution=execution, - stub_type=models.Task.STUB, - dependencies=operation_dependencies) - else: - raise RuntimeError('Undefined state') - - # Insert end marker - workflow_dependencies = [task for task in ctx.model.task.list() if not task.dependent_tasks] - models.Task(api_id=_end_graph_suffix(task_graph.id), - _executor=base.StubTaskExecutor, - execution=execution, - stub_type=end_stub_type, - dependencies=workflow_dependencies) - - ctx.model.execution.update(execution) - -def _start_graph_suffix(api_id): - return '{0}-Start'.format(api_id) - - -def 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 54a9438..fc4b800 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -33,9 +33,9 @@ class BaseExecutor(logger.LoggerMixin):
         Execute a task
         :param task: task to execute
         """
+        ctx.update_task()
         if ctx.task.function:
             self._execute(ctx)
-            ctx.model.task.update(ctx.task)
         else:
             # In this case the task is missing a function. This task still gets to an
             # executor, but since there is nothing to run, we by default simply skip the execution
@@ -52,22 +52,17 @@ class BaseExecutor(logger.LoggerMixin):
     @staticmethod
     def _task_started(ctx):
         events.start_task_signal.send(ctx)
-        ctx.model.task.update(ctx.task)
+        ctx.update_task()
 
     def _task_failed(self, ctx, exception, traceback=None):
         events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback)
-        ctx.model.task.update(ctx.task)
+        ctx.update_task()
 
     def _task_succeeded(self, ctx):
         events.on_success_task_signal.send(ctx)
-        ctx.model.task.update(ctx.task)
+        ctx.update_task()
 
 
 class StubTaskExecutor(BaseExecutor):  # pylint: disable=abstract-method
-
-    def __init__(self, *args, **kwargs):
-        # TODO: the executor kwargs delivery system is bad, so we need to aps the kwargs each
-        # time (and they are not persisted - this is bad!)
-        super(StubTaskExecutor, self).__init__()
-
     def execute(self, ctx, *args, **kwargs):
         ctx.task.status = ctx.task.SUCCESS
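With persistence centralized in ctx.update_task() above, a subclass only has to implement _execute() and report through the signal helpers. A hedged sketch of a minimal synchronous executor built on this base class (the function-loading line is illustrative; real ARIA executors resolve and invoke the operation function with considerably more care):

    class InlineExecutor(BaseExecutor):
        """Runs each operation in the calling thread (sketch, not shipped code)."""

        def _execute(self, ctx):
            self._task_started(ctx)
            try:
                from aria.utils.imports import import_fullname
                task_func = import_fullname(ctx.task.function)
                task_func(ctx=ctx)
            except BaseException as e:
                self._task_failed(ctx, exception=e)
            else:
                self._task_succeeded(ctx)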
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 074b54b..8c447b6 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -68,8 +68,6 @@ class ThreadExecutor(BaseExecutor):
                         self._task_failed(ctx,
                                           exception=e,
                                           traceback=exceptions.get_exception_as_string(*sys.exc_info()))
-            finally:
-                break
             # Daemon threads
             except BaseException as e:
                 pass


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/context/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py
index 89a7b2c..cb282a3 100644
--- a/tests/orchestrator/context/__init__.py
+++ b/tests/orchestrator/context/__init__.py
@@ -16,6 +16,7 @@
 import sys
 
 from aria.orchestrator.workflows.core import engine
+from aria.orchestrator import workflow_runner
 
 
 def op_path(func, module_path=None):
@@ -23,8 +24,12 @@ def op_path(func, module_path=None):
     return '{0}.{1}'.format(module_path, func.__name__)
 
 
-def execute(workflow_func, workflow_context, executor, executor_kwargs=None):
+def execute(workflow_func, workflow_context, executor):
     graph = workflow_func(ctx=workflow_context)
-    eng = engine.Engine(executor=executor, executor_kwargs=executor_kwargs,
-                        workflow_context=workflow_context, tasks_graph=graph)
+
+    workflow_runner.construct_execution_tasks(workflow_context.execution, graph, executor.__class__)
+    workflow_context.execution = workflow_context.execution
+    execution_graph = workflow_runner.get_execution_graph(workflow_context.execution)
+    eng = engine.Engine(executor, workflow_context, execution_graph)
+
     eng.execute()
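A hypothetical caller of the execute() test helper above (the workflow and context wiring are stand-ins; the point is that the executor is now a live instance the caller owns and must close):

    @workflow
    def my_workflow(ctx, graph):
        pass  # build api tasks onto `graph`

    executor = thread.ThreadExecutor()
    try:
        execute(my_workflow, workflow_context, executor)
    finally:
        executor.close()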
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index e8f1a63..f654fe5 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -51,7 +51,11 @@ def ctx(tmpdir):
 
 @pytest.fixture
 def thread_executor():
-    return thread.ThreadExecutor
+    result = thread.ThreadExecutor()
+    try:
+        yield result
+    finally:
+        result.close()
 
 
 @pytest.fixture
@@ -253,7 +257,12 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir):
     (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
 ])
 def executor(request):
-    return request.param
+    ex_cls, kwargs = request.param
+    ex = ex_cls(**kwargs)
+    try:
+        yield ex
+    finally:
+        ex.close()
 
 
 def test_node_operation_logging(ctx, executor):
@@ -286,8 +295,7 @@ def test_node_operation_logging(ctx, executor):
             arguments=arguments
         )
     )
-    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor[0],
-            executor_kwargs=executor[1])
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
     _assert_loggins(ctx, arguments)
 
 
@@ -320,7 +328,7 @@ def test_relationship_operation_logging(ctx, executor):
         )
     )
 
-    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor[0],
-            executor_kwargs=executor[1])
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
     _assert_loggins(ctx, arguments)
 
 
@@ -377,7 +385,7 @@ def test_attribute_consumption(ctx, executor, dataholder):
         )
     )
 
-    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor[0], executor_kwargs=executor[1])
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
 
     target_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
     assert len(source_node.attributes) == len(target_node.attributes) == 2
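The fixture rewrites in this file all follow the same pytest idiom, worth spelling out once: in a generator fixture, everything before the yield is setup, the yielded value is what the test receives, and everything after (here, inside finally) is teardown that runs whether the test passes or fails:

    @pytest.fixture
    def executor():
        ex = thread.ThreadExecutor()
        try:
            yield ex      # value injected into the test
        finally:
            ex.close()    # teardown, runs even on test failure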
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index f7e441e..4975d20 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -18,7 +18,7 @@ import pytest
 from aria.orchestrator.workflows import api
 from aria.orchestrator.workflows.core import engine
 from aria.orchestrator.workflows.executor import process
-from aria.orchestrator import workflow, operation
+from aria.orchestrator import workflow, operation, workflow_runner
 
 import tests
 from tests import mock
 from tests import storage
@@ -28,14 +28,16 @@ TEST_FILE_ENTRY_ID = 'entry'
 TEST_FILE_NAME = 'test_file'
 
 
-def test_serialize_operation_context(context, executor, executor_kwargs, tmpdir):
+def test_serialize_operation_context(context, executor, tmpdir):
     test_file = tmpdir.join(TEST_FILE_NAME)
     test_file.write(TEST_FILE_CONTENT)
     resource = context.resource
     resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file))
     graph = _mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph,
-                        executor_kwargs=executor_kwargs)
+    workflow_runner.construct_execution_tasks(context.execution, graph, executor.__class__)
+    context.execution = context.execution
+    execution_graph = workflow_runner.get_execution_graph(context.execution)
+    eng = engine.Engine(executor, context, execution_graph)
     eng.execute()
 
 
@@ -83,8 +85,9 @@ def _operation_mapping():
 
 @pytest.fixture
 def executor():
-    return process.ProcessExecutor
-
+    result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+    yield result
+    result.close()
 
 @pytest.fixture
 def executor_kwargs():


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/context/test_toolbelt.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_toolbelt.py b/tests/orchestrator/context/test_toolbelt.py
index df54d9d..4de9e55 100644
--- a/tests/orchestrator/context/test_toolbelt.py
+++ b/tests/orchestrator/context/test_toolbelt.py
@@ -41,7 +41,12 @@ def workflow_context(tmpdir):
 
 @pytest.fixture
 def executor():
-    return thread.ThreadExecutor
+    result = thread.ThreadExecutor()
+    try:
+        yield result
+    finally:
+        result.close()
+
 
 @pytest.fixture
 def dataholder(tmpdir):


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 8e8c6e0..8414240 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -19,7 +19,7 @@ import os
 import pytest
 
 from aria import workflow
-from aria.orchestrator import events
+from aria.orchestrator import events, workflow_runner
 from aria.orchestrator.workflows import api
 from aria.orchestrator.workflows.exceptions import ExecutorException
 from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException
@@ -498,17 +498,20 @@ if __name__ == '__main__':
                     arguments=arguments))
             return graph
         tasks_graph = mock_workflow(ctx=workflow_context)  # pylint: disable=no-value-for-parameter
-        eng = engine.Engine(
-            executor=executor,
-            workflow_context=workflow_context,
-            tasks_graph=tasks_graph)
+        workflow_runner.construct_execution_tasks(
+            workflow_context.execution, tasks_graph, executor.__class__)
+        workflow_context.execution = workflow_context.execution
+        execution_graph = workflow_runner.get_execution_graph(workflow_context.execution)
+        eng = engine.Engine(executor, workflow_context, execution_graph)
         eng.execute()
         return workflow_context.model.node.get_by_name(
             mock.models.DEPENDENCY_NODE_NAME).attributes
 
     @pytest.fixture
     def executor(self):
-        return process.ProcessExecutor
+        result = process.ProcessExecutor()
+        yield result
+        result.close()
 
     @pytest.fixture
     def workflow_context(self, tmpdir):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index baf45d7..c5a62ae 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -99,7 +99,7 @@ def test_default_executor(request):
     with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls:
         _create_workflow_runner(request, mock_workflow)
         _, engine_kwargs = mock_engine_cls.call_args
-        assert engine_kwargs.get('executor') == ProcessExecutor
+        assert isinstance(engine_kwargs.get('executor'), ProcessExecutor)
 
 
 def test_custom_executor(request):


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 7ffb92a..108360f 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -24,6 +24,7 @@ from aria.orchestrator import (
     operation,
 )
 from aria.modeling import models
+from aria.orchestrator import workflow_runner
 from aria.orchestrator.workflows import (
     api,
     exceptions,
@@ -40,21 +41,23 @@ global_test_holder = {}
 class BaseTest(object):
 
     @classmethod
-    def _execute(cls, workflow_func, workflow_context, executor, executor_kwargs=None):
+    def _execute(cls, workflow_func, workflow_context, executor):
         eng = cls._engine(workflow_func=workflow_func,
                           workflow_context=workflow_context,
-                          executor=executor,
-                          executor_kwargs=executor_kwargs)
+                          executor=executor)
         eng.execute()
         return eng
 
     @staticmethod
-    def _engine(workflow_func, workflow_context, executor, executor_kwargs=None):
+    def _engine(workflow_func, workflow_context, executor):
         graph = workflow_func(ctx=workflow_context)
+        execution = workflow_context.execution
+        workflow_runner.construct_execution_tasks(execution, graph, executor.__class__)
+        workflow_context.execution = execution
+
         return engine.Engine(executor=executor,
-                             executor_kwargs=executor_kwargs,
                              workflow_context=workflow_context,
-                             tasks_graph=graph)
+                             execution_graph=workflow_runner.get_execution_graph(execution))
 
     @staticmethod
     def _op(ctx,
@@ -128,7 +131,11 @@ class BaseTest(object):
 
     @pytest.fixture
     def executor(self):
-        return thread.ThreadExecutor
+        result = thread.ThreadExecutor()
+        try:
+            yield result
+        finally:
+            result.close()
 
     @pytest.fixture
     def workflow_context(self, tmpdir):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/workflows/core/test_events.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py
index d63a8ef..92582a9 100644
--- a/tests/orchestrator/workflows/core/test_events.py
+++ b/tests/orchestrator/workflows/core/test_events.py
@@ -15,6 +15,7 @@
 
 import pytest
 
+from aria.orchestrator import workflow_runner
 from tests import mock, storage
 from aria.modeling.service_instance import NodeBase
 from aria.orchestrator.decorators import operation, workflow
@@ -112,13 +113,14 @@ def run_operation_on_node(ctx, op_name, interface_name):
         operation_name=op_name,
         operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func)))
     node.interfaces[interface.name] = interface
+    workflow_runner.construct_execution_tasks(
+        ctx.execution, single_operation_workflow(
+            ctx=ctx, node=node, interface_name=interface_name, op_name=op_name), ThreadExecutor)
+    ctx.execution = ctx.execution
 
-    eng = engine.Engine(executor=ThreadExecutor,
+    eng = engine.Engine(executor=ThreadExecutor(),
                         workflow_context=ctx,
-                        tasks_graph=single_operation_workflow(ctx=ctx,  # pylint: disable=no-value-for-parameter
-                                                              node=node,
-                                                              interface_name=interface_name,
-                                                              op_name=op_name))
+                        execution_graph=workflow_runner.get_execution_graph(ctx.execution))
     eng.execute()
     return node


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 4abed37..aebae38 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -13,13 +13,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from networkx import topological_sort, DiGraph
+from networkx import topological_sort
 
 from aria.modeling import models
-from aria.orchestrator import context
-from aria.orchestrator.workflows import api, core
+from aria.orchestrator import (
+    context,
+    workflow_runner
+)
+from aria.orchestrator.workflows import api
 from aria.orchestrator.workflows.executor import base
-
 from tests import mock
 from tests import storage
 
@@ -66,12 +68,12 @@ def test_task_graph_into_execution_graph(tmpdir):
     test_task_graph.add_dependency(simple_after_task, inner_task_graph)
 
     # Direct check
-    core.translation.store_tasks(ctx=task_context,
-                                 task_graph=test_task_graph,
-                                 execution=task_context.model.execution.list()[0],
-                                 default_executor=base.StubTaskExecutor)
+    execution = task_context.model.execution.list()[0]
+
+    workflow_runner.construct_execution_tasks(execution, test_task_graph, base.StubTaskExecutor)
+    task_context.execution = execution
 
-    execution_graph = core.translation.construct_graph(DiGraph(), task_context.execution)
+    execution_graph = workflow_runner.get_execution_graph(execution)
     execution_tasks = topological_sort(execution_graph)
 
     assert len(execution_tasks) == 7
@@ -87,27 +89,17 @@ def test_task_graph_into_execution_graph(tmpdir):
     ]
 
     assert expected_tasks_names == [t.api_id for t in execution_tasks]
-    assert all(isinstance(_get_task_by_name(task_name, execution_graph), models.Task)
-               for task_name in execution_tasks)
-
-    first_task = _get_task_by_name(execution_tasks[0], execution_graph)
-    assert first_task.stub_type == models.Task.START_WORKFLOW
-
-    second_task = _get_task_by_name(execution_tasks[1], execution_graph)
-    _assert_execution_is_api_task(second_task, simple_before_task)
-
-    third_task = _get_task_by_name(execution_tasks[2], execution_graph)
-    assert third_task.stub_type == models.Task.START_SUBWROFKLOW
+    assert all(isinstance(task, models.Task) for task in execution_tasks)
+    execution_tasks = iter(execution_tasks)
 
-    fourth_task = _get_task_by_name(execution_tasks[3], execution_graph)
-    _assert_execution_is_api_task(fourth_task, inner_task)
-    fifth_task = _get_task_by_name(execution_tasks[4], execution_graph)
-    assert fifth_task.stub_type == models.Task.END_SUBWORKFLOW
+    assert next(execution_tasks).stub_type == models.Task.START_WORKFLOW
+    _assert_execution_is_api_task(next(execution_tasks), simple_before_task)
+    assert next(execution_tasks).stub_type == models.Task.START_SUBWROFKLOW
+    _assert_execution_is_api_task(next(execution_tasks), inner_task)
+    assert next(execution_tasks).stub_type == models.Task.END_SUBWORKFLOW
+    _assert_execution_is_api_task(next(execution_tasks), simple_after_task)
+    assert next(execution_tasks).stub_type == models.Task.END_WORKFLOW
 
-    sixth_task = _get_task_by_name(execution_tasks[5], execution_graph)
-    _assert_execution_is_api_task(sixth_task, simple_after_task)
-    seventh_task = _get_task_by_name(execution_tasks[6], execution_graph)
-    assert seventh_task.stub_type == models.Task.END_WORKFLOW
 
     storage.release_sqlite_storage(task_context.model)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/58e212c7/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 3fa75ad..410a982 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -36,7 +36,6 @@ from aria.orchestrator.workflows.executor import (
 )
 
 import tests
-from . import MockContext
 
 
 def _get_function(func):
@@ -45,18 +44,24 @@ def _get_function(func):
 
 def execute_and_assert(executor, ctx):
     node = ctx.model.node.list()[0]
+    execution = ctx.model.execution.list()[0]
     expected_value = 'value'
-    successful_ctx = ctx.model.task.model_cls(function=_get_function(mock_successful_task),
-                                              node=node,
-                                              _executor=executor)
-    failing_ctx = ctx.model.task.model_cls(function=_get_function(mock_failing_task), node=node)
-    ctx_with_inputs = ctx.model.task.model_cls(
+    successful_ctx = models.Task(function=_get_function(mock_successful_task),
+                                 node=node, _executor=executor, execution=execution)
+    failing_ctx = models.Task(
+        function=_get_function(mock_failing_task), node=node, _executor=executor, execution=execution)
+    ctx_with_inputs = models.Task(
         node=node,
         function=_get_function(mock_task_with_input),
-        arguments={'input': models.Argument.wrap('input', 'value')})
+        arguments={'input': models.Argument.wrap('input', 'value')},
+        _executor=executor,
+        execution=execution)
 
-    for task in [successful_ctx, failing_ctx, ctx_with_inputs]:
-        task.execute(ctx)
+    ctx.model.execution.update(execution)
+
+    for op_ctx in [successful_ctx, failing_ctx, ctx_with_inputs]:
+        op_ctx.states = []
+        op_ctx.execute(ctx)
 
     @retrying.retry(stop_max_delay=10000, wait_fixed=100)
     def assertion():
@@ -101,22 +106,14 @@ class MockException(Exception):
 @pytest.fixture
 def ctx(tmpdir):
     context = mock.context.simple(str(tmpdir))
+    ctx.states = []
     yield context
     storage.release_sqlite_storage(context.model)
 
 
-@pytest.fixture(params=[
-    (thread.ThreadExecutor, {'pool_size': 1}),
-    # (thread.ThreadExecutor, {'pool_size': 2}),
-    # subprocess needs to load a tests module so we explicitly add the root directory as if
-    # the project has been installed in editable mode
-    # (celery.CeleryExecutor, {'app': app})
-])
-def thread_executor(request):
-    executor_cls, executor_kwargs = request.param
-    result = executor_cls(**executor_kwargs)
-    yield result
-    result.close()
+@pytest.fixture
+def thread_executor():
+    return thread.ThreadExecutor
 
 
 @pytest.fixture