[AIRFLOW-989] Do not mark dag run successful if unfinished tasks

Dag runs could be marked successful if all root tasks were
successful, even if some tasks had not run yet, i.e. in case of
clearing. Now we consider unfinished_tasks before marking a run
successful.

Closes #2154 from bolkedebruin/AIRFLOW-989

(cherry picked from commit 3d6095ff5cf6eff0444d7e47a2360765f2953daf)
Signed-off-by: Bolke de Bruin <bo...@xs4all.nl>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/15600e42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/15600e42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/15600e42

Branch: refs/heads/v1-8-stable
Commit: 15600e42c805b222d6147b60376b56c8e708dcde
Parents: 3b37cfa
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Wed Mar 15 16:39:12 2017 -0700
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Mar 15 16:39:26 2017 -0700

----------------------------------------------------------------------
 airflow/models.py |  6 +++---
 tests/models.py   | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 54 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15600e42/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 7c6590f..42b8a7f 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4064,9 +4064,9 @@ class DagRun(Base):
                 logging.info('Marking run {} failed'.format(self))
                 self.state = State.FAILED
 
-            # if all roots succeeded, the run succeeded
-            elif all(r.state in (State.SUCCESS, State.SKIPPED)
-                     for r in roots):
+            # if all roots succeeded and no unfinished tasks, the run succeeded
+            elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED)
+                                              for r in roots):
                 logging.info('Marking run {} successful'.format(self))
                 self.state = State.SUCCESS
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15600e42/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index ffd1f31..1fbb3e6 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -259,6 +259,57 @@ class DagRunTest(unittest.TestCase):
         updated_dag_state = dag_run.update_state()
         self.assertEqual(State.SUCCESS, updated_dag_state)
 
+    def test_dagrun_success_conditions(self):
+        session = settings.Session()
+
+        dag = DAG(
+            'test_dagrun_success_conditions',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        # A -> B
+        # A -> C -> D
+        # ordered: B, D, C, A or D, B, C, A or D, C, B, A
+        with dag:
+            op1 = DummyOperator(task_id='A')
+            op2 = DummyOperator(task_id='B')
+            op3 = DummyOperator(task_id='C')
+            op4 = DummyOperator(task_id='D')
+            op1.set_upstream([op2, op3])
+            op3.set_upstream(op4)
+
+        dag.clear()
+
+        now = datetime.datetime.now()
+        dr = dag.create_dagrun(run_id='test_dagrun_success_conditions',
+                               state=State.RUNNING,
+                               execution_date=now,
+                               start_date=now)
+
+        # op1 = root
+        ti_op1 = dr.get_task_instance(task_id=op1.task_id)
+        ti_op1.set_state(state=State.SUCCESS, session=session)
+
+        ti_op2 = dr.get_task_instance(task_id=op2.task_id)
+        ti_op3 = dr.get_task_instance(task_id=op3.task_id)
+        ti_op4 = dr.get_task_instance(task_id=op4.task_id)
+
+        # root is successful, but unfinished tasks
+        state = dr.update_state()
+        self.assertEqual(State.RUNNING, state)
+
+        # one has failed, but root is successful
+        ti_op2.set_state(state=State.FAILED, session=session)
+        ti_op3.set_state(state=State.SUCCESS, session=session)
+        ti_op4.set_state(state=State.SUCCESS, session=session)
+        state = dr.update_state()
+        self.assertEqual(State.SUCCESS, state)
+
+        # upstream dependency failed, root has not run
+        ti_op1.set_state(State.NONE, session)
+        state = dr.update_state()
+        self.assertEqual(State.FAILED, state)
+
 
 class DagBagTest(unittest.TestCase):
 

Reply via email to