Repository: incubator-airflow Updated Branches: refs/heads/master 072fa8ee5 -> 54f1c11b6
[AIRFLOW-162] Allow variable to be accessible into templates Closes #1540 from alexvanboxel/AIRFLOW-162 AIRFLOW-162 Allow variable to be accessible into templates Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/54f1c11b Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/54f1c11b Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/54f1c11b Branch: refs/heads/master Commit: 54f1c11b6f46b16bc41c289bab1738fec8ab9608 Parents: 072fa8e Author: Alex Van Boxel <a...@vanboxel.be> Authored: Tue Jun 21 10:29:19 2016 -0700 Committer: Chris Riccomini <chr...@wepay.com> Committed: Tue Jun 21 10:29:19 2016 -0700 ---------------------------------------------------------------------- airflow/models.py | 22 +++++++++++ docs/code.rst | 9 +++++ tests/core.py | 101 ++++++++++++++++++++++++++++++++++++++++++------- 3 files changed, 118 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/54f1c11b/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 09d880e..cccbad7 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -1446,6 +1446,24 @@ class TaskInstance(Base): if task.params: params.update(task.params) + class VariableAccessor: + """ + Wrapper around Variable. This way you can get variables in templates by using + {var.variable_name}. + """ + def __init__(self): + pass + + def __getattr__(self, item): + return Variable.get(item) + + class VariableJsonAccessor: + def __init__(self): + pass + + def __getattr__(self, item): + return Variable.get(item, deserialize_json=True) + return { 'dag': task.dag, 'ds': ds, @@ -1471,6 +1489,10 @@ class TaskInstance(Base): 'task_instance_key_str': ti_key_str, 'conf': configuration, 'test_mode': self.test_mode, + 'var': { + 'value': VariableAccessor(), + 'json': VariableJsonAccessor() + } } def render_templates(self): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/54f1c11b/docs/code.rst ---------------------------------------------------------------------- diff --git a/docs/code.rst b/docs/code.rst index 8a48b81..8ffb6e4 100644 --- a/docs/code.rst +++ b/docs/code.rst @@ -133,6 +133,10 @@ Variable Description ``{{ latest_date }}`` same as ``{{ ds }}`` ``{{ ti }}`` same as ``{{ task_instance }}`` ``{{ params }}`` a reference to the user-defined params dictionary +``{{ var.value.my_var }}`` global defined variables represented as a dictionary +``{{ var.json.my_var.path }}`` global defined variables represented as a dictionary + with deserialized JSON object, append the path to the + key within the JSON object ``{{ task_instance_key_str }}`` a unique, human-readable key to the task instance formatted ``{dag_id}_{task_id}_{ds}`` ``conf`` the full configuration object located at @@ -151,6 +155,11 @@ dot notation. Here are some examples of what is possible: Refer to the models documentation for more information on the objects' attributes and methods. +The ``var`` template variable allows you to access variables defined in Airflow's +UI. You can access them as either plain-text or JSON. If you use JSON, you are +also able to walk nested structures, such as dictionaries like: +``{{ var.json.my_dict_var.key1 }}`` + Macros '''''' Macros are a way to expose objects to your templates and live under the http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/54f1c11b/tests/core.py ---------------------------------------------------------------------- diff --git a/tests/core.py b/tests/core.py index 76b20f9..9c688ea 100644 --- a/tests/core.py +++ b/tests/core.py @@ -85,6 +85,20 @@ def reset(dag_id=TEST_DAG_ID): reset() +class OperatorSubclass(operators.BaseOperator): + """ + An operator to test template substitution + """ + template_fields = ['some_templated_field'] + + def __init__(self, some_templated_field, *args, **kwargs): + super(OperatorSubclass, self).__init__(*args, **kwargs) + self.some_templated_field = some_templated_field + + def execute(*args, **kwargs): + pass + + class CoreTest(unittest.TestCase): def setUp(self): configuration.test_mode() @@ -439,29 +453,88 @@ class CoreTest(unittest.TestCase): t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) def test_complex_template(self): - class OperatorSubclass(operators.BaseOperator): - template_fields = ['some_templated_field'] - - def __init__(self, some_templated_field, *args, **kwargs): - super(OperatorSubclass, self).__init__(*args, **kwargs) - self.some_templated_field = some_templated_field - - def execute(*args, **kwargs): - pass - - def test_some_templated_field_template_render(context): + def verify_templated_field(context): self.assertEqual(context['ti'].task.some_templated_field['bar'][1], context['ds']) t = OperatorSubclass( task_id='test_complex_template', - provide_context=True, some_templated_field={ 'foo': '123', - 'bar': ['baz', '{{ ds }}'] + 'bar': ['baz', ' {{ ds }}'] }, - on_success_callback=test_some_templated_field_template_render, + on_success_callback=verify_templated_field, + dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + + def test_template_with_variable(self): + """ + Test the availability of variables in templates + """ + val = { + 'success':False, + 'test_value': 'a test value' + } + Variable.set("a_variable", val['test_value']) + + def verify_templated_field(context): + self.assertEqual(context['ti'].task.some_templated_field, + val['test_value']) + val['success'] = True + + t = OperatorSubclass( + task_id='test_complex_template', + some_templated_field='{{ var.value.a_variable }}', + on_success_callback=verify_templated_field, + dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + assert val['success'] + + def test_template_with_json_variable(self): + """ + Test the availability of variables (serialized as JSON) in templates + """ + val = { + 'success': False, + 'test_value': {'foo': 'bar', 'obj': {'v1': 'yes', 'v2': 'no'}} + } + Variable.set("a_variable", val['test_value'], serialize_json=True) + + def verify_templated_field(context): + self.assertEqual(context['ti'].task.some_templated_field, + val['test_value']['obj']['v2']) + val['success'] = True + + t = OperatorSubclass( + task_id='test_complex_template', + some_templated_field='{{ var.json.a_variable.obj.v2 }}', + on_success_callback=verify_templated_field, + dag=self.dag) + t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + assert val['success'] + + def test_template_with_json_variable_as_value(self): + """ + Test the availability of variables (serialized as JSON) in templates, but + accessed as a value + """ + val = { + 'success': False, + 'test_value': {'foo': 'bar'} + } + Variable.set("a_variable", val['test_value'], serialize_json=True) + + def verify_templated_field(context): + self.assertEqual(context['ti'].task.some_templated_field, + u'{"foo": "bar"}') + val['success'] = True + + t = OperatorSubclass( + task_id='test_complex_template', + some_templated_field='{{ var.value.a_variable }}', + on_success_callback=verify_templated_field, dag=self.dag) t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True) + assert val['success'] def test_import_examples(self): self.assertEqual(len(self.dagbag.dags), NUM_EXAMPLE_DAGS)