stale[bot] closed pull request #3453: [AIRFLOW-2554] Enable convenience access to in/outlets in templates URL: https://github.com/apache/incubator-airflow/pull/3453
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/airflow/models.py b/airflow/models.py
index eda480832b..0f9e1eb131 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1808,6 +1808,10 @@ def get_template_context(self, session=None):
             session.expunge_all()
             session.commit()
 
+        # create convenience names for inlets and outlets
+        inlets = dict((i.qualified_name, i) for i in task.inlets)
+        outlets = dict((o.qualified_name, o) for o in task.outlets)
+
         if task.params:
             params.update(task.params)
 
@@ -1844,7 +1848,9 @@ def __getattr__(self, item):
             def __repr__(self):
                 return str(self.var)
 
-        return {
+        # make sure dag level overwrite inlets/outlets
+        context = dict(inlets, **outlets)
+        context.update({
             'dag': task.dag,
             'ds': ds,
             'next_ds': next_ds,
@@ -1877,9 +1883,11 @@ def __repr__(self):
                 'value': VariableAccessor(),
                 'json': VariableJsonAccessor()
             },
-            'inlets': task.inlets,
-            'outlets': task.outlets,
-        }
+            'inlets': inlets,
+            'outlets': outlets,
+        })
+
+        return context
 
     def overwrite_params_with_dag_run_conf(self, params, dag_run):
         if dag_run and dag_run.conf:
diff --git a/docs/lineage.rst b/docs/lineage.rst
index 719ef0115e..3a9c87e4ff 100644
--- a/docs/lineage.rst
+++ b/docs/lineage.rst
@@ -16,30 +16,30 @@ works.
from airflow.lineage.datasets import File from airflow.models import DAG from datetime import timedelta - + FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"] - + args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(2) } - + dag = DAG( dag_id='example_lineage', default_args=args, schedule_interval='0 0 * * *', dagrun_timeout=timedelta(minutes=60)) - + f_final = File("/tmp/final") - run_this_last = DummyOperator(task_id='run_this_last', dag=dag, + run_this_last = DummyOperator(task_id='run_this_last', dag=dag, inlets={"auto": True}, outlets={"datasets": [f_final,]}) - + f_in = File("/tmp/whole_directory/") outlets = [] for file in FILE_CATEGORIES: f_out = File("/tmp/{}/{{{{ execution_date }}}}".format(file)) outlets.append(f_out) - run_this = BashOperator( + run_this = BashOperator( task_id='run_me_first', bash_command='echo 1', dag=dag, inlets={"datasets": [f_in,]}, outlets={"datasets": outlets} @@ -49,25 +49,39 @@ works. Tasks take the parameters `inlets` and `outlets`. Inlets can be manually defined by a list of dataset `{"datasets": [dataset1, dataset2]}` or can be configured to look for outlets from upstream tasks `{"task_ids": ["task_id1", "task_id2"]}` -or can be configured to pick up outlets from direct upstream tasks `{"auto": True}` or a combination of them. Outlets -are defined as list of dataset `{"datasets": [dataset1, dataset2]}`. Any fields for the dataset are templated with -the context when the task is being executed. +or can be configured to pick up outlets from direct upstream tasks `{"auto": True}` or a combination of them. Outlets +are defined as list of dataset `{"datasets": [dataset1, dataset2]}`. Any fields for the dataset are templated with +the context when the task is being executed. .. note:: Operators can add inlets and outlets automatically if the operator supports it. 
-In the example DAG task `run_me_first` is a BashOperator that takes 3 inlets: `CAT1`, `CAT2`, `CAT3`, that are +In the example DAG task `run_me_first` is a BashOperator that takes 3 inlets: `CAT1`, `CAT2`, `CAT3`, that are generated from a list. Note that `execution_date` is a templated field and will be rendered when the task is running. .. note:: Behind the scenes Airflow prepares the lineage metadata as part of the `pre_execute` method of a task. When the task - has finished execution `post_execute` is called and lineage metadata is pushed into XCOM. Thus if you are creating + has finished execution `post_execute` is called and lineage metadata is pushed into XCOM. Thus if you are creating your own operators that override this method make sure to decorate your method with `prepare_lineage` and `apply_lineage` respectively. +Templating +---------- + +Inlets and outlets are available in templated variables by referring to their (qualified) name. If their name collides +with one of the built-in names (ie. 'dag') the built-in name takes precedence and the inlets or outlets will only be +available by using `{{ inlet['name'] }}` in your template. + +.. code:: python + + my_file = File(name="my_file", "my_file.{{ ds }}") + run_this = BashOperator( + task_id='run_me_first', bash_command="echo {{ my_file.name }}; echo {{ inlet['myfile'].name }}", dag=dag, + inlets={"datasets": [my_file,]} + ) Apache Atlas ------------ -Airflow can send its lineage metadata to Apache Atlas. You need to enable the `atlas` backend and configure it +Airflow can send its lineage metadata to Apache Atlas. You need to enable the `atlas` backend and configure it properly, e.g. in your `airflow.cfg`: .. code:: python @@ -80,6 +94,6 @@ properly, e.g. in your `airflow.cfg`: password = my_password host = host port = 21000 - + Please make sure to have the `atlasclient` package installed. 
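To make the merge order in the patched `get_template_context` concrete, here is a minimal, stdlib-only sketch. `FakeDataset` and `build_context` are hypothetical names standing in for an Airflow lineage dataset and for the relevant part of the method; they are not Airflow APIs. The sketch assumes, as the `models.py` hunk does, that datasets are keyed by `qualified_name`, that outlets replace inlets with the same name, and that built-in names such as `dag` replace both. The context key the patch adds is `inlets` (plural), so in a template a dataset shadowed by a built-in would be reached as `{{ inlets['dag'] }}`.

```python
from dataclasses import dataclass


@dataclass
class FakeDataset:
    # Hypothetical stand-in for an Airflow lineage dataset; the sketch only
    # relies on a `qualified_name` (used as the context key) and a `name`.
    qualified_name: str
    name: str


def build_context(builtins, task_inlets, task_outlets):
    """Mirror the merge order of the patched get_template_context (sketch)."""
    # Index inlets and outlets by qualified name, as the patch does.
    inlets = {i.qualified_name: i for i in task_inlets}
    outlets = {o.qualified_name: o for o in task_outlets}

    # Outlets replace inlets that share a qualified name.
    context = dict(inlets, **outlets)
    # Built-ins (and the full 'inlets'/'outlets' mappings) are applied last,
    # so they win any name collision with a dataset.
    context.update({**builtins, "inlets": inlets, "outlets": outlets})
    return context


if __name__ == "__main__":
    my_file = FakeDataset(qualified_name="my_file", name="my_file.2018-06-01")
    clash = FakeDataset(qualified_name="dag", name="clashing_dataset")
    ctx = build_context(
        {"dag": "<DAG object>", "ds": "2018-06-01"}, [my_file, clash], []
    )
    print(ctx["my_file"].name)        # my_file.2018-06-01
    print(ctx["dag"])                 # <DAG object>
    print(ctx["inlets"]["dag"].name)  # clashing_dataset
```

Applying the built-ins in a single trailing `update` is what gives them precedence: the last write wins, so no explicit collision check is needed, and the shadowed dataset remains available through the `inlets` or `outlets` mapping.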
----------------------------------------------------------------
This is an automated message from the Apache Git Service.