This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 18fef9b  Fix mini scheduler not respecting wait_for_downstream (#18310)
18fef9b is described below

commit 18fef9bb13a2cbd8cc09484082648aec78610d52
Author: Ephraim Anierobi <splendidzig...@gmail.com>
AuthorDate: Fri Sep 17 15:07:18 2021 +0100

    Fix mini scheduler not respecting wait_for_downstream (#18310)
    
    When wait_for_downstream is set on a task, mini scheduler doesn't respect it
    and goes ahead to schedule unrunnable task instances.
    
    This PR fixes it by checking the dependency in mini scheduler
    
    Co-authored-by: Kaxil Naik <kaxiln...@gmail.com>
---
 airflow/jobs/local_task_job.py    | 12 ++++++++++++
 tests/jobs/test_local_task_job.py | 41 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 53 insertions(+)

diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py
index 6b53bd9..ed7c723 100644
--- a/airflow/jobs/local_task_job.py
+++ b/airflow/jobs/local_task_job.py
@@ -226,6 +226,18 @@ class LocalTaskJob(BaseJob):
     @Sentry.enrich_errors
     def _run_mini_scheduler_on_child_tasks(self, session=None) -> None:
         try:
+
+            if (
+                self.task_instance.task.wait_for_downstream
+                and self.task_instance.get_previous_ti()
+                and not self.task_instance.are_dependents_done()
+            ):
+                self.log.info(
+                    "No downstream tasks scheduled because task instance "
+                    "No downstream tasks scheduled because task instance "
+                )
+                return
+
             # Re-select the row with a lock
             dag_run = with_row_locks(
                 session.query(DagRun).filter_by(
diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py
index 75124db..487b98b 100644
--- a/tests/jobs/test_local_task_job.py
+++ b/tests/jobs/test_local_task_job.py
@@ -707,6 +707,47 @@ class TestLocalTaskJob:
             if scheduler_job.processor_agent:
                 scheduler_job.processor_agent.end()
 
+    @conf_vars({('scheduler', 'schedule_after_task_execution'): 'True'})
+    def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, dag_maker):
+        session = settings.Session()
+        with dag_maker(default_args={'wait_for_downstream': True}, catchup=False) as dag:
+            task_a = PythonOperator(task_id='A', python_callable=lambda: True)
+            task_b = PythonOperator(task_id='B', python_callable=lambda: True)
+            task_c = PythonOperator(task_id='C', python_callable=lambda: True)
+            task_a >> task_b >> task_c
+
+        scheduler_job = SchedulerJob(subdir=os.devnull)
+        scheduler_job.dagbag.bag_dag(dag, root_dag=dag)
+
+        dr = dag.create_dagrun(run_id='test_1', state=State.RUNNING, execution_date=DEFAULT_DATE)
+        dr2 = dag.create_dagrun(
+            run_id='test_2', state=State.RUNNING, execution_date=DEFAULT_DATE + datetime.timedelta(hours=1)
+        )
+        ti_a = TaskInstance(task_a, run_id=dr.run_id, state=State.SUCCESS)
+        ti_b = TaskInstance(task_b, run_id=dr.run_id, state=State.SUCCESS)
+        ti_c = TaskInstance(task_c, run_id=dr.run_id, state=State.RUNNING)
+        ti2_a = TaskInstance(task_a, run_id=dr2.run_id, state=State.NONE)
+        ti2_b = TaskInstance(task_b, run_id=dr2.run_id, state=State.NONE)
+        ti2_c = TaskInstance(task_c, run_id=dr2.run_id, state=State.NONE)
+        session.merge(ti_a)
+        session.merge(ti_b)
+        session.merge(ti_c)
+        session.merge(ti2_a)
+        session.merge(ti2_b)
+        session.merge(ti2_c)
+        session.flush()
+
+        job1 = LocalTaskJob(task_instance=ti2_a, ignore_ti_state=True, executor=SequentialExecutor())
+        job1.task_runner = StandardTaskRunner(job1)
+        job1.run()
+        ti2_a.refresh_from_db()
+        assert ti2_a.state == State.SUCCESS
+        assert ti2_b.state == State.NONE
+        assert (
+            "No downstream tasks scheduled because task instance "
+            "dependents have not completed yet and wait_for_downstream is true"
+        ) in caplog.text
+
     @patch('airflow.utils.process_utils.subprocess.check_call')
    def test_task_sigkill_works_with_retries(self, _check_call, caplog, dag_maker):
         """

Reply via email to