[jira] [Commented] (AIRFLOW-2756) Marking DAG run does not set start_time and end_time correctly

2018-08-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16564958#comment-16564958
 ] 

ASF GitHub Bot commented on AIRFLOW-2756:
-

kaxil closed pull request #3606: [AIRFLOW-2756] Fix bug in set DAG run state 
workflow
URL: https://github.com/apache/incubator-airflow/pull/3606
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/api/common/experimental/mark_tasks.py 
b/airflow/api/common/experimental/mark_tasks.py
index 681864dfbe..88c5275f5a 100644
--- a/airflow/api/common/experimental/mark_tasks.py
+++ b/airflow/api/common/experimental/mark_tasks.py
@@ -206,7 +206,10 @@ def _set_dag_run_state(dag_id, execution_date, state, 
session=None):
 DR.execution_date == execution_date
 ).one()
 dr.state = state
-dr.end_date = timezone.utcnow()
+if state == State.RUNNING:
+dr.start_date = timezone.utcnow()
+else:
+dr.end_date = timezone.utcnow()
 session.commit()
 
 
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 00ede5451d..70891ab4c3 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1023,8 +1023,7 @@ def _change_state_for_tis_without_dagrun(self,
 models.TaskInstance.dag_id == subq.c.dag_id,
 models.TaskInstance.task_id == subq.c.task_id,
 models.TaskInstance.execution_date ==
-subq.c.execution_date,
-models.TaskInstance.task_id == subq.c.task_id)) \
+subq.c.execution_date)) \
 .update({models.TaskInstance.state: new_state},
 synchronize_session=False)
 session.commit()
diff --git a/airflow/www/views.py b/airflow/www/views.py
index d37c0db45d..1ee5a2df86 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2741,7 +2741,8 @@ def after_model_change(self, form, dagrun, is_created, 
session=None):
 altered_tis = set_dag_run_state_to_success(
 dagbag.get_dag(dagrun.dag_id),
 dagrun.execution_date,
-commit=True)
+commit=True,
+session=session)
 elif dagrun.state == State.FAILED:
 altered_tis = set_dag_run_state_to_failed(
 dagbag.get_dag(dagrun.dag_id),
diff --git a/tests/api/common/experimental/mark_tasks.py 
b/tests/api/common/experimental/mark_tasks.py
index 181d10d8a1..9bba91bee0 100644
--- a/tests/api/common/experimental/mark_tasks.py
+++ b/tests/api/common/experimental/mark_tasks.py
@@ -267,11 +267,25 @@ def _create_test_dag_run(self, state, date):
 def _verify_dag_run_state(self, dag, date, state):
 drs = models.DagRun.find(dag_id=dag.dag_id, execution_date=date)
 dr = drs[0]
+
 self.assertEqual(dr.get_state(), state)
 
+def _verify_dag_run_dates(self, dag, date, state, middle_time):
+# When target state is RUNNING, we should set start_date,
+# otherwise we should set end_date.
+drs = models.DagRun.find(dag_id=dag.dag_id, execution_date=date)
+dr = drs[0]
+if state == State.RUNNING:
+self.assertGreater(dr.start_date, middle_time)
+self.assertIsNone(dr.end_date)
+else:
+self.assertLess(dr.start_date, middle_time)
+self.assertGreater(dr.end_date, middle_time)
+
 def test_set_running_dag_run_to_success(self):
 date = self.execution_dates[0]
 dr = self._create_test_dag_run(State.RUNNING, date)
+middle_time = timezone.utcnow()
 self._set_default_task_instance_states(dr)
 
 altered = set_dag_run_state_to_success(self.dag1, date, commit=True)
@@ -280,10 +294,12 @@ def test_set_running_dag_run_to_success(self):
 self.assertEqual(len(altered), 5)
 self._verify_dag_run_state(self.dag1, date, State.SUCCESS)
 self._verify_task_instance_states(self.dag1, date, State.SUCCESS)
+self._verify_dag_run_dates(self.dag1, date, State.SUCCESS, middle_time)
 
 def test_set_running_dag_run_to_failed(self):
 date = self.execution_dates[0]
 dr = self._create_test_dag_run(State.RUNNING, date)
+middle_time = timezone.utcnow()
 self._set_default_task_instance_states(dr)
 
 altered = set_dag_run_state_to_failed(self.dag1, date, commit=True)
@@ -292,10 +308,12 @@ def test_set_running_dag_run_to_failed(self):
 self.assertEqual(len(altered), 1)
 self._verify_dag_run_state(self.dag1, date, State.FAILED)
 self.assertEqual(dr.get_task_instance('run_after_loop').state, 
State.FAILED)
+self._verify_dag_run_dates(self.dag1, date, State.FAILED, middle_time)
 
  

[jira] [Commented] (AIRFLOW-2756) Marking DAG run does not set start_time and end_time correctly

2018-08-01 Thread ASF subversion and git services (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16564959#comment-16564959
 ] 

ASF subversion and git services commented on AIRFLOW-2756:
--

Commit c26c56487649f8d912c20ebf7aec1677e9996ea4 in incubator-airflow's branch 
refs/heads/master from yrqls21
[ https://gitbox.apache.org/repos/asf?p=incubator-airflow.git;h=c26c564 ]

[AIRFLOW-2756] Fix bug in set DAG run state workflow (#3606)



> Marking DAG run does not set start_time and end_time correctly
> --
>
> Key: AIRFLOW-2756
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2756
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Kevin Yang
>Assignee: Kevin Yang
>Priority: Major
>
> Marking DAG run right now always set end_time while it should set start_time 
> when marking RUNNING and otherwise end_time.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)