[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r367599292 ## File path: tests/models/test_dagrun.py ## @@ -234,8 +234,9 @@ def test_dagrun_deadlock(self): ti_op2.set_state(state=State.NONE, session=session) dr.update_state() -self.assertEqual(dr.state, State.RUNNING) +self.assertEqual(dr.state, State.FAILED) +dr.set_state(State.RUNNING) Review comment: The only thing I wondered about was whether it should be "upstream_failed" instead of "skipped" This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r367551483 ## File path: tests/models/test_dagrun.py ## @@ -234,8 +234,9 @@ def test_dagrun_deadlock(self): ti_op2.set_state(state=State.NONE, session=session) dr.update_state() -self.assertEqual(dr.state, State.RUNNING) +self.assertEqual(dr.state, State.FAILED) Review comment: OK, I found the reason. The later task's state changes to "skipped" (which I think is a bug; it should be "upstream_failed"). Separately, I had deleted the part that checks whether a task's state has changed. That was a mistake, and I will restore it now.
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r366190049 ## File path: airflow/ti_deps/deps/trigger_rule_dep.py ## @@ -34,11 +34,30 @@ class TriggerRuleDep(BaseTIDep): IGNOREABLE = True IS_TASK_DEP = True +@staticmethod +@provide_session +def _get_states_count_upstream_ti(ti, finished_tasks, session): +""" +This function returns the states of the upstream tis for a specific ti in order to determine +whether this ti can run in this iteration + +:param ti: the ti that we want to calculate deps for +:type ti: airflow.models.TaskInstance +:param finished_tasks: all the finished tasks of the dag_run +:type finished_tasks: list[airflow.models.TaskInstance] +""" +if not finished_tasks: Review comment: Good catch @evgenyshulman! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r365532675 ## File path: tests/models/test_dagrun.py ## @@ -229,7 +229,7 @@ def test_dagrun_deadlock(self): start_date=now) ti_op1 = dr.get_task_instance(task_id=op1.task_id) -ti_op1.set_state(state=State.SUCCESS, session=session) +ti_op1.set_state(state=State.FAILED, session=session) Review comment: You are right, but that means the dag run (dr) should have been 'failed' instead of 'running'. I will change the task back to 'success' and the state of the dag run to 'failed', and then it will be as intended.
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r364801917 ## File path: airflow/models/dagrun.py ## @@ -346,7 +331,38 @@ def update_state(self, session=None): session.merge(self) session.commit() -return self.state +return ready_tis + +@provide_session +def get_ready_tis(self, session, scheduleable_tasks, finished_tasks): +ready_tis = [] +for st in scheduleable_tasks: +if st.are_dependencies_met( +dep_context=DepContext( +flag_upstream_failed=True, +finished_tasks=finished_tasks), +session=session): +ready_tis.append(st) +return ready_tis + +@provide_session +def are_runnable_tis(self, session, unfinished_tasks, finished_tasks): +# this is an optimization to avoid running on tasks that are not ready twice +not_ready_tis = [] +# there might be runnable tasks that are up for retry and from some reason(retry delay, etc) are +# not ready yet so we set the flags to count them in +for ut in unfinished_tasks: +if ut.are_dependencies_met( +dep_context=DepContext( +flag_upstream_failed=True, +ignore_in_retry_period=True, +ignore_in_reschedule_period=True, +finished_tasks=finished_tasks), +session=session): +return False, not_ready_tis Review comment: OK. I believe we could also merge the whole thing into a single method, say 'update_tis', which would iterate over all tasks regardless. In any case, 'get_ready_tis' does loop over all the tasks.
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r364300910 ## File path: tests/models/test_dagrun.py ## @@ -229,7 +229,7 @@ def test_dagrun_deadlock(self): start_date=now) ti_op1 = dr.get_task_instance(task_id=op1.task_id) -ti_op1.set_state(state=State.SUCCESS, session=session) +ti_op1.set_state(state=State.FAILED, session=session) Review comment: The trigger rule was defined as one_failed, which means (surprisingly) that one task has to fail. What happened before this change: it resulted in a deadlock right away and failed the dag, which was supposed to be in the 'running' state. Why did it work before the refactor? Probably because it used another session and took the data from the db, which said that the task was 'running' or 'none'. I might not understand this test fully, so please correct me if you think that is not the case.
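To make the `one_failed` discussion in this thread easier to follow, here is a minimal sketch of how such a rule could be evaluated. It is a simplified model based only on what these comments describe (a downstream task that ends up "skipped" once nothing upstream can fail any more); the function name `one_failed_outcome`, the string state names, and the three outcome labels are assumptions made for this illustration and are not Airflow APIs.

```python
# Simplified, hypothetical model of a "one_failed" trigger rule.
# States are plain strings here; None means "not started yet".
FINISHED = {"success", "failed", "skipped", "upstream_failed"}


def one_failed_outcome(upstream_states):
    """Return 'run', 'skip' or 'wait' for a task with the given upstream states."""
    any_failed = any(s in ("failed", "upstream_failed") for s in upstream_states)
    all_done = all(s in FINISHED for s in upstream_states)
    if any_failed:
        # At least one upstream task failed, so the rule is satisfied.
        return "run"
    if all_done:
        # Every upstream task finished without failing, so the rule can
        # never be satisfied; in this sketch the task is marked as skipped.
        return "skip"
    # Some upstream task is still pending and might fail later.
    return "wait"


waiting = one_failed_outcome(["success", None])
runnable = one_failed_outcome(["failed", None])
skipped = one_failed_outcome(["success", "success"])
```

Under this model, a test that sets the only finished upstream task to 'success' leaves the downstream task waiting or skipped, which matches the deadlock question raised above.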
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r364170363 ## File path: airflow/models/dagrun.py ## @@ -263,48 +262,34 @@ def update_state(self, session=None): Determines the overall state of the DagRun based on the state of its TaskInstances. +:param finished_tasks: The finished tasks of this run +:type finished_tasks: list[airflow.models.TaskInstance] :return: State """ dag = self.get_dag() - -tis = self.get_task_instances(session=session) -self.log.debug("Updating state for %s considering %s task(s)", self, len(tis)) +ready_tis = [] +tis = [ti for ti in self.get_task_instances(session=session) if ti.state != State.REMOVED] Review comment: I didn't try larger dags but I believe that the impact will be huge This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r364169818 ## File path: airflow/models/dagrun.py ## @@ -263,48 +262,34 @@ def update_state(self, session=None): Determines the overall state of the DagRun based on the state of its TaskInstances. +:param finished_tasks: The finished tasks of this run +:type finished_tasks: list[airflow.models.TaskInstance] :return: State """ dag = self.get_dag() - -tis = self.get_task_instances(session=session) -self.log.debug("Updating state for %s considering %s task(s)", self, len(tis)) +ready_tis = [] +tis = [ti for ti in self.get_task_instances(session=session) if ti.state != State.REMOVED] Review comment: No, I didn't notice that. Do you think it has something to do with these changes?
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r349927589 ## File path: airflow/jobs/scheduler_job.py ## @@ -721,25 +721,14 @@ def _process_task_instances(self, dag, task_instances_list, session=None): if run.state == State.RUNNING: make_transient(run) active_dag_runs.append(run) - -for run in active_dag_runs: -self.log.debug("Examining active DAG run: %s", run) -tis = run.get_task_instances(state=SCHEDULEABLE_STATES) - -# this loop is quite slow as it uses are_dependencies_met for -# every task (in ti.is_runnable). This is also called in -# update_state above which has already checked these tasks -for ti in tis: -task = dag.get_task(ti.task_id) - -# fixme: ti.task is transient but needs to be set -ti.task = task - -if ti.are_dependencies_met( -dep_context=DepContext(flag_upstream_failed=True), -session=session): +self.log.debug("Examining active DAG run: %s", run) +for ti in run.ready_tis: Review comment: Yes :) However, only a few tests use it for the state, so it is a fairly easy change.
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r348080871 ## File path: airflow/jobs/scheduler_job.py ## @@ -721,25 +721,14 @@ def _process_task_instances(self, dag, task_instances_list, session=None): if run.state == State.RUNNING: make_transient(run) active_dag_runs.append(run) - -for run in active_dag_runs: -self.log.debug("Examining active DAG run: %s", run) -tis = run.get_task_instances(state=SCHEDULEABLE_STATES) - -# this loop is quite slow as it uses are_dependencies_met for -# every task (in ti.is_runnable). This is also called in -# update_state above which has already checked these tasks -for ti in tis: -task = dag.get_task(ti.task_id) - -# fixme: ti.task is transient but needs to be set -ti.task = task - -if ti.are_dependencies_met( -dep_context=DepContext(flag_upstream_failed=True), -session=session): +self.log.debug("Examining active DAG run: %s", run) +for ti in run.ready_tis: Review comment: I did it. The problem is that plenty of tests use the return value of dr.update_state(), so I will have to change them to read the state from the object.
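The test change mentioned above (reading the state from the object rather than from the return value of `update_state()`) can be sketched as follows. `FakeDagRun` is a hypothetical stand-in written for this illustration, with deliberately simplified state logic; it is not the Airflow `DagRun` class and does not reproduce the PR's implementation.

```python
class FakeDagRun:
    """Hypothetical stand-in for a dag run; not the Airflow DagRun class."""

    def __init__(self, task_states):
        self.task_states = task_states  # task_id -> state string, or None
        self.state = "running"

    def update_state(self):
        # Assumption for this sketch: any failed task fails the run, and the
        # run succeeds once every task has succeeded.
        states = list(self.task_states.values())
        if "failed" in states:
            self.state = "failed"
        elif all(s == "success" for s in states):
            self.state = "success"
        # The return value is now the list of tasks left to schedule,
        # not the run state.
        return [tid for tid, s in self.task_states.items() if s is None]


dr = FakeDagRun({"A": "success", "B": None})
ready = dr.update_state()
# A test written against the old return value would have compared `ready`
# with a state; after the change it reads the attribute instead.
run_state = dr.state
```

A test that previously asserted on the value returned by `update_state()` would, under this shape, call the method first and then assert on `dr.state`.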
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r348077144 ## File path: airflow/jobs/scheduler_job.py ## @@ -721,25 +721,14 @@ def _process_task_instances(self, dag, task_instances_list, session=None): if run.state == State.RUNNING: make_transient(run) active_dag_runs.append(run) - -for run in active_dag_runs: -self.log.debug("Examining active DAG run: %s", run) -tis = run.get_task_instances(state=SCHEDULEABLE_STATES) - -# this loop is quite slow as it uses are_dependencies_met for -# every task (in ti.is_runnable). This is also called in -# update_state above which has already checked these tasks -for ti in tis: -task = dag.get_task(ti.task_id) - -# fixme: ti.task is transient but needs to be set -ti.task = task - -if ti.are_dependencies_met( Review comment: It did change the behaviour: it could have failed tasks that were up_for_retry and premature. I made another iteration, trying to keep the same logic but make it a bit clearer and cleaner. Please review!
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r347493137 ## File path: airflow/ti_deps/deps/trigger_rule_dep.py ## @@ -49,33 +75,33 @@ def _get_dep_statuses(self, ti, session, dep_context): yield self._passing_status(reason="The task had a dummy trigger rule set.") return -# TODO(unknown): this query becomes quite expensive with dags that have many -# tasks. It should be refactored to let the task report to the dag run and get the -# aggregates from there. -qry = ( -session -.query( -func.coalesce(func.sum( -case([(TI.state == State.SUCCESS, 1)], else_=0)), 0), -func.coalesce(func.sum( -case([(TI.state == State.SKIPPED, 1)], else_=0)), 0), -func.coalesce(func.sum( -case([(TI.state == State.FAILED, 1)], else_=0)), 0), -func.coalesce(func.sum( -case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0), -func.count(TI.task_id), +if dep_context.finished_tasks is None: Review comment: OK, I checked it. I suggest a change that will address both this and the following comment about the "dag_run_finished_ti_map" collection. I think it will be both cleaner and more efficient. Let me know what you think about this direction, and I will tidy it up if you think it's good.
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r347492605 ## File path: airflow/ti_deps/dep_context.py ## @@ -67,6 +67,8 @@ class DepContext: :type ignore_task_deps: bool :param ignore_ti_state: Ignore the task instance's previous failure/success :type ignore_ti_state: bool +:param finished_tasks: A list of all the finished tasks of this run +:type finished_tasks: list of airflow.models.TaskInstance Review comment: OK, I checked it. I suggest a change that will address both this and the following comment about the "dag_run_finished_ti_map" collection. I think it will be both cleaner and more efficient. Let me know what you think about this direction, and I will tidy it up if you think it's good.
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r347270280 ## File path: airflow/jobs/scheduler_job.py ## @@ -717,7 +718,10 @@ def _process_task_instances(self, dag, task_instances_list, session=None): run.dag = dag # todo: preferably the integrity check happens at dag collection time run.verify_integrity(session=session) -run.update_state(session=session) +finished_tasks = run.get_task_instances(state=State.finished() + [State.UPSTREAM_FAILED], +session=session) Review comment: Yes, I think it is worth it; this query happens a lot.
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r347145094 ## File path: airflow/ti_deps/deps/trigger_rule_dep.py ## @@ -49,33 +75,33 @@ def _get_dep_statuses(self, ti, session, dep_context): yield self._passing_status(reason="The task had a dummy trigger rule set.") return -# TODO(unknown): this query becomes quite expensive with dags that have many -# tasks. It should be refactored to let the task report to the dag run and get the -# aggregates from there. -qry = ( -session -.query( -func.coalesce(func.sum( -case([(TI.state == State.SUCCESS, 1)], else_=0)), 0), -func.coalesce(func.sum( -case([(TI.state == State.SKIPPED, 1)], else_=0)), 0), -func.coalesce(func.sum( -case([(TI.state == State.FAILED, 1)], else_=0)), 0), -func.coalesce(func.sum( -case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0), -func.count(TI.task_id), +if dep_context.finished_tasks is None: Review comment: I will check this, I agree This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r347145103 ## File path: airflow/ti_deps/deps/trigger_rule_dep.py ## @@ -34,6 +35,31 @@ class TriggerRuleDep(BaseTIDep): IGNOREABLE = True IS_TASK_DEP = True +@staticmethod +def _get_states_count_upstream_ti(ti, finished_tasks): +""" +:param ti the ti that we want to calculate deps for +:type ti airflow.models.TaskInstance +:param finished_tasks all the finished tasks of the dag_run +:type finished_tasks of finished ti's +""" +successes, skipped, failed, upstream_failed, done = 0, 0, 0, 0, 0 +upstream_tasks = [finished_task for finished_task in finished_tasks + if finished_task.task_id in ti.task.upstream_task_ids] +if upstream_tasks: +upstream_tasks_sorted = sorted(upstream_tasks, key=lambda x: x.state) +for k, g in groupby(upstream_tasks_sorted, key=lambda x: x.state): +if k == State.SUCCESS: +successes = len(list(g)) +elif k == State.SKIPPED: +skipped = len(list(g)) +elif k == State.FAILED: +failed = len(list(g)) +elif k == State.UPSTREAM_FAILED: +upstream_failed = len(list(g)) Review comment: Thanks, sounds neat. I'll give it a shot.
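The diff quoted above counts upstream states by sorting the finished task instances and then grouping them with `groupby`. One way to do the same counting without the sort is `collections.Counter`, sketched below. The `FakeTI` tuple, the helper name `count_upstream_states`, and the plain-string state names are stand-ins invented for this illustration; this is not necessarily the form the PR ended up with.

```python
from collections import Counter, namedtuple

# Hypothetical stand-in for a task instance: only the fields the count needs.
FakeTI = namedtuple("FakeTI", ["task_id", "state"])


def count_upstream_states(upstream_task_ids, finished_tasks):
    """Return (successes, skipped, failed, upstream_failed, done) for one task."""
    counter = Counter(
        ti.state for ti in finished_tasks if ti.task_id in upstream_task_ids
    )
    # A Counter returns 0 for missing keys, so no explicit defaults are needed.
    return (
        counter["success"],
        counter["skipped"],
        counter["failed"],
        counter["upstream_failed"],
        sum(counter.values()),
    )


finished = [
    FakeTI("A", "success"),
    FakeTI("B", "failed"),
    FakeTI("C", "skipped"),
    FakeTI("X", "success"),  # finished, but not upstream of the task in question
]
counts = count_upstream_states({"A", "B", "C"}, finished)
```

The trade-off is mostly readability: a single pass replaces the sort-then-group step, and the five values come out in the same order the original tuple assignment uses.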
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r345693428 ## File path: airflow/utils/state.py ## @@ -99,6 +99,7 @@ def finished(cls): cls.SUCCESS, cls.FAILED, cls.SKIPPED, +cls.UPSTREAM_FAILED Review comment: Great, I will change it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r344499843 ## File path: airflow/models/dagrun.py ## @@ -246,11 +246,12 @@ def get_previous_scheduled_dagrun(self, session=None): ).first() @provide_session -def update_state(self, session=None): +def update_state(self, session=None, finished_tasks=None): Review comment: Thanks, I will. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r344499688 ## File path: tests/ti_deps/deps/test_trigger_rule_dep.py ## @@ -361,3 +366,55 @@ def test_unknown_tr(self): self.assertEqual(len(dep_statuses), 1) self.assertFalse(dep_statuses[0].passed) + +def test_get_states_count_upstream_ti(self): +""" +this test tests the helper function '_get_states_count_upstream_ti' as a unit and inside update_state +""" +get_states_count_upstream_ti = TriggerRuleDep._get_states_count_upstream_ti +session = settings.Session() +now = timezone.utcnow() +dag = DAG( +'test_dagrun_with_pre_tis', +start_date=DEFAULT_DATE, +default_args={'owner': 'owner1'}) + +with dag: +op1 = DummyOperator(task_id='A') +op2 = DummyOperator(task_id='B') +op3 = DummyOperator(task_id='C') +op4 = DummyOperator(task_id='D') +op5 = DummyOperator(task_id='E', trigger_rule=TriggerRule.ONE_FAILED) + +op1.set_downstream([op2, op3]) # op1 >> op2, op3 +op4.set_upstream([op3, op2]) # op3 >> op4 Review comment: Thanks I will fix it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r344499197 ## File path: airflow/utils/state.py ## @@ -99,6 +99,7 @@ def finished(cls): cls.SUCCESS, cls.FAILED, cls.SKIPPED, +cls.UPSTREAM_FAILED Review comment: Are you sure it doesn't make sense? It makes sense to me in that this is a task whose state is already set, and in most places it is treated exactly like failed. I searched the code for occurrences that might challenge this view and didn't find any. Anyway, if you think it's better, I can change it back and not use State.finished() in my function.
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run URL: https://github.com/apache/airflow/pull/4751#discussion_r302166819 ## File path: airflow/ti_deps/deps/trigger_rule_dep.py ## @@ -49,33 +50,46 @@ def _get_dep_statuses(self, ti, session, dep_context): yield self._passing_status(reason="The task had a dummy trigger rule set.") return -# TODO(unknown): this query becomes quite expensive with dags that have many -# tasks. It should be refactored to let the task report to the dag run and get the -# aggregates from there. -qry = ( -session -.query( -func.coalesce(func.sum( -case([(TI.state == State.SUCCESS, 1)], else_=0)), 0), -func.coalesce(func.sum( -case([(TI.state == State.SKIPPED, 1)], else_=0)), 0), -func.coalesce(func.sum( -case([(TI.state == State.FAILED, 1)], else_=0)), 0), -func.coalesce(func.sum( -case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0), -func.count(TI.task_id), +successes, skipped, failed, upstream_failed, done = 0, 0, 0, 0, 0 +if dep_context.finished_tasks is None: +qry = ( +session +.query( +func.coalesce(func.sum( +case([(TI.state == State.SUCCESS, 1)], else_=0)), 0), +func.coalesce(func.sum( +case([(TI.state == State.SKIPPED, 1)], else_=0)), 0), +func.coalesce(func.sum( +case([(TI.state == State.FAILED, 1)], else_=0)), 0), +func.coalesce(func.sum( +case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0), +func.count(TI.task_id), +) +.filter( +TI.dag_id == ti.dag_id, +TI.task_id.in_(ti.task.upstream_task_ids), +TI.execution_date == ti.execution_date, +TI.state.in_(State.finished()), +) ) -.filter( -TI.dag_id == ti.dag_id, -TI.task_id.in_(ti.task.upstream_task_ids), -TI.execution_date == ti.execution_date, -TI.state.in_([ -State.SUCCESS, State.FAILED, -State.UPSTREAM_FAILED, State.SKIPPED]), -) -) +successes, skipped, failed, upstream_failed, done = qry.first() +else: +# see if the task name is in the task upstream for our task +upstream_tasks = [finished_task for 
finished_task in dep_context.finished_tasks + if finished_task.task_id in ti.task.upstream_task_ids] +if upstream_tasks: +upstream_tasks_sorted = sorted(upstream_tasks, key=lambda x: x.state) +for k, g in groupby(upstream_tasks_sorted, key=lambda x: x.state): Review comment: I will add a few unit tests for it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r302166463
## File path: airflow/ti_deps/deps/trigger_rule_dep.py ##
@@ -49,33 +50,46 @@ def _get_dep_statuses(self, ti, session, dep_context):
 yield self._passing_status(reason="The task had a dummy trigger rule set.")
 return
-# TODO(unknown): this query becomes quite expensive with dags that have many
-# tasks. It should be refactored to let the task report to the dag run and get the
-# aggregates from there.
-qry = (
-session
-.query(
-func.coalesce(func.sum(
-case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.FAILED, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
-func.count(TI.task_id),
+successes, skipped, failed, upstream_failed, done = 0, 0, 0, 0, 0
+if dep_context.finished_tasks is None:
+qry = (
+session
+.query(
+func.coalesce(func.sum(
+case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
+func.coalesce(func.sum(
+case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
+func.coalesce(func.sum(
+case([(TI.state == State.FAILED, 1)], else_=0)), 0),
+func.coalesce(func.sum(
+case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
+func.count(TI.task_id),
+)
+.filter(
+TI.dag_id == ti.dag_id,
+TI.task_id.in_(ti.task.upstream_task_ids),
+TI.execution_date == ti.execution_date,
+TI.state.in_(State.finished()),
+)
 )
-.filter(
-TI.dag_id == ti.dag_id,
-TI.task_id.in_(ti.task.upstream_task_ids),
-TI.execution_date == ti.execution_date,
-TI.state.in_([
-State.SUCCESS, State.FAILED,
-State.UPSTREAM_FAILED, State.SKIPPED]),
-)
-)
+successes, skipped, failed, upstream_failed, done = qry.first()
+else:
+# see if the task name is in the task upstream for our task
+upstream_tasks = [finished_task for finished_task in dep_context.finished_tasks
+ if finished_task.task_id in ti.task.upstream_task_ids]
+if upstream_tasks:
Review comment: Yes, because we need to share the SQL results for more than one purpose.
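The in-memory branch of the diff above (filter the already-collected finished tasks down to the upstreams of one task, then count them by state) can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `FinishedTask`, `count_upstream_states`, and the plain string states are hypothetical stand-ins for Airflow's task instance rows and `State` constants.

```python
from collections import Counter
from typing import Iterable, NamedTuple, Set, Tuple


class FinishedTask(NamedTuple):
    """Hypothetical stand-in for a (task_id, state) row; not an Airflow class."""
    task_id: str
    state: str


def count_upstream_states(
    finished_tasks: Iterable[FinishedTask],
    upstream_task_ids: Set[str],
) -> Tuple[int, int, int, int, int]:
    """Return (successes, skipped, failed, upstream_failed, done) for one task."""
    # Keep only the finished tasks that are direct upstreams of the task being checked.
    upstream_states = [
        ft.state for ft in finished_tasks if ft.task_id in upstream_task_ids
    ]
    counts = Counter(upstream_states)  # missing states count as 0
    return (
        counts["success"],
        counts["skipped"],
        counts["failed"],
        counts["upstream_failed"],
        len(upstream_states),  # every entry in the input is already finished
    )


# Example: two of the three finished tasks are upstreams of the task under check.
finished = [
    FinishedTask("extract", "success"),
    FinishedTask("validate", "failed"),
    FinishedTask("unrelated", "success"),
]
result = count_upstream_states(finished, {"extract", "validate"})
```

The five-element tuple mirrors the shape that `qry.first()` returns in the SQL branch, so both branches could feed the same downstream trigger-rule logic.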
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r302166108
## File path: airflow/ti_deps/deps/trigger_rule_dep.py ##
@@ -49,33 +50,46 @@ def _get_dep_statuses(self, ti, session, dep_context):
 yield self._passing_status(reason="The task had a dummy trigger rule set.")
 return
-# TODO(unknown): this query becomes quite expensive with dags that have many
-# tasks. It should be refactored to let the task report to the dag run and get the
-# aggregates from there.
Review comment: Yes, it runs the query once per dag run instead of once for every task in the dag run.
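To make the "once per dag run instead of once per task" point concrete, here is a small self-contained sketch that counts simulated database round trips under both strategies. Everything in it (`FakeDb`, the `UPSTREAMS` mapping, the string states) is a hypothetical illustration and not Airflow code.

```python
from typing import Dict, List, Tuple

FINISHED = {"success", "failed", "skipped", "upstream_failed"}

# Hypothetical task graph: task_id -> ids of its direct upstream tasks.
UPSTREAMS: Dict[str, List[str]] = {"c": ["a", "b"], "d": ["c"], "e": ["a"]}


class FakeDb:
    """Stand-in for a database session that counts how many queries it serves."""

    def __init__(self, rows: List[Tuple[str, str]]) -> None:
        self.rows = rows  # (task_id, state) rows of a single dag run
        self.query_count = 0

    def fetch_finished(self) -> List[Tuple[str, str]]:
        self.query_count += 1
        return [(task_id, state) for task_id, state in self.rows if state in FINISHED]


def done_upstreams_query_per_task(db: FakeDb) -> Dict[str, int]:
    """Old shape: every dependency check issues its own query."""
    return {
        task: sum(1 for task_id, _ in db.fetch_finished() if task_id in ups)
        for task, ups in UPSTREAMS.items()
    }


def done_upstreams_query_per_dag_run(db: FakeDb) -> Dict[str, int]:
    """New shape: one query per dag run, shared by every dependency check."""
    finished = db.fetch_finished()
    return {
        task: sum(1 for task_id, _ in finished if task_id in ups)
        for task, ups in UPSTREAMS.items()
    }


rows = [("a", "success"), ("b", "failed"), ("c", "running")]
db_per_task, db_per_run = FakeDb(rows), FakeDb(rows)
per_task = done_upstreams_query_per_task(db_per_task)
per_run = done_upstreams_query_per_dag_run(db_per_run)
```

Both strategies produce the same counts; only the number of round trips differs, and the per-task variant grows with the number of tasks being checked.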
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r261877321
## File path: airflow/jobs.py ##
@@ -930,7 +932,20 @@ def _process_task_instances(self, dag, queue, session=None):
 run.dag = dag
 # todo: preferably the integrity check happens at dag collection time
 run.verify_integrity(session=session)
-run.update_state(session=session)
+finished_tasks = (
+session
+.query(TI.task_id, TI.state
+ )
+.filter(
+TI.dag_id == run.dag_id,
+TI.execution_date == run.execution_date,
Review comment: Yes, I will change it to `run.get_task_instances(state=State.finished(), session=session)`.
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r261877204
## File path: airflow/jobs.py ##
@@ -930,7 +932,20 @@ def _process_task_instances(self, dag, queue, session=None):
 run.dag = dag
 # todo: preferably the integrity check happens at dag collection time
 run.verify_integrity(session=session)
-run.update_state(session=session)
+finished_tasks = (
+session
+.query(TI.task_id, TI.state
+ )
+.filter(
+TI.dag_id == run.dag_id,
+TI.execution_date == run.execution_date,
+TI.state.in_([
+State.SUCCESS, State.FAILED,
+State.UPSTREAM_FAILED, State.SKIPPED])
Review comment: It seems that it should also include UPSTREAM_FAILED; the behaviour for finished tasks is quite similar in most cases. I will run the tests and we will see :)
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r261877155
## File path: airflow/ti_deps/deps/trigger_rule_dep.py ##
@@ -49,33 +49,49 @@ def _get_dep_statuses(self, ti, session, dep_context):
 yield self._passing_status(reason="The task had a dummy trigger rule set.")
 return
-# TODO(unknown): this query becomes quite expensive with dags that have many
-# tasks. It should be refactored to let the task report to the dag run and get the
-# aggregates from there.
-qry = (
-session
-.query(
-func.coalesce(func.sum(
-case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.FAILED, 1)], else_=0)), 0),
-func.coalesce(func.sum(
-case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
-func.count(TI.task_id),
+successes, skipped, failed, upstream_failed, done = 0, 0, 0, 0, 0
+if dep_context.finished_tasks is None:
+qry = (
+session
+.query(
+func.coalesce(func.sum(
+case([(TI.state == State.SUCCESS, 1)], else_=0)), 0),
+func.coalesce(func.sum(
+case([(TI.state == State.SKIPPED, 1)], else_=0)), 0),
+func.coalesce(func.sum(
+case([(TI.state == State.FAILED, 1)], else_=0)), 0),
+func.coalesce(func.sum(
+case([(TI.state == State.UPSTREAM_FAILED, 1)], else_=0)), 0),
+func.count(TI.task_id),
Review comment: Yes, almost the same here. I will add UPSTREAM_FAILED and we will see what happens.
[GitHub] [airflow] amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
amichai07 commented on a change in pull request #4751: [AIRFLOW-3607] collected trigger rule dep check per dag run
URL: https://github.com/apache/airflow/pull/4751#discussion_r261877174
## File path: airflow/models/__init__.py ##
@@ -4872,11 +4872,12 @@ def get_previous_scheduled_dagrun(self, session=None):
 ).first()
 @provide_session
-def update_state(self, session=None):
+def update_state(self, session=None, finished_tasks=None):
 """
 Determines the overall state of the DagRun based on the state of its TaskInstances.
+:param finished_tasks: The finished tasks collected ordered by dagrun as a column (task_name, state)
Review comment: It is necessary for the dependency check of all unfinished tasks (4915), and we already have it, so why not.
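The optional `finished_tasks=None` argument discussed above follows a common pattern: use the caller's pre-collected rows when they are supplied, and fall back to a query only when they are not. A minimal sketch of that pattern, under the assumption that rows are `(task_id, state)` tuples; `resolve_finished_tasks` and `load_from_db` are hypothetical names, not part of Airflow:

```python
from typing import Callable, List, Optional, Tuple

FinishedRow = Tuple[str, str]  # (task_id, state)


def resolve_finished_tasks(
    finished_tasks: Optional[List[FinishedRow]],
    load_from_db: Callable[[], List[FinishedRow]],
) -> List[FinishedRow]:
    """Return the caller's rows if given, otherwise load them once."""
    # Test with `is None` rather than truthiness: an empty list means
    # "already collected, nothing has finished yet" and must not trigger a query.
    if finished_tasks is None:
        return load_from_db()
    return finished_tasks


calls: List[str] = []


def load_from_db() -> List[FinishedRow]:
    """Stand-in for the database query; records each invocation."""
    calls.append("query")
    return [("a", "success")]


from_db = resolve_finished_tasks(None, load_from_db)     # falls back to the loader
from_caller = resolve_finished_tasks([], load_from_db)   # reuses the caller's rows
```

The `is None` check matches the `if dep_context.finished_tasks is None:` condition in the trigger rule diff earlier in this thread.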