This is an automated email from the ASF dual-hosted git repository. kamilbregula pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push: new 2b1dc1b Support properties in plugins (#9002) 2b1dc1b is described below commit 2b1dc1b8e163ca255da03cad1380d81e13b6b002 Author: Kamil Breguła <mik-...@users.noreply.github.com> AuthorDate: Sat May 30 21:12:21 2020 +0200 Support properties in plugins (#9002) --- airflow/plugins_manager.py | 18 ++++++++++-------- docs/plugins.rst | 5 +++-- tests/plugins/test_plugins_manager.py | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 46 insertions(+), 10 deletions(-) diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index 7e3b4ae..7c2b116 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -145,11 +145,12 @@ def load_entrypoint_plugins(): for entry_point in entry_points: # pylint: disable=too-many-nested-blocks log.debug('Importing entry_point plugin %s', entry_point.name) try: - plugin_obj = entry_point.load() - if is_valid_plugin(plugin_obj): - if callable(getattr(plugin_obj, 'on_load', None)): - plugin_obj.on_load() - plugins.append(plugin_obj) + plugin_class = entry_point.load() + if is_valid_plugin(plugin_class): + plugin_instance = plugin_class() + if callable(getattr(plugin_instance, 'on_load', None)): + plugin_instance.on_load() + plugins.append(plugin_instance) except Exception as e: # pylint: disable=broad-except log.exception("Failed to import plugin %s", entry_point.name) import_errors[entry_point.module_name] = str(e) @@ -182,9 +183,10 @@ def load_plugins_from_plugin_directory(): mod = importlib.util.module_from_spec(spec) sys.modules[spec.name] = mod loader.exec_module(mod) - for obj in list(mod.__dict__.values()): - if is_valid_plugin(obj): - plugins.append(obj) + for mod_attr_value in list(mod.__dict__.values()): + if is_valid_plugin(mod_attr_value): + plugin_instance = mod_attr_value() + plugins.append(plugin_instance) except Exception as e: # pylint: disable=broad-except log.exception(e) path = filepath or str(f) diff --git 
a/docs/plugins.rst b/docs/plugins.rst index fc13e9a..e56f36c 100644 --- a/docs/plugins.rst +++ b/docs/plugins.rst @@ -115,8 +115,9 @@ looks like: # buttons. operator_extra_links = [] -You can derive it by inheritance (please refer to the example below). -Please note ``name`` inside this class must be specified. +You can derive it by inheritance (please refer to the example below). In the example, all options have been +defined as class attributes, but you can also define them as properties if you need to perform +additional initialization. Please note ``name`` inside this class must be specified. After the plugin is imported into Airflow, you can invoke it using statement like diff --git a/tests/plugins/test_plugins_manager.py b/tests/plugins/test_plugins_manager.py index 47495ac..78da6ba 100644 --- a/tests/plugins/test_plugins_manager.py +++ b/tests/plugins/test_plugins_manager.py @@ -19,6 +19,8 @@ import unittest from unittest import mock +from airflow.hooks.base_hook import BaseHook +from airflow.plugins_manager import AirflowPlugin from airflow.www import app as application @@ -93,3 +95,34 @@ class TestPluginsRBAC(unittest.TestCase): assert "Version Conflict" in received_logs assert "Failed to import plugin test-entrypoint" in received_logs assert "Version Conflict", "test.plugins.test_plugins_manager" in import_errors.items() + + +class TestPluginsManager(unittest.TestCase): + class AirflowTestPropertyPlugin(AirflowPlugin): + name = "test_property_plugin" + + @property + def operators(self): + from airflow.models.baseoperator import BaseOperator + + class PluginPropertyOperator(BaseOperator): + pass + + return [PluginPropertyOperator] + + class TestNonPropertyHook(BaseHook): + pass + + hooks = [TestNonPropertyHook] + + @mock.patch('airflow.plugins_manager.plugins', [AirflowTestPropertyPlugin()]) + @mock.patch('airflow.plugins_manager.operators_modules', None) + @mock.patch('airflow.plugins_manager.sensors_modules', None) + 
@mock.patch('airflow.plugins_manager.hooks_modules', None) + @mock.patch('airflow.plugins_manager.macros_modules', None) + def test_should_load_plugins_from_property(self): + from airflow import plugins_manager + plugins_manager.integrate_dag_plugins() + self.assertIn('TestPluginsManager.AirflowTestPropertyPlugin', str(plugins_manager.plugins)) + self.assertIn('PluginPropertyOperator', str(plugins_manager.operators_modules[0].__dict__)) + self.assertIn("TestNonPropertyHook", str(plugins_manager.hooks_modules[0].__dict__))