Repository: incubator-airflow
Updated Branches:
  refs/heads/master 072fa8ee5 -> 54f1c11b6


[AIRFLOW-162] Allow variable to be accessible into templates

Closes #1540 from alexvanboxel/AIRFLOW-162

AIRFLOW-162 Allow variable to be accessible into templates


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/54f1c11b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/54f1c11b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/54f1c11b

Branch: refs/heads/master
Commit: 54f1c11b6f46b16bc41c289bab1738fec8ab9608
Parents: 072fa8e
Author: Alex Van Boxel <a...@vanboxel.be>
Authored: Tue Jun 21 10:29:19 2016 -0700
Committer: Chris Riccomini <chr...@wepay.com>
Committed: Tue Jun 21 10:29:19 2016 -0700

----------------------------------------------------------------------
 airflow/models.py |  22 +++++++++++
 docs/code.rst     |   9 +++++
 tests/core.py     | 101 ++++++++++++++++++++++++++++++++++++++++++-------
 3 files changed, 118 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/54f1c11b/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 09d880e..cccbad7 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1446,6 +1446,24 @@ class TaskInstance(Base):
         if task.params:
             params.update(task.params)
 
+        class VariableAccessor:
+            """
+            Wrapper around Variable. This way you can get variables in templates by using
+            {var.variable_name}.
+            """
+            def __init__(self):
+                pass
+
+            def __getattr__(self, item):
+                return Variable.get(item)
+
+        class VariableJsonAccessor:
+            def __init__(self):
+                pass
+
+            def __getattr__(self, item):
+                return Variable.get(item, deserialize_json=True)
+
         return {
             'dag': task.dag,
             'ds': ds,
@@ -1471,6 +1489,10 @@ class TaskInstance(Base):
             'task_instance_key_str': ti_key_str,
             'conf': configuration,
             'test_mode': self.test_mode,
+            'var': {
+                'value': VariableAccessor(),
+                'json': VariableJsonAccessor()
+            }
         }
 
     def render_templates(self):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/54f1c11b/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 8a48b81..8ffb6e4 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -133,6 +133,10 @@ Variable                            Description
 ``{{ latest_date }}``               same as ``{{ ds }}``
 ``{{ ti }}``                        same as ``{{ task_instance }}``
 ``{{ params }}``                    a reference to the user-defined params dictionary
+``{{ var.value.my_var }}``          global defined variables represented as a dictionary
+``{{ var.json.my_var.path }}``      global defined variables represented as a dictionary
+                                    with deserialized JSON object, append the path to the
+                                    key within the JSON object
 ``{{ task_instance_key_str }}``     a unique, human-readable key to the task instance
                                     formatted ``{dag_id}_{task_id}_{ds}``
 ``conf``                            the full configuration object located at
@@ -151,6 +155,11 @@ dot notation. Here are some examples of what is possible:
 Refer to the models documentation for more information on the objects'
 attributes and methods.
 
+The ``var`` template variable allows you to access variables defined in Airflow's
+UI. You can access them as either plain-text or JSON. If you use JSON, you are
+also able to walk nested structures, such as dictionaries like:
+``{{ var.json.my_dict_var.key1 }}``
+
 Macros
 ''''''
 Macros are a way to expose objects to your templates and live under the

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/54f1c11b/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 76b20f9..9c688ea 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -85,6 +85,20 @@ def reset(dag_id=TEST_DAG_ID):
 reset()
 
 
+class OperatorSubclass(operators.BaseOperator):
+    """
+    An operator to test template substitution
+    """
+    template_fields = ['some_templated_field']
+
+    def __init__(self, some_templated_field, *args, **kwargs):
+        super(OperatorSubclass, self).__init__(*args, **kwargs)
+        self.some_templated_field = some_templated_field
+
+    def execute(*args, **kwargs):
+        pass
+
+
 class CoreTest(unittest.TestCase):
     def setUp(self):
         configuration.test_mode()
@@ -439,29 +453,88 @@ class CoreTest(unittest.TestCase):
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
     def test_complex_template(self):
-        class OperatorSubclass(operators.BaseOperator):
-            template_fields = ['some_templated_field']
-
-            def __init__(self, some_templated_field, *args, **kwargs):
-                super(OperatorSubclass, self).__init__(*args, **kwargs)
-                self.some_templated_field = some_templated_field
-
-            def execute(*args, **kwargs):
-                pass
-
-        def test_some_templated_field_template_render(context):
+        def verify_templated_field(context):
             
self.assertEqual(context['ti'].task.some_templated_field['bar'][1], 
context['ds'])
 
         t = OperatorSubclass(
             task_id='test_complex_template',
-            provide_context=True,
             some_templated_field={
                 'foo': '123',
-                'bar': ['baz', '{{ ds }}']
+                'bar': ['baz', ' {{ ds }}']
             },
-            on_success_callback=test_some_templated_field_template_render,
+            on_success_callback=verify_templated_field,
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+    def test_template_with_variable(self):
+        """
+        Test the availability of variables in templates
+        """
+        val = {
+            'success':False,
+            'test_value': 'a test value'
+        }
+        Variable.set("a_variable", val['test_value'])
+
+        def verify_templated_field(context):
+            self.assertEqual(context['ti'].task.some_templated_field,
+                             val['test_value'])
+            val['success'] = True
+
+        t = OperatorSubclass(
+            task_id='test_complex_template',
+            some_templated_field='{{ var.value.a_variable }}',
+            on_success_callback=verify_templated_field,
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        assert val['success']
+
+    def test_template_with_json_variable(self):
+        """
+        Test the availability of variables (serialized as JSON) in templates
+        """
+        val = {
+            'success': False,
+            'test_value': {'foo': 'bar', 'obj': {'v1': 'yes', 'v2': 'no'}}
+        }
+        Variable.set("a_variable", val['test_value'], serialize_json=True)
+
+        def verify_templated_field(context):
+            self.assertEqual(context['ti'].task.some_templated_field,
+                             val['test_value']['obj']['v2'])
+            val['success'] = True
+
+        t = OperatorSubclass(
+            task_id='test_complex_template',
+            some_templated_field='{{ var.json.a_variable.obj.v2 }}',
+            on_success_callback=verify_templated_field,
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        assert val['success']
+
+    def test_template_with_json_variable_as_value(self):
+        """
+        Test the availability of variables (serialized as JSON) in templates, but
+        accessed as a value
+        """
+        val = {
+            'success': False,
+            'test_value': {'foo': 'bar'}
+        }
+        Variable.set("a_variable", val['test_value'], serialize_json=True)
+
+        def verify_templated_field(context):
+            self.assertEqual(context['ti'].task.some_templated_field,
+                             u'{"foo": "bar"}')
+            val['success'] = True
+
+        t = OperatorSubclass(
+            task_id='test_complex_template',
+            some_templated_field='{{ var.value.a_variable }}',
+            on_success_callback=verify_templated_field,
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+        assert val['success']
 
     def test_import_examples(self):
         self.assertEqual(len(self.dagbag.dags), NUM_EXAMPLE_DAGS)

Reply via email to