Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-74-process-executor-hook 90a71b371 -> 0382b229c (forced update)
ARIA-74 Add process executor extension registration hooks Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/0382b229 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/0382b229 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/0382b229 Branch: refs/heads/ARIA-74-process-executor-hook Commit: 0382b229c8eddcbf95ea7792b2bdcd9d9dda303b Parents: eaf9974 Author: Dan Kilman <d...@gigaspaces.com> Authored: Mon Jan 23 16:55:41 2017 +0200 Committer: Dan Kilman <d...@gigaspaces.com> Committed: Mon Jan 23 19:21:32 2017 +0200 ---------------------------------------------------------------------- aria/extension.py | 15 ++++ aria/orchestrator/plugin.py | 9 ++- aria/orchestrator/workflows/executor/process.py | 5 ++ .../executor/test_process_executor_extension.py | 80 ++++++++++++++++++++ 4 files changed, 106 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0382b229/aria/extension.py ---------------------------------------------------------------------- diff --git a/aria/extension.py b/aria/extension.py index ddb7c25..4dba74f 100644 --- a/aria/extension.py +++ b/aria/extension.py @@ -118,8 +118,23 @@ class _ParserExtensionRegistration(_ExtensionRegistration): parser = _ParserExtensionRegistration() +class _ProcessExecutorExtensionRegistration(_ExtensionRegistration): + """Process executor extension class decorator""" + + @_registrar + def decorate(self): + """ + The operation function executed by the process executor will be decorated with the function + returned from decorate(). 
+ """ + return [] + +process_executor = _ProcessExecutorExtensionRegistration() + + def init(): """ Initialize all registrars by calling all registered functions """ parser.init() + process_executor.init() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0382b229/aria/orchestrator/plugin.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/plugin.py b/aria/orchestrator/plugin.py index 3005756..381504e 100644 --- a/aria/orchestrator/plugin.py +++ b/aria/orchestrator/plugin.py @@ -39,14 +39,17 @@ class PluginManager(object): """ metadata = wagon.show(source) cls = self._model.plugin.model_cls + + os_props = metadata['build_server_os_properties'] + plugin = cls( archive_name=metadata['archive_name'], supported_platform=metadata['supported_platform'], supported_py_versions=metadata['supported_python_versions'], # Remove suffix colon after upgrading wagon to > 0.5.0 - distribution=metadata['build_server_os_properties']['distribution:'], - distribution_release=metadata['build_server_os_properties']['distribution_version'], - distribution_version=metadata['build_server_os_properties']['distribution_release'], + distribution=os_props.get('distribution:') or os_props.get('distribution'), + distribution_release=os_props['distribution_version'], + distribution_version=os_props['distribution_release'], package_name=metadata['package_name'], package_version=metadata['package_version'], package_source=metadata['package_source'], http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0382b229/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 2cc9178..9fa0302 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -39,6 +39,8 @@ import Queue import jsonpickle 
+import aria +from aria.extension import process_executor from aria.utils import imports from aria.utils import exceptions from aria.orchestrator.workflows.executor import base @@ -291,6 +293,9 @@ def _main(): try: ctx = serialization.operation_context_from_dict(context_dict) task_func = imports.load_attribute(operation_mapping) + aria.install_aria_extensions() + for decorate in process_executor.decorate(): + task_func = decorate(task_func) task_func(ctx=ctx, **operation_inputs) messenger.succeeded(tracked_changes=instrument.tracked_changes) except BaseException as e: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0382b229/tests/orchestrator/workflows/executor/test_process_executor_extension.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py new file mode 100644 index 0000000..4a8ef57 --- /dev/null +++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py @@ -0,0 +1,80 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest + +from aria import extension +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.core import engine +from aria.orchestrator.workflows.executor import process +from aria.orchestrator import workflow, operation + +import tests +from tests import mock +from tests import storage + + +def test_decorate_extension(context, executor): + inputs = {'input1': 1, 'input2': 2} + + def get_node_instance(ctx): + return ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME) + + @workflow + def mock_workflow(ctx, graph): + node_instance = get_node_instance(ctx) + op = 'test.op' + op_dict = {'operation': '{0}.{1}'.format(__name__, _mock_operation.__name__)} + node_instance.node.operations['test.op'] = op_dict + task = api.task.OperationTask.node_instance(instance=node_instance, name=op, inputs=inputs) + graph.add_tasks(task) + return graph + graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter + eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph) + eng.execute() + out = get_node_instance(context).runtime_properties['out'] + assert out['wrapper_inputs'] == inputs + assert out['function_inputs'] == inputs + + +@extension.process_executor +class MockProcessExecutorExtension(object): + + def decorate(self): + def decorator(function): + def wrapper(ctx, **operation_inputs): + ctx.node_instance.runtime_properties['out'] = {'wrapper_inputs': operation_inputs} + function(ctx=ctx, **operation_inputs) + return wrapper + return decorator + + +@operation +def _mock_operation(ctx, **operation_inputs): + ctx.node_instance.runtime_properties['out']['function_inputs'] = operation_inputs + + +@pytest.fixture +def executor(): + result = process.ProcessExecutor(python_path=[tests.ROOT_DIR]) + yield result + result.close() + + +@pytest.fixture +def context(tmpdir): + result = mock.context.simple(storage.get_sqlite_api_kwargs(str(tmpdir))) + yield result + 
storage.release_sqlite_storage(result.model)