Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution c211e1064 -> 5cb4f86a6
removed the usage of execution graph from the code (currently still remain in test Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/5cb4f86a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/5cb4f86a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/5cb4f86a Branch: refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution Commit: 5cb4f86a6385a1085bc791bc6ce8f5dae4f36bb8 Parents: c211e10 Author: max-orlov <ma...@gigaspaces.com> Authored: Wed Jun 21 14:56:42 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Wed Jun 21 14:56:42 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflows/core/engine.py | 106 +++++++++++++++--------- 1 file changed, 66 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cb4f86a/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 9f0ddd7..8999232 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -45,18 +45,20 @@ class Engine(logger.LoggerMixin): """ execute the workflow """ - executing_tasks = [] + task_tracker = _TasksTracker(ctx) try: events.start_workflow_signal.send(ctx) while True: cancel = self._is_cancel(ctx) if cancel: break - for task in self._ended_tasks(ctx, executing_tasks): - self._handle_ended_tasks(ctx, task, executing_tasks) - for task in self._executable_tasks(ctx): - self._handle_executable_task(ctx, task, executing_tasks) - if self._all_tasks_consumed(ctx): + for task in task_tracker.ended_tasks: + task_tracker.finished_(task) + self._handle_ended_tasks(task) + for task in task_tracker.executable_tasks: + task_tracker.executing_(task) + self._handle_executable_task(ctx, task) + if task_tracker.all_tasks_consumed: break else: time.sleep(0.1) @@ -82,34 +84,7 @@ class Engine(logger.LoggerMixin): execution = ctx.model.execution.refresh(ctx.execution) return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED) - def _executable_tasks(self, ctx): - now = datetime.utcnow() - return ( - task for task in self._tasks_iter(ctx) - if task.is_waiting() and task.due_at <= now and \ - not self._task_has_dependencies(ctx, task) - ) - - @staticmethod - def _ended_tasks(ctx, executing_tasks): - for task in executing_tasks: - if task.has_ended() and task in ctx._graph: - yield task - - @staticmethod - def _task_has_dependencies(ctx, task): - return len(ctx._graph.pred.get(task, [])) > 0 - - @staticmethod - def _all_tasks_consumed(ctx): - return len(ctx._graph.node) == 0 - - @staticmethod - def _tasks_iter(ctx): - for task in ctx.execution.tasks: - yield ctx.model.task.refresh(task) - - def _handle_executable_task(self, ctx, task, executing_tasks): + def _handle_executable_task(self, ctx, task): task_executor = self._executors[task._executor] # If the task is a stub, a default context is provided, else it should hold the context cls @@ -125,16 +100,67 @@ class Engine(logger.LoggerMixin): name=task.name ) - executing_tasks.append(task) - if not task._stub_type: events.sent_task_signal.send(op_ctx) task_executor.execute(op_ctx) @staticmethod - def _handle_ended_tasks(ctx, task, executing_tasks): - executing_tasks.remove(task) + def _handle_ended_tasks(task): if task.status == models.Task.FAILED and not task.ignore_failure: raise exceptions.ExecutorException('Workflow failed') - else: - ctx._graph.remove_node(task) + + +class _TasksTracker(): + def __init__(self, ctx): + self._ctx = ctx + self._tasks = ctx.execution.tasks + self._executable_tasks = list(self._tasks) + self._executing_tasks = [] + self._executed_tasks = [] + + @property + def all_tasks_consumed(self): + return len(self._executed_tasks) == len(self._tasks) and len(self._executing_tasks) == 0 + + def executing_(self, task): + self._executable_tasks.remove(task) + self._executing_tasks.append(task) + + def finished_(self, task): + self._executing_tasks.remove(task) + self._executed_tasks.append(task) + + @property + def ended_tasks(self): + for task in self.executing_tasks: + if task.has_ended(): + yield task + + @property + def executable_tasks(self): + now = datetime.utcnow() + for task in self._update_tasks(self._executable_tasks): + if all([task.is_waiting(), + task.due_at <= now, + all(dependency in self._executed_tasks for dependency in task.dependencies) + ]): + yield task + + @property + def executing_tasks(self): + for task in self._update_tasks(self._executing_tasks): + yield task + + @property + def executed_tasks(self): + for task in self._update_tasks(self._executed_tasks): + yield task + + @property + def tasks(self): + for task in self._update_tasks(self._tasks): + yield task + + def _update_tasks(self, tasks): + for task in tasks: + yield self._ctx.model.task.refresh(task) \ No newline at end of file