[jira] [Commented] (AIRFLOW-2554) Inlets and outlets should be available in templates by their fully_qualified name or name

2018-12-19 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/AIRFLOW-2554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16725287#comment-16725287 ]

ASF GitHub Bot commented on AIRFLOW-2554:
-----------------------------------------

stale[bot] closed pull request #3453: [AIRFLOW-2554] Enable convenience access to in/outlets in templates
URL: https://github.com/apache/incubator-airflow/pull/3453

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/airflow/models.py b/airflow/models.py
index eda480832b..0f9e1eb131 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1808,6 +1808,10 @@ def get_template_context(self, session=None):
         session.expunge_all()
         session.commit()
 
+        # create convenience names for inlets and outlets
+        inlets = dict((i.qualified_name, i) for i in task.inlets)
+        outlets = dict((o.qualified_name, o) for o in task.outlets)
+
         if task.params:
             params.update(task.params)
 
@@ -1844,7 +1848,9 @@ def __getattr__(self, item):
             def __repr__(self):
                 return str(self.var)
 
-        return {
+        # make sure dag level overwrites inlets/outlets
+        context = dict(inlets, **outlets)
+        context.update({
             'dag': task.dag,
             'ds': ds,
             'next_ds': next_ds,
@@ -1877,9 +1883,11 @@ def __repr__(self):
                 'value': VariableAccessor(),
                 'json': VariableJsonAccessor()
             },
-            'inlets': task.inlets,
-            'outlets': task.outlets,
-        }
+            'inlets': inlets,
+            'outlets': outlets,
+        })
+
+        return context
 
     def overwrite_params_with_dag_run_conf(self, params, dag_run):
         if dag_run and dag_run.conf:
diff --git a/docs/lineage.rst b/docs/lineage.rst
index 719ef0115e..3a9c87e4ff 100644
--- a/docs/lineage.rst
+++ b/docs/lineage.rst
@@ -16,30 +16,30 @@ works.
     from airflow.lineage.datasets import File
     from airflow.models import DAG
     from datetime import timedelta
-    
+
     FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"]
-    
+
     args = {
         'owner': 'airflow',
         'start_date': airflow.utils.dates.days_ago(2)
     }
-    
+
     dag = DAG(
         dag_id='example_lineage', default_args=args,
         schedule_interval='0 0 * * *',
         dagrun_timeout=timedelta(minutes=60))
-    
+
     f_final = File("/tmp/final")
-    run_this_last = DummyOperator(task_id='run_this_last', dag=dag, 
+    run_this_last = DummyOperator(task_id='run_this_last', dag=dag,
         inlets={"auto": True},
         outlets={"datasets": [f_final,]})
-    
+
     f_in = File("/tmp/whole_directory/")
     outlets = []
     for file in FILE_CATEGORIES:
         f_out = File("/tmp/{}/{{{{ execution_date }}}}".format(file))
         outlets.append(f_out)
-    run_this = BashOperator(
+    run_this = BashOperator(
         task_id='run_me_first', bash_command='echo 1', dag=dag,
         inlets={"datasets": [f_in,]},
         outlets={"datasets": outlets}
@@ -49,25 +49,39 @@ works.
 
 Tasks take the parameters `inlets` and `outlets`. Inlets can be manually defined by a list of dataset `{"datasets":
 [dataset1, dataset2]}` or can be configured to look for outlets from upstream tasks `{"task_ids": ["task_id1", "task_id2"]}`
-or can be configured to pick up outlets from direct upstream tasks `{"auto": True}` or a combination of them. Outlets 
-are defined as list of dataset `{"datasets": [dataset1, dataset2]}`. Any fields for the dataset are templated with 
-the context when the task is being executed. 
+or can be configured to pick up outlets from direct upstream tasks `{"auto": True}` or a combination of them. Outlets
+are defined as list of dataset `{"datasets": [dataset1, dataset2]}`. Any fields for the dataset are templated with
+the context when the task is being executed.
 
 .. note:: Operators can add inlets and outlets automatically if the operator supports it.
 
-In the example DAG task `run_me_first` is a BashOperator that takes 3 inlets: `CAT1`, `CAT2`, `CAT3`, that are 
+In the example DAG task `run_me_first` is a BashOperator that takes 3 inlets: `CAT1`, `CAT2`, `CAT3`, that are
 generated from a list. Note that `execution_date` is a templated field and will be rendered when the task is running.
 
 .. note:: Behind the scenes Airflow prepares the lineage metadata as part of the `pre_execute` method of a task. When the task
-          has finished execution `post_execute` is called and lineage metadata is pushed into XCOM. Thus if you are creating 
+          has finished execution `post_execute` is called and lineage metadata is pushed into XCOM. Thus if you are creating
           your own 

[jira] [Commented] (AIRFLOW-2554) Inlets and outlets should be available in templates by their fully_qualified name or name

2018-09-02 Thread Apache Spark (JIRA)


[ https://issues.apache.org/jira/browse/AIRFLOW-2554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16601446#comment-16601446 ]

Apache Spark commented on AIRFLOW-2554:
---------------------------------------

User 'bolkedebruin' has created a pull request for this issue:
https://github.com/apache/incubator-airflow/pull/3453

> Inlets and outlets should be available in templates by their fully_qualified name or name
> ------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-2554
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2554
>             Project: Apache Airflow
>          Issue Type: Improvement
>            Reporter: Bolke de Bruin
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)