Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-74-process-executor-hook 90a71b371 -> 0382b229c (forced 
update)


ARIA-74 Add process executor extension registration hooks


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/0382b229
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/0382b229
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/0382b229

Branch: refs/heads/ARIA-74-process-executor-hook
Commit: 0382b229c8eddcbf95ea7792b2bdcd9d9dda303b
Parents: eaf9974
Author: Dan Kilman <d...@gigaspaces.com>
Authored: Mon Jan 23 16:55:41 2017 +0200
Committer: Dan Kilman <d...@gigaspaces.com>
Committed: Mon Jan 23 19:21:32 2017 +0200

----------------------------------------------------------------------
 aria/extension.py                               | 15 ++++
 aria/orchestrator/plugin.py                     |  9 ++-
 aria/orchestrator/workflows/executor/process.py |  5 ++
 .../executor/test_process_executor_extension.py | 80 ++++++++++++++++++++
 4 files changed, 106 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0382b229/aria/extension.py
----------------------------------------------------------------------
diff --git a/aria/extension.py b/aria/extension.py
index ddb7c25..4dba74f 100644
--- a/aria/extension.py
+++ b/aria/extension.py
@@ -118,8 +118,23 @@ class _ParserExtensionRegistration(_ExtensionRegistration):
 parser = _ParserExtensionRegistration()
 
 
+class _ProcessExecutorExtensionRegistration(_ExtensionRegistration):
+    """Process executor extension class decorator"""
+
+    @_registrar
+    def decorate(self):
+        """
+        The operation function executed by the process executor will be 
decorated with the function
+        returned from decorate().
+        """
+        return []
+
+process_executor = _ProcessExecutorExtensionRegistration()
+
+
 def init():
     """
     Initialize all registrars by calling all registered functions
     """
     parser.init()
+    process_executor.init()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0382b229/aria/orchestrator/plugin.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/plugin.py b/aria/orchestrator/plugin.py
index 3005756..381504e 100644
--- a/aria/orchestrator/plugin.py
+++ b/aria/orchestrator/plugin.py
@@ -39,14 +39,17 @@ class PluginManager(object):
         """
         metadata = wagon.show(source)
         cls = self._model.plugin.model_cls
+
+        os_props = metadata['build_server_os_properties']
+
         plugin = cls(
             archive_name=metadata['archive_name'],
             supported_platform=metadata['supported_platform'],
             supported_py_versions=metadata['supported_python_versions'],
             # Remove suffix colon after upgrading wagon to > 0.5.0
-            
distribution=metadata['build_server_os_properties']['distribution:'],
-            
distribution_release=metadata['build_server_os_properties']['distribution_version'],
-            
distribution_version=metadata['build_server_os_properties']['distribution_release'],
+            distribution=os_props.get('distribution:') or 
os_props.get('distribution'),
+            distribution_release=os_props['distribution_version'],
+            distribution_version=os_props['distribution_release'],
             package_name=metadata['package_name'],
             package_version=metadata['package_version'],
             package_source=metadata['package_source'],

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0382b229/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py 
b/aria/orchestrator/workflows/executor/process.py
index 2cc9178..9fa0302 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -39,6 +39,8 @@ import Queue
 
 import jsonpickle
 
+import aria
+from aria.extension import process_executor
 from aria.utils import imports
 from aria.utils import exceptions
 from aria.orchestrator.workflows.executor import base
@@ -291,6 +293,9 @@ def _main():
         try:
             ctx = serialization.operation_context_from_dict(context_dict)
             task_func = imports.load_attribute(operation_mapping)
+            aria.install_aria_extensions()
+            for decorate in process_executor.decorate():
+                task_func = decorate(task_func)
             task_func(ctx=ctx, **operation_inputs)
             messenger.succeeded(tracked_changes=instrument.tracked_changes)
         except BaseException as e:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0382b229/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git 
a/tests/orchestrator/workflows/executor/test_process_executor_extension.py 
b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
new file mode 100644
index 0000000..4a8ef57
--- /dev/null
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria import extension
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator import workflow, operation
+
+import tests
+from tests import mock
+from tests import storage
+
+
+def test_decorate_extension(context, executor):
+    inputs = {'input1': 1, 'input2': 2}
+
+    def get_node_instance(ctx):
+        return 
ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+
+    @workflow
+    def mock_workflow(ctx, graph):
+        node_instance = get_node_instance(ctx)
+        op = 'test.op'
+        op_dict = {'operation': '{0}.{1}'.format(__name__, 
_mock_operation.__name__)}
+        node_instance.node.operations['test.op'] = op_dict
+        task = api.task.OperationTask.node_instance(instance=node_instance, 
name=op, inputs=inputs)
+        graph.add_tasks(task)
+        return graph
+    graph = mock_workflow(ctx=context)  # pylint: 
disable=no-value-for-parameter
+    eng = engine.Engine(executor=executor, workflow_context=context, 
tasks_graph=graph)
+    eng.execute()
+    out = get_node_instance(context).runtime_properties['out']
+    assert out['wrapper_inputs'] == inputs
+    assert out['function_inputs'] == inputs
+
+
+@extension.process_executor
+class MockProcessExecutorExtension(object):
+
+    def decorate(self):
+        def decorator(function):
+            def wrapper(ctx, **operation_inputs):
+                ctx.node_instance.runtime_properties['out'] = 
{'wrapper_inputs': operation_inputs}
+                function(ctx=ctx, **operation_inputs)
+            return wrapper
+        return decorator
+
+
+@operation
+def _mock_operation(ctx, **operation_inputs):
+    ctx.node_instance.runtime_properties['out']['function_inputs'] = 
operation_inputs
+
+
+@pytest.fixture
+def executor():
+    result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+    yield result
+    result.close()
+
+
+@pytest.fixture
+def context(tmpdir):
+    result = mock.context.simple(storage.get_sqlite_api_kwargs(str(tmpdir)))
+    yield result
+    storage.release_sqlite_storage(result.model)

Reply via email to