[
https://issues.apache.org/jira/browse/AIRFLOW-2554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725287#comment-16725287
]
ASF GitHub Bot commented on AIRFLOW-2554:
----------------------------------------
stale[bot] closed pull request #3453: [AIRFLOW-2554] Enable convenience access to in/outlets in templates
URL: https://github.com/apache/incubator-airflow/pull/3453
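
For readers skimming the patch, the dictionary merge it introduces in `get_template_context` can be sketched in plain Python. This is only an illustration of the merge order, not Airflow code: `FakeDataset` and `build_context` are hypothetical stand-ins, and the only attribute assumed on a dataset is `qualified_name`.

```python
from collections import namedtuple

# Hypothetical stand-in for a lineage dataset; only `qualified_name` is used.
FakeDataset = namedtuple("FakeDataset", ["qualified_name"])


def build_context(task_inlets, task_outlets, base):
    """Illustrative helper that mirrors the merge order used in the patch."""
    inlets = dict((i.qualified_name, i) for i in task_inlets)
    outlets = dict((o.qualified_name, o) for o in task_outlets)
    # Start from the convenience names; an outlet wins over an inlet on a clash.
    context = dict(inlets, **outlets)
    # Standard keys are applied afterwards, so they overwrite clashing dataset names.
    context.update(base)
    context.update({"inlets": inlets, "outlets": outlets})
    return context


f_in = FakeDataset("/tmp/whole_directory/")
f_out = FakeDataset("/tmp/final")
clash = FakeDataset("ds")  # deliberately collides with the built-in 'ds' key

ctx = build_context([f_in, clash], [f_out], {"ds": "2018-01-01"})
```

Under these assumptions, each dataset is reachable both at the top level of the context by its qualified name and through the `inlets` / `outlets` dictionaries, while a built-in key such as `ds` keeps its standard value even when a dataset happens to share that name.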
This pull request comes from a forked repository. Because GitHub does
not display the original diff for such pull requests, it is reproduced
below for the sake of provenance:
diff --git a/airflow/models.py b/airflow/models.py
index eda480832b..0f9e1eb131 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1808,6 +1808,10 @@ def get_template_context(self, session=None):
             session.expunge_all()
             session.commit()

+        # create convenience names for inlets and outlets
+        inlets = dict((i.qualified_name, i) for i in task.inlets)
+        outlets = dict((o.qualified_name, o) for o in task.outlets)
+
         if task.params:
             params.update(task.params)

@@ -1844,7 +1848,9 @@ def __getattr__(self, item):
             def __repr__(self):
                 return str(self.var)

-        return {
+        # make sure dag level overwrite inlets/outlets
+        context = dict(inlets, **outlets)
+        context.update({
             'dag': task.dag,
             'ds': ds,
             'next_ds': next_ds,
@@ -1877,9 +1883,11 @@ def __repr__(self):
                 'value': VariableAccessor(),
                 'json': VariableJsonAccessor()
             },
-            'inlets': task.inlets,
-            'outlets': task.outlets,
-        }
+            'inlets': inlets,
+            'outlets': outlets,
+        })
+
+        return context

     def overwrite_params_with_dag_run_conf(self, params, dag_run):
         if dag_run and dag_run.conf:
diff --git a/docs/lineage.rst b/docs/lineage.rst
index 719ef0115e..3a9c87e4ff 100644
--- a/docs/lineage.rst
+++ b/docs/lineage.rst
@@ -16,30 +16,30 @@ works.
     from airflow.lineage.datasets import File
     from airflow.models import DAG
     from datetime import timedelta
-    
+
     FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]
-    
+
     args = {
         'owner': 'airflow',
         'start_date': airflow.utils.dates.days_ago(2)
     }
-    
+
     dag = DAG(
         dag_id='example_lineage', default_args=args,
         schedule_interval='0 0 * * *',
         dagrun_timeout=timedelta(minutes=60))
-    
+
     f_final = File("/tmp/final")
-    run_this_last = DummyOperator(task_id='run_this_last', dag=dag, 
+    run_this_last = DummyOperator(task_id='run_this_last', dag=dag,
         inlets={"auto": True},
         outlets={"datasets": [f_final,]})
-    
+
     f_in = File("/tmp/whole_directory/")
     outlets = []
     for file in FILE_CATEGORIES:
         f_out = File("/tmp/{}/{{{{ execution_date }}}}".format(file))
         outlets.append(f_out)
-    run_this = BashOperator( 
+    run_this = BashOperator(
         task_id='run_me_first', bash_command='echo 1', dag=dag,
         inlets={"datasets": [f_in,]},
         outlets={"datasets": outlets}
@@ -49,25 +49,39 @@ works.
 Tasks take the parameters `inlets` and `outlets`. Inlets can be manually defined by a list of dataset `{"datasets":
 [dataset1, dataset2]}` or can be configured to look for outlets from upstream tasks `{"task_ids": ["task_id1", "task_id2"]}`
-or can be configured to pick up outlets from direct upstream tasks `{"auto": True}` or a combination of them. Outlets 
-are defined as list of dataset `{"datasets": [dataset1, dataset2]}`. Any fields for the dataset are templated with 
-the context when the task is being executed. 
+or can be configured to pick up outlets from direct upstream tasks `{"auto": True}` or a combination of them. Outlets
+are defined as list of dataset `{"datasets": [dataset1, dataset2]}`. Any fields for the dataset are templated with
+the context when the task is being executed.
 .. note:: Operators can add inlets and outlets automatically if the operator supports it.
-In the example DAG task `run_me_first` is a BashOperator that takes 3 inlets: `CAT1`, `CAT2`, `CAT3`, that are 
+In the example DAG task `run_me_first` is a BashOperator that takes 3 inlets: `CAT1`, `CAT2`, `CAT3`, that are
 generated from a list. Note that `execution_date` is a templated field and will be rendered when the task is running.
 .. note:: Behind the scenes Airflow prepares the lineage metadata as part of the `pre_execute` method of a task. When the task
-          has finished execution `post_execute` is called and lineage metadata is pushed into XCOM. Thus if you are creating 
+          has finished execution `post_execute` is called and lineage metadata is pushed into XCOM. Thus if you are creating
           your own