Repository: incubator-ariatosca
Updated Branches:
  refs/heads/wf-executor a0ac985a1 -> 5f27cb51f (forced update)


Add translation mechanism from task_graph into execution_graph


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/f2f41313
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/f2f41313
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/f2f41313

Branch: refs/heads/wf-executor
Commit: f2f4131369b7c98af889195e4f5ba7eeae5f63a9
Parents: b53aa1f
Author: mxmrlv <mxm...@gmail.com>
Authored: Thu Oct 13 19:52:35 2016 +0300
Committer: mxmrlv <mxm...@gmail.com>
Committed: Tue Oct 18 16:45:37 2016 +0300

----------------------------------------------------------------------
 aria/workflows/engine/engine.py                 | 35 ++-------
 aria/workflows/engine/tasks.py                  | 61 ++++++++++++++
 aria/workflows/engine/translation.py            | 83 ++++++++++++++++++++
 tests/requirements.txt                          |  1 +
 tests/workflows/__init__.py                     | 14 ++++
 .../test_task_graph_into_exececution_graph.py   | 73 +++++++++++++++++
 6 files changed, 240 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f2f41313/aria/workflows/engine/engine.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/engine.py b/aria/workflows/engine/engine.py
index 7b86fb5..508ae3b 100644
--- a/aria/workflows/engine/engine.py
+++ b/aria/workflows/engine/engine.py
@@ -29,6 +29,11 @@ from aria.events import (
 )
 from aria.logger import LoggerMixin
 
+from .translation import build_execution_graph
+
+
+from ...storage import Model
+
 
 class Engine(LoggerMixin):
 
@@ -38,10 +43,9 @@ class Engine(LoggerMixin):
         self._tasks_graph = tasks_graph
         self._execution_graph = DiGraph()
         self._executor = executor
-        self._build_execution_graph(self._workflow_context, self._tasks_graph)
-
-    def _build_execution_graph(self, workflow_context, graph):
-        pass
+        build_execution_graph(task_graph=self._tasks_graph,
+                              workflow_context=workflow_context,
+                              execution_graph=self._execution_graph)
 
     def execute(self):
         execution_id = self._workflow_context.execution_id
@@ -160,26 +164,3 @@ class Engine(LoggerMixin):
             start_task_signal.disconnect(self._task_started_receiver)
             on_success_task_signal.disconnect(self._task_succeeded_receiver)
             on_failure_task_signal.disconnect(self._task_failed_receiver)
-
-
-class Task(object):
-
-    def __init__(self, operation_context):
-        self.operation_context = operation_context
-        self._create_operation_in_storage()
-
-    def _create_operation_in_storage(self):
-        Operation = self.operation_context.storage.operation.model_cls
-        operation = Operation(
-            id=self.operation_context.id,
-            execution_id=self.operation_context.execution_id,
-            max_retries=self.operation_context.parameters.get('max_retries', 
1),
-            status=Operation.PENDING,
-        )
-        self.operation_context.operation = operation
-
-    def __getattr__(self, attr):
-        try:
-            return getattr(self.operation_context, attr)
-        except AttributeError:
-            return super(Task, self).__getattribute__(attr)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f2f41313/aria/workflows/engine/tasks.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/tasks.py b/aria/workflows/engine/tasks.py
new file mode 100644
index 0000000..83b4263
--- /dev/null
+++ b/aria/workflows/engine/tasks.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class BaseTask(object):
+
+    def __init__(self, id, name, context):
+        self.id = id
+        self.name = name
+        self.context = context
+
+
+class StartWorkflowTask(BaseTask):
+    pass
+
+
+class EndWorkflowTask(BaseTask):
+    pass
+
+
+class StartSubWorkflowTask(BaseTask):
+    pass
+
+
+class EndSubWorkflowTask(BaseTask):
+    pass
+
+
+class OperationTask(BaseTask):
+    def __init__(self, *args, **kwargs):
+        super(OperationTask, self).__init__(*args, **kwargs)
+        self._create_operation_in_storage()
+
+    def _create_operation_in_storage(self):
+        Operation = self.context.storage.operation.model_cls
+        operation = Operation(
+            id=self.context.id,
+            execution_id=self.context.execution_id,
+            max_retries=self.context.parameters.get('max_retries', 1),
+            status=Operation.PENDING,
+        )
+        self.context.operation = operation
+
+    def __getattr__(self, attr):
+        try:
+            return getattr(self.context, attr)
+        except AttributeError:
+            return super(OperationTask, self).__getattribute__(attr)
+

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f2f41313/aria/workflows/engine/translation.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/translation.py 
b/aria/workflows/engine/translation.py
new file mode 100644
index 0000000..71d7bcd
--- /dev/null
+++ b/aria/workflows/engine/translation.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from aria import contexts
+
+from . import tasks
+
+
+def build_execution_graph(
+        task_graph,
+        workflow_context,
+        execution_graph,
+        start_cls=tasks.StartWorkflowTask,
+        end_cls=tasks.EndWorkflowTask,
+        depends_on=()):
+    # Insert start marker
+    start_task = start_cls(id=_start_graph_suffix(task_graph.id),
+                           name=_start_graph_suffix(task_graph.name),
+                           context=workflow_context)
+    _add_task_and_dependencies(execution_graph, start_task, depends_on)
+
+    for operation_or_workflow, dependencies in 
task_graph.task_tree(reverse=True):
+        operation_dependencies = _get_tasks_from_dependencies(execution_graph, 
dependencies, default=[start_task])
+
+        if _is_operation(operation_or_workflow):
+            # Add the task an the dependencies
+            operation_task = tasks.OperationTask(id=operation_or_workflow.id,
+                                                 
name=operation_or_workflow.name,
+                                                 context=operation_or_workflow)
+            _add_task_and_dependencies(execution_graph, operation_task, 
operation_dependencies)
+        else:
+            # Built the graph recursively while adding start and end markers
+            build_execution_graph(
+                task_graph=operation_or_workflow,
+                workflow_context=workflow_context,
+                execution_graph=execution_graph,
+                start_cls=tasks.StartSubWorkflowTask,
+                end_cls=tasks.EndSubWorkflowTask,
+                depends_on=operation_dependencies
+            )
+
+    # Insert end marker
+    workflow_dependencies = _get_tasks_from_dependencies(execution_graph, 
task_graph.leaf_tasks, default=[start_task])
+    end_task = end_cls(id=_end_graph_suffix(task_graph.id), 
name=_end_graph_suffix(task_graph.name), context=workflow_context)
+    _add_task_and_dependencies(execution_graph, end_task, 
workflow_dependencies)
+
+
+def _add_task_and_dependencies(execution_graph, operation_task, 
operation_dependencies=()):
+    execution_graph.add_node(operation_task.id, task=operation_task)
+    for dependency in operation_dependencies:
+        execution_graph.add_edge(dependency.id, operation_task.id)
+
+
+def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
+    """
+    Returns task list from dependencies.
+    """
+    return [execution_graph.node[dependency.id if _is_operation(dependency) 
else _end_graph_suffix(dependency.id)]
+            ['task'] for dependency in dependencies] or default
+
+
+def _is_operation(task):
+    return isinstance(task, contexts.OperationContext)
+
+
+def _start_graph_suffix(id):
+    return '{0}-Start'.format(id)
+
+
+def _end_graph_suffix(id):
+    return '{0}-End'.format(id)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f2f41313/tests/requirements.txt
----------------------------------------------------------------------
diff --git a/tests/requirements.txt b/tests/requirements.txt
index 07d82a6..f3443d1 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -4,3 +4,4 @@ tox==1.6.1
 pylint==1.5.5
 pytest==3.0.2
 pytest-cov==2.3.1
+pytest-mock==1.2

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f2f41313/tests/workflows/__init__.py
----------------------------------------------------------------------
diff --git a/tests/workflows/__init__.py b/tests/workflows/__init__.py
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/tests/workflows/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f2f41313/tests/workflows/test_task_graph_into_exececution_graph.py
----------------------------------------------------------------------
diff --git a/tests/workflows/test_task_graph_into_exececution_graph.py 
b/tests/workflows/test_task_graph_into_exececution_graph.py
new file mode 100644
index 0000000..28f31a0
--- /dev/null
+++ b/tests/workflows/test_task_graph_into_exececution_graph.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+from networkx import topological_sort, DiGraph
+
+from aria import contexts
+from aria.workflows.api import tasks_graph
+from aria.workflows.engine import tasks, translation
+
+
+@pytest.fixture(autouse=True)
+def no_storage(monkeypatch):
+    monkeypatch.setattr(tasks.OperationTask, '_create_operation_in_storage', 
value=lambda *args, **kwargs: None)
+
+
+def test_task_graph_into_execution_graph():
+    task_graph = tasks_graph.TaskGraph('test_task_graph')
+    simple_before_task = contexts.OperationContext('test_simple_before_task', 
{}, {}, None)
+    simple_after_task = contexts.OperationContext('test_simple_after_task', 
{}, {}, None)
+
+    inner_task_graph = tasks_graph.TaskGraph('test_inner_task_graph')
+    inner_task = contexts.OperationContext('test_inner_task', {}, {}, None)
+    inner_task_graph.add_task(inner_task)
+
+    task_graph.add_task(simple_before_task)
+    task_graph.add_task(simple_after_task)
+    task_graph.add_task(inner_task_graph)
+    task_graph.dependency(inner_task_graph, [simple_before_task])
+    task_graph.dependency(simple_after_task, [inner_task_graph])
+
+    # Direct check
+    execution_graph = DiGraph()
+    translation.build_execution_graph(task_graph=task_graph, 
workflow_context=None, execution_graph=execution_graph)
+    execution_tasks = topological_sort(execution_graph)
+
+    assert len(execution_tasks) == 7
+
+    expected_tasks_names = [
+        '{0}-Start'.format(task_graph.id),
+        simple_before_task.id,
+        '{0}-Start'.format(inner_task_graph.id),
+        inner_task.id,
+        '{0}-End'.format(inner_task_graph.id),
+        simple_after_task.id,
+        '{0}-End'.format(task_graph.id)
+    ]
+
+    assert expected_tasks_names == execution_tasks
+
+    assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph), 
tasks.StartWorkflowTask)
+    assert simple_before_task == _get_task_by_name(execution_tasks[1], 
execution_graph).context
+    assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph), 
tasks.StartSubWorkflowTask)
+    assert inner_task == _get_task_by_name(execution_tasks[3], 
execution_graph).context
+    assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph), 
tasks.EndSubWorkflowTask)
+    assert simple_after_task == _get_task_by_name(execution_tasks[5], 
execution_graph).context
+    assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph), 
tasks.EndWorkflowTask)
+
+
+def _get_task_by_name(task_name, graph):
+    return graph.node[task_name]['task']

Reply via email to