[
https://issues.apache.org/jira/browse/AIRFLOW-2756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16564958#comment-16564958
]
ASF GitHub Bot commented on AIRFLOW-2756:
-
kaxil closed pull request #3606: [AIRFLOW-2756] Fix bug in set DAG run state
workflow
URL: https://github.com/apache/incubator-airflow/pull/3606
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/airflow/api/common/experimental/mark_tasks.py
b/airflow/api/common/experimental/mark_tasks.py
index 681864dfbe..88c5275f5a 100644
--- a/airflow/api/common/experimental/mark_tasks.py
+++ b/airflow/api/common/experimental/mark_tasks.py
@@ -206,7 +206,10 @@ def _set_dag_run_state(dag_id, execution_date, state,
session=None):
DR.execution_date == execution_date
).one()
dr.state = state
-dr.end_date = timezone.utcnow()
+if state == State.RUNNING:
+dr.start_date = timezone.utcnow()
+else:
+dr.end_date = timezone.utcnow()
session.commit()
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 00ede5451d..70891ab4c3 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1023,8 +1023,7 @@ def _change_state_for_tis_without_dagrun(self,
models.TaskInstance.dag_id == subq.c.dag_id,
models.TaskInstance.task_id == subq.c.task_id,
models.TaskInstance.execution_date ==
-subq.c.execution_date,
-models.TaskInstance.task_id == subq.c.task_id)) \
+subq.c.execution_date)) \
.update({models.TaskInstance.state: new_state},
synchronize_session=False)
session.commit()
diff --git a/airflow/www/views.py b/airflow/www/views.py
index d37c0db45d..1ee5a2df86 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2741,7 +2741,8 @@ def after_model_change(self, form, dagrun, is_created,
session=None):
altered_tis = set_dag_run_state_to_success(
dagbag.get_dag(dagrun.dag_id),
dagrun.execution_date,
-commit=True)
+commit=True,
+session=session)
elif dagrun.state == State.FAILED:
altered_tis = set_dag_run_state_to_failed(
dagbag.get_dag(dagrun.dag_id),
diff --git a/tests/api/common/experimental/mark_tasks.py
b/tests/api/common/experimental/mark_tasks.py
index 181d10d8a1..9bba91bee0 100644
--- a/tests/api/common/experimental/mark_tasks.py
+++ b/tests/api/common/experimental/mark_tasks.py
@@ -267,11 +267,25 @@ def _create_test_dag_run(self, state, date):
def _verify_dag_run_state(self, dag, date, state):
drs = models.DagRun.find(dag_id=dag.dag_id, execution_date=date)
dr = drs[0]
+
self.assertEqual(dr.get_state(), state)
+def _verify_dag_run_dates(self, dag, date, state, middle_time):
+# When target state is RUNNING, we should set start_date,
+# otherwise we should set end_date.
+drs = models.DagRun.find(dag_id=dag.dag_id, execution_date=date)
+dr = drs[0]
+if state == State.RUNNING:
+self.assertGreater(dr.start_date, middle_time)
+self.assertIsNone(dr.end_date)
+else:
+self.assertLess(dr.start_date, middle_time)
+self.assertGreater(dr.end_date, middle_time)
+
def test_set_running_dag_run_to_success(self):
date = self.execution_dates[0]
dr = self._create_test_dag_run(State.RUNNING, date)
+middle_time = timezone.utcnow()
self._set_default_task_instance_states(dr)
altered = set_dag_run_state_to_success(self.dag1, date, commit=True)
@@ -280,10 +294,12 @@ def test_set_running_dag_run_to_success(self):
self.assertEqual(len(altered), 5)
self._verify_dag_run_state(self.dag1, date, State.SUCCESS)
self._verify_task_instance_states(self.dag1, date, State.SUCCESS)
+self._verify_dag_run_dates(self.dag1, date, State.SUCCESS, middle_time)
def test_set_running_dag_run_to_failed(self):
date = self.execution_dates[0]
dr = self._create_test_dag_run(State.RUNNING, date)
+middle_time = timezone.utcnow()
self._set_default_task_instance_states(dr)
altered = set_dag_run_state_to_failed(self.dag1, date, commit=True)
@@ -292,10 +308,12 @@ def test_set_running_dag_run_to_failed(self):
self.assertEqual(len(altered), 1)
self._verify_dag_run_state(self.dag1, date, State.FAILED)
self.assertEqual(dr.get_task_instance('run_after_loop').state,
State.FAILED)
+self._verify_dag_run_dates(self.dag1, date, State.FAILED, middle_time)