[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-16 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r367599292
 
 

 ##
 File path: tests/models/test_dagrun.py
 ##
 @@ -234,8 +234,9 @@ def test_dagrun_deadlock(self):
 ti_op2.set_state(state=State.NONE, session=session)
 
 dr.update_state()
-self.assertEqual(dr.state, State.RUNNING)
+self.assertEqual(dr.state, State.FAILED)
 
+dr.set_state(State.RUNNING)
 
 Review comment:
  The only thing I wondered about was whether it should be "upstream_failed"
instead of "skipped".




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-16 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r367551483
 
 

 ##
 File path: tests/models/test_dagrun.py
 ##
 @@ -234,8 +234,9 @@ def test_dagrun_deadlock(self):
 ti_op2.set_state(state=State.NONE, session=session)
 
 dr.update_state()
-self.assertEqual(dr.state, State.RUNNING)
+self.assertEqual(dr.state, State.FAILED)
 
 Review comment:
  Ok, I found the reason. What happens here is that the later task's state
changes to "skipped" (which is a bug in my opinion; it should be
"upstream_failed"). Anyway, I deleted the part where it checks whether a task's
state has changed - that was a mistake, and I will restore it now.




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-13 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r366190049
 
 

 ##
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##
 @@ -34,11 +34,30 @@ class TriggerRuleDep(BaseTIDep):
 IGNOREABLE = True
 IS_TASK_DEP = True
 
+@staticmethod
+@provide_session
+def _get_states_count_upstream_ti(ti, finished_tasks, session):
+"""
+This function returns the states of the upstream tis for a specific ti in order to determine
+whether this ti can run in this iteration
+
+:param ti: the ti that we want to calculate deps for
+:type ti: airflow.models.TaskInstance
+:param finished_tasks: all the finished tasks of the dag_run
+:type finished_tasks: list[airflow.models.TaskInstance]
+"""
+if not finished_tasks:
 
 Review comment:
   Good catch @evgenyshulman!




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-11 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r365532675
 
 

 ##
 File path: tests/models/test_dagrun.py
 ##
 @@ -229,7 +229,7 @@ def test_dagrun_deadlock(self):
start_date=now)
 
 ti_op1 = dr.get_task_instance(task_id=op1.task_id)
-ti_op1.set_state(state=State.SUCCESS, session=session)
+ti_op1.set_state(state=State.FAILED, session=session)
 
 Review comment:
  You are right, but it means the dr should have been 'failed' instead of
'running'. I will change the task to 'success' and the state of the dagrun to
'failed', and then it will be as intended.




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-09 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r364801917
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -346,7 +331,38 @@ def update_state(self, session=None):
 session.merge(self)
 session.commit()
 
-return self.state
+return ready_tis
+
+@provide_session
+def get_ready_tis(self, session, scheduleable_tasks, finished_tasks):
+ready_tis = []
+for st in scheduleable_tasks:
+if st.are_dependencies_met(
+dep_context=DepContext(
+flag_upstream_failed=True,
+finished_tasks=finished_tasks),
+session=session):
+ready_tis.append(st)
+return ready_tis
+
+@provide_session
+def are_runnable_tis(self, session, unfinished_tasks, finished_tasks):
+# this is an optimization to avoid running on tasks that are not ready twice
+not_ready_tis = []
+# there might be runnable tasks that are up for retry and for some reason (retry delay, etc.) are
+# not ready yet so we set the flags to count them in
+for ut in unfinished_tasks:
+if ut.are_dependencies_met(
+dep_context=DepContext(
+flag_upstream_failed=True,
+ignore_in_retry_period=True,
+ignore_in_reschedule_period=True,
+finished_tasks=finished_tasks),
+session=session):
+return False, not_ready_tis
 
 Review comment:
  Ok, I believe we can also unite the whole thing in one method, say
'update_tis', which would iterate over all the tasks regardless. In any case,
'get_ready_tis' already loops over all the tasks.
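  For illustration, a rough sketch of what such a unified method could look
like (the name 'update_tis' and the exact flags here are just my sketch, not
final code):

      # Hypothetical: one pass over the unfinished TIs that both collects
      # the ready ones and remembers the not-ready ones, replacing the two
      # separate loops in get_ready_tis / are_runnable_tis.
      def update_tis(self, unfinished_tasks, finished_tasks, session=None):
          ready_tis, not_ready_tis = [], []
          for ti in unfinished_tasks:
              if ti.are_dependencies_met(
                      dep_context=DepContext(
                          flag_upstream_failed=True,
                          finished_tasks=finished_tasks),
                      session=session):
                  ready_tis.append(ti)
              else:
                  not_ready_tis.append(ti)
          return ready_tis, not_ready_tis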




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-08 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r364300910
 
 

 ##
 File path: tests/models/test_dagrun.py
 ##
 @@ -229,7 +229,7 @@ def test_dagrun_deadlock(self):
start_date=now)
 
 ti_op1 = dr.get_task_instance(task_id=op1.task_id)
-ti_op1.set_state(state=State.SUCCESS, session=session)
+ti_op1.set_state(state=State.FAILED, session=session)
 
 Review comment:
  The trigger rule was defined to be one_failed, which means (surprisingly)
that one task has to fail. What happened before the change is: it resulted in
a deadlock right away and failed the dag, which was supposed to be in the
'running' state.

  Why did it work before the refactor?
  Probably because it used another session and took the data from the db,
which said that the task was 'running' or 'none'.

  Now, I might not understand this test fully, so please correct me if you
think that is not the case.
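  For reference, a minimal sketch of the one_failed setup being described
(illustrative names only, not the test's actual fixtures):

      from datetime import datetime

      from airflow import DAG
      from airflow.operators.dummy_operator import DummyOperator
      from airflow.utils.trigger_rule import TriggerRule

      # op2 may only run once at least one upstream task has failed; if
      # op1 succeeds, op2 can never run and the run is deadlocked.
      with DAG('one_failed_example', start_date=datetime(2019, 1, 1)) as dag:
          op1 = DummyOperator(task_id='op1')
          op2 = DummyOperator(task_id='op2',
                              trigger_rule=TriggerRule.ONE_FAILED)
          op1 >> op2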




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-08 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r364170363
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -263,48 +262,34 @@ def update_state(self, session=None):
 Determines the overall state of the DagRun based on the state
 of its TaskInstances.
 
+:param finished_tasks: The finished tasks of this run
+:type finished_tasks: list[airflow.models.TaskInstance]
 :return: State
 """
 
 dag = self.get_dag()
-
-tis = self.get_task_instances(session=session)
-self.log.debug("Updating state for %s considering %s task(s)", self, len(tis))
+ready_tis = []
+tis = [ti for ti in self.get_task_instances(session=session) if ti.state != State.REMOVED]
 
 Review comment:
  I didn't try larger DAGs, but I believe the impact will be huge.




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2020-01-08 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r364169818
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -263,48 +262,34 @@ def update_state(self, session=None):
 Determines the overall state of the DagRun based on the state
 of its TaskInstances.
 
+:param finished_tasks: The finished tasks of this run
+:type finished_tasks: list[airflow.models.TaskInstance]
 :return: State
 """
 
 dag = self.get_dag()
-
-tis = self.get_task_instances(session=session)
-self.log.debug("Updating state for %s considering %s task(s)", self, len(tis))
+ready_tis = []
+tis = [ti for ti in self.get_task_instances(session=session) if ti.state != State.REMOVED]
 
 Review comment:
  No, I didn't notice that. Do you think it has something to do with these
changes?




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-24 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r349927589
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -721,25 +721,14 @@ def _process_task_instances(self, dag, task_instances_list, session=None):
 if run.state == State.RUNNING:
 make_transient(run)
 active_dag_runs.append(run)
-
-for run in active_dag_runs:
-self.log.debug("Examining active DAG run: %s", run)
-tis = run.get_task_instances(state=SCHEDULEABLE_STATES)
-
-# this loop is quite slow as it uses are_dependencies_met for
-# every task (in ti.is_runnable). This is also called in
-# update_state above which has already checked these tasks
-for ti in tis:
-task = dag.get_task(ti.task_id)
-
-# fixme: ti.task is transient but needs to be set
-ti.task = task
-
-if ti.are_dependencies_met(
-dep_context=DepContext(flag_upstream_failed=True),
-session=session):
+self.log.debug("Examining active DAG run: %s", run)
+for ti in run.ready_tis:
 
 Review comment:
  Yes :)
  However, only a few tests use it for the state, and it is a fairly easy
change.
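  For example, a test that did `state = dr.update_state()` would become
something like this (sketch):

      ready_tis = dr.update_state()  # now returns the ready TIs instead
      state = dr.state               # the state is read off the object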




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-19 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r348080871
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -721,25 +721,14 @@ def _process_task_instances(self, dag, task_instances_list, session=None):
 if run.state == State.RUNNING:
 make_transient(run)
 active_dag_runs.append(run)
-
-for run in active_dag_runs:
-self.log.debug("Examining active DAG run: %s", run)
-tis = run.get_task_instances(state=SCHEDULEABLE_STATES)
-
-# this loop is quite slow as it uses are_dependencies_met for
-# every task (in ti.is_runnable). This is also called in
-# update_state above which has already checked these tasks
-for ti in tis:
-task = dag.get_task(ti.task_id)
-
-# fixme: ti.task is transient but needs to be set
-ti.task = task
-
-if ti.are_dependencies_met(
-dep_context=DepContext(flag_upstream_failed=True),
-session=session):
+self.log.debug("Examining active DAG run: %s", run)
+for ti in run.ready_tis:
 
 Review comment:
  I did it. The problem is that there are plenty of tests that use the output
of dr.update_state(), so I will have to change them to take the state from the
object.




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-19 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r348077144
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -721,25 +721,14 @@ def _process_task_instances(self, dag, task_instances_list, session=None):
 if run.state == State.RUNNING:
 make_transient(run)
 active_dag_runs.append(run)
-
-for run in active_dag_runs:
-self.log.debug("Examining active DAG run: %s", run)
-tis = run.get_task_instances(state=SCHEDULEABLE_STATES)
-
-# this loop is quite slow as it uses are_dependencies_met for
-# every task (in ti.is_runnable). This is also called in
-# update_state above which has already checked these tasks
-for ti in tis:
-task = dag.get_task(ti.task_id)
-
-# fixme: ti.task is transient but needs to be set
-ti.task = task
-
-if ti.are_dependencies_met(
 
 Review comment:
  It did change the behaviour: it could have failed tasks that were
up_for_retry prematurely. I made another iteration trying to keep the same
logic, but a bit clearer and cleaner. Please review!




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-18 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r347493137
 
 

 ##
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##
 @@ -49,33 +75,33 @@ def _get_dep_statuses(self, ti, session, dep_context):
 yield self._passing_status(reason="The task had a dummy trigger rule set.")
 return
 
-# TODO(unknown): this query becomes quite expensive with dags that have many
-# tasks. It should be refactored to let the task report to the dag run and get the
-# aggregates from there.
-qry = (
-session
-.query(
-func.coalesce(func.sum(
-case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.FAILED, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
-func.count(TI.task_id),
+if dep_context.finished_tasks is None:
 
 Review comment:
  Ok, I checked it. I suggest a change that will solve this and the following
comment concerning the "dag_run_finished_ti_map" collection. I think it will
be both cleaner and more efficient.
  LMK what you think about this direction, and I will arrange it a bit better
if you think it's good.




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-18 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r347492605
 
 

 ##
 File path: airflow/ti_deps/dep_context.py
 ##
 @@ -67,6 +67,8 @@ class DepContext:
 :type ignore_task_deps: bool
 :param ignore_ti_state: Ignore the task instance's previous failure/success
 :type ignore_ti_state: bool
+:param finished_tasks: A list of all the finished tasks of this run
+:type finished_tasks: list of airflow.models.TaskInstance
 
 Review comment:
  Ok, I checked it. I suggest a change that will solve this and the following
comment concerning the "dag_run_finished_ti_map" collection. I think it will
be both cleaner and more efficient.
  LMK what you think about this direction, and I will arrange it a bit better
if you think it's good.






[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-18 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r347270280
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -717,7 +718,10 @@ def _process_task_instances(self, dag, task_instances_list, session=None):
 run.dag = dag
 # todo: preferably the integrity check happens at dag collection time
 run.verify_integrity(session=session)
-run.update_state(session=session)
+finished_tasks = run.get_task_instances(state=State.finished() + [State.UPSTREAM_FAILED],
+session=session)
 
 Review comment:
  Yes, I think it's worth it; this query happens a lot.
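  A rough sketch of the reuse this enables, matching the diff above (the
finished_tasks parameter of update_state is the one added in this PR):

      # One query per dag run, reused for every TI dependency check,
      # instead of one aggregate query per task in TriggerRuleDep.
      finished_tasks = run.get_task_instances(
          state=State.finished() + [State.UPSTREAM_FAILED],
          session=session)
      run.update_state(session=session, finished_tasks=finished_tasks)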




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-17 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r347145094
 
 

 ##
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##
 @@ -49,33 +75,33 @@ def _get_dep_statuses(self, ti, session, dep_context):
 yield self._passing_status(reason="The task had a dummy trigger rule set.")
 return
 
-# TODO(unknown): this query becomes quite expensive with dags that have many
-# tasks. It should be refactored to let the task report to the dag run and get the
-# aggregates from there.
-qry = (
-session
-.query(
-func.coalesce(func.sum(
-case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.FAILED, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
-func.count(TI.task_id),
+if dep_context.finished_tasks is None:
 
 Review comment:
  I will check this; I agree.




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-17 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r347145103
 
 

 ##
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##
 @@ -34,6 +35,31 @@ class TriggerRuleDep(BaseTIDep):
 IGNOREABLE = True
 IS_TASK_DEP = True
 
+@staticmethod
+def _get_states_count_upstream_ti(ti, finished_tasks):
+"""
+:param ti: the ti that we want to calculate deps for
+:type ti: airflow.models.TaskInstance
+:param finished_tasks: all the finished tasks of the dag_run
+:type finished_tasks: list[airflow.models.TaskInstance]
+"""
+successes, skipped, failed, upstream_failed, done = 0, 0, 0, 0, 0
+upstream_tasks = [finished_task for finished_task in finished_tasks
+  if finished_task.task_id in ti.task.upstream_task_ids]
+if upstream_tasks:
+upstream_tasks_sorted = sorted(upstream_tasks, key=lambda x: x.state)
+for k, g in groupby(upstream_tasks_sorted, key=lambda x: x.state):
+if k == State.SUCCESS:
+successes = len(list(g))
+elif k == State.SKIPPED:
+skipped = len(list(g))
+elif k == State.FAILED:
+failed = len(list(g))
+elif k == State.UPSTREAM_FAILED:
+upstream_failed = len(list(g))
 
 Review comment:
  Thanks, sounds neat. I'll give it a shot.
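  (The suggestion isn't quoted here, but e.g. collections.Counter would tidy
up the counting; a sketch only, not the code that landed:)

      from collections import Counter

      # Count upstream states without the sort + groupby dance; Counter
      # returns 0 for states that never occur upstream.
      upstream_ids = ti.task.upstream_task_ids
      counts = Counter(t.state for t in finished_tasks
                       if t.task_id in upstream_ids)
      successes = counts[State.SUCCESS]
      skipped = counts[State.SKIPPED]
      failed = counts[State.FAILED]
      upstream_failed = counts[State.UPSTREAM_FAILED]
      done = sum(counts.values())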




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-13 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r345693428
 
 

 ##
 File path: airflow/utils/state.py
 ##
 @@ -99,6 +99,7 @@ def finished(cls):
 cls.SUCCESS,
 cls.FAILED,
 cls.SKIPPED,
+cls.UPSTREAM_FAILED
 
 Review comment:
   Great, I will change it




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-11 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r344499843
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -246,11 +246,12 @@ def get_previous_scheduled_dagrun(self, session=None):
 ).first()
 
 @provide_session
-def update_state(self, session=None):
+def update_state(self, session=None, finished_tasks=None):
 
 Review comment:
   Thanks, I will.




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-11 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r344499688
 
 

 ##
 File path: tests/ti_deps/deps/test_trigger_rule_dep.py
 ##
 @@ -361,3 +366,55 @@ def test_unknown_tr(self):
 
 self.assertEqual(len(dep_statuses), 1)
 self.assertFalse(dep_statuses[0].passed)
+
+def test_get_states_count_upstream_ti(self):
+"""
+this test tests the helper function '_get_states_count_upstream_ti' as a unit and inside update_state
+"""
+get_states_count_upstream_ti = TriggerRuleDep._get_states_count_upstream_ti
+session = settings.Session()
+now = timezone.utcnow()
+dag = DAG(
+'test_dagrun_with_pre_tis',
+start_date=DEFAULT_DATE,
+default_args={'owner': 'owner1'})
+
+with dag:
+op1 = DummyOperator(task_id='A')
+op2 = DummyOperator(task_id='B')
+op3 = DummyOperator(task_id='C')
+op4 = DummyOperator(task_id='D')
+op5 = DummyOperator(task_id='E', trigger_rule=TriggerRule.ONE_FAILED)
+
+op1.set_downstream([op2, op3])  # op1 >> op2, op3
+op4.set_upstream([op3, op2])  # op3 >> op4
 
 Review comment:
  Thanks, I will fix it.




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-11-11 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r344499197
 
 

 ##
 File path: airflow/utils/state.py
 ##
 @@ -99,6 +99,7 @@ def finished(cls):
 cls.SUCCESS,
 cls.FAILED,
 cls.SKIPPED,
+cls.UPSTREAM_FAILED
 
 Review comment:
   Are you sure it doesn't make sense? 
   It makes sense to me in a meaning that this is a task which its state is 
already set, and in most places it is considered exactly as failed . I searched 
for occurrences in the code that might challenge this perception and I didn't 
find any. 
   Anyway if you think it's better I can change it back and not use 
state.finished() it in my function.




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-07-10 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r302166819
 
 

 ##
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##
 @@ -49,33 +50,46 @@ def _get_dep_statuses(self, ti, session, dep_context):
 yield self._passing_status(reason="The task had a dummy trigger rule set.")
 return
 
-# TODO(unknown): this query becomes quite expensive with dags that have many
-# tasks. It should be refactored to let the task report to the dag run and get the
-# aggregates from there.
-qry = (
-session
-.query(
-func.coalesce(func.sum(
-case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.FAILED, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
-func.count(TI.task_id),
+successes, skipped, failed, upstream_failed, done = 0, 0, 0, 0, 0
+if dep_context.finished_tasks is None:
+qry = (
+session
+.query(
+func.coalesce(func.sum(
+case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
+func.coalesce(func.sum(
+case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
+func.coalesce(func.sum(
+case([(TI.state == State.FAILED, 1)], else_=0)), 0),
+func.coalesce(func.sum(
+case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
+func.count(TI.task_id),
+)
+.filter(
+TI.dag_id == ti.dag_id,
+TI.task_id.in_(ti.task.upstream_task_ids),
+TI.execution_date == ti.execution_date,
+TI.state.in_(State.finished()),
+)
 )
-.filter(
-TI.dag_id == ti.dag_id,
-TI.task_id.in_(ti.task.upstream_task_ids),
-TI.execution_date == ti.execution_date,
-TI.state.in_([
-State.SUCCESS, State.FAILED,
-State.UPSTREAM_FAILED, State.SKIPPED]),
-)
-)
+successes, skipped, failed, upstream_failed, done = qry.first()
+else:
+# see if the task name is in the task upstream for our task
+upstream_tasks = [finished_task for finished_task in dep_context.finished_tasks
+  if finished_task.task_id in ti.task.upstream_task_ids]
+if upstream_tasks:
+upstream_tasks_sorted = sorted(upstream_tasks, key=lambda x: x.state)
+for k, g in groupby(upstream_tasks_sorted, key=lambda x: x.state):
 
 Review comment:
   I will add a few unit tests for it




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-07-10 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r302166463
 
 

 ##
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##
 @@ -49,33 +50,46 @@ def _get_dep_statuses(self, ti, session, dep_context):
 yield self._passing_status(reason="The task had a dummy trigger rule set.")
 return
 
-# TODO(unknown): this query becomes quite expensive with dags that have many
-# tasks. It should be refactored to let the task report to the dag run and get the
-# aggregates from there.
-qry = (
-session
-.query(
-func.coalesce(func.sum(
-case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.FAILED, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
-func.count(TI.task_id),
+successes, skipped, failed, upstream_failed, done = 0, 0, 0, 0, 0
+if dep_context.finished_tasks is None:
+qry = (
+session
+.query(
+func.coalesce(func.sum(
+case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
+func.coalesce(func.sum(
+case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
+func.coalesce(func.sum(
+case([(TI.state == State.FAILED, 1)], else_=0)), 0),
+func.coalesce(func.sum(
+case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
+func.count(TI.task_id),
+)
+.filter(
+TI.dag_id == ti.dag_id,
+TI.task_id.in_(ti.task.upstream_task_ids),
+TI.execution_date == ti.execution_date,
+TI.state.in_(State.finished()),
+)
 )
-.filter(
-TI.dag_id == ti.dag_id,
-TI.task_id.in_(ti.task.upstream_task_ids),
-TI.execution_date == ti.execution_date,
-TI.state.in_([
-State.SUCCESS, State.FAILED,
-State.UPSTREAM_FAILED, State.SKIPPED]),
-)
-)
+successes, skipped, failed, upstream_failed, done = qry.first()
+else:
+# see if the task name is in the task upstream for our task
+upstream_tasks = [finished_task for finished_task in dep_context.finished_tasks
+  if finished_task.task_id in ti.task.upstream_task_ids]
+if upstream_tasks:
 
 Review comment:
  Yes, because we need to share the SQL results for more than one purpose.




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-07-10 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r302166108
 
 

 ##
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##
 @@ -49,33 +50,46 @@ def _get_dep_statuses(self, ti, session, dep_context):
 yield self._passing_status(reason="The task had a dummy trigger rule set.")
 return
 
-# TODO(unknown): this query becomes quite expensive with dags that have many
-# tasks. It should be refactored to let the task report to the dag run and get the
-# aggregates from there.
 
 Review comment:
  Yes, it does the query once per dag run instead of multiplying it by the
number of tasks.
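  Schematically (placeholder names, just to show the shape of the change):

      # Before: one aggregate SQL query per task instance.
      # for ti in tis:                                   # n tasks
      #     counts = <aggregate query over ti's upstream TIs>  # n queries
      #
      # After: one query per dag run, then plain Python filtering per task.
      finished_tasks = run.get_task_instances(
          state=State.finished() + [State.UPSTREAM_FAILED], session=session)
      for ti in tis:
          upstream = [t for t in finished_tasks
                      if t.task_id in ti.task.upstream_task_ids]  # no SQL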




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-03-03 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r261877321
 
 

 ##
 File path: airflow/jobs.py
 ##
 @@ -930,7 +932,20 @@ def _process_task_instances(self, dag, queue, session=None):
 run.dag = dag
 # todo: preferably the integrity check happens at dag collection time
 run.verify_integrity(session=session)
-run.update_state(session=session)
+finished_tasks = (
+session
+.query(TI.task_id, TI.state)
+.filter(
+TI.dag_id == run.dag_id,
+TI.execution_date == run.execution_date,
 
 Review comment:
   Yes, I will fix it to be `run.get_task_instances(state=State.finished(), 
session=session)`




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-03-03 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r261877204
 
 

 ##
 File path: airflow/jobs.py
 ##
 @@ -930,7 +932,20 @@ def _process_task_instances(self, dag, queue, session=None):
 run.dag = dag
 # todo: preferably the integrity check happens at dag collection time
 run.verify_integrity(session=session)
-run.update_state(session=session)
+finished_tasks = (
+session
+.query(TI.task_id, TI.state)
+.filter(
+TI.dag_id == run.dag_id,
+TI.execution_date == run.execution_date,
+TI.state.in_([
+State.SUCCESS, State.FAILED,
+State.UPSTREAM_FAILED, State.SKIPPED])
 
 Review comment:
  It seems that it should also include UPSTREAM_FAILED; the behaviour dealing
with finished tasks is quite similar in most cases. I will run the tests and
we will see :)




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-03-03 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r261877155
 
 

 ##
 File path: airflow/ti_deps/deps/trigger_rule_dep.py
 ##
 @@ -49,33 +49,49 @@ def _get_dep_statuses(self, ti, session, dep_context):
 yield self._passing_status(reason="The task had a dummy trigger rule set.")
 return
 
-# TODO(unknown): this query becomes quite expensive with dags that have many
-# tasks. It should be refactored to let the task report to the dag run and get the
-# aggregates from there.
-qry = (
-session
-.query(
-func.coalesce(func.sum(
-case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.FAILED, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
-func.count(TI.task_id),
+successes, skipped, failed, upstream_failed, done = 0, 0, 0, 0, 0
+if dep_context.finished_tasks is None:
+qry = (
+session
+.query(
+func.coalesce(func.sum(
+case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
+func.coalesce(func.sum(
+case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
+func.coalesce(func.sum(
+case([(TI.state == State.FAILED, 1)], else_=0)), 0),
+func.coalesce(func.sum(
+case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
+func.count(TI.task_id),
 
 Review comment:
  Yes, almost the same here; I will add UPSTREAM_FAILED and we will see what
happens.




[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run

2019-03-03 Thread GitBox
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected 
trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r261877174
 
 

 ##
 File path: airflow/models/__init__.py
 ##
 @@ -4872,11 +4872,12 @@ def get_previous_scheduled_dagrun(self, session=None):
 ).first()
 
 @provide_session
-def update_state(self, session=None):
+def update_state(self, session=None, finished_tasks=None):
 """
 Determines the overall state of the DagRun based on the state
 of its TaskInstances.
 
+:param finished_tasks: The finished tasks collected ordered by dagrun as a column (task_name, state)
 
 Review comment:
  It is necessary for the dependency check of all the unfinished tasks (4915),
and we already have it, so why not.

