[jira] [Commented] (AIRFLOW-2951) dag_run end_date Null after a dag is finished
[ https://issues.apache.org/jira/browse/AIRFLOW-2951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16636958#comment-16636958 ]

ASF GitHub Bot commented on AIRFLOW-2951:

ashb closed pull request #3990: [AIRFLOW-2951] Update dag_run table end_date when state change
URL: https://github.com/apache/incubator-airflow/pull/3990

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/airflow/models.py b/airflow/models.py
index 8fc259d1b5..428923ff9e 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4845,6 +4845,8 @@ def get_state(self):
     def set_state(self, state):
         if self._state != state:
             self._state = state
+            self.end_date = timezone.utcnow() if self._state in State.finished() else None
+
             if self.dag_id is not None:
                 # FIXME: Due to the scoped_session factor we we don't get a clean
                 # session here, so something really weird goes on:
@@ -5068,7 +5070,7 @@ def update_state(self, session=None):
         if (not unfinished_tasks and
                 any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)):
             self.log.info('Marking run %s failed', self)
-            self.state = State.FAILED
+            self.set_state(State.FAILED)
             dag.handle_callback(self, success=False, reason='task_failure',
                                 session=session)
 
@@ -5076,20 +5078,20 @@ def update_state(self, session=None):
         elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED)
                                           for r in roots):
             self.log.info('Marking run %s successful', self)
-            self.state = State.SUCCESS
+            self.set_state(State.SUCCESS)
             dag.handle_callback(self, success=True, reason='success', session=session)
 
         # if *all tasks* are deadlocked, the run failed
         elif (unfinished_tasks and none_depends_on_past and
               none_task_concurrency and no_dependencies_met):
             self.log.info('Deadlock; marking run %s failed', self)
-            self.state = State.FAILED
+            self.set_state(State.FAILED)
             dag.handle_callback(self, success=False, reason='all_tasks_deadlocked',
                                 session=session)
 
         # finally, if the roots aren't done, the dag is still running
         else:
-            self.state = State.RUNNING
+            self.set_state(State.RUNNING)
 
         # todo: determine we want to use with_for_update to make sure to lock the run
         session.merge(self)
diff --git a/tests/models.py b/tests/models.py
index 60aee3c84f..55fa41bd90 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -915,6 +915,124 @@ def on_failure_callable(context):
         updated_dag_state = dag_run.update_state()
         self.assertEqual(State.FAILED, updated_dag_state)
 
+    def test_dagrun_set_state_end_date(self):
+        session = settings.Session()
+
+        dag = DAG(
+            'test_dagrun_set_state_end_date',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        dag.clear()
+
+        now = timezone.utcnow()
+        dr = dag.create_dagrun(run_id='test_dagrun_set_state_end_date',
+                               state=State.RUNNING,
+                               execution_date=now,
+                               start_date=now)
+
+        # Initial end_date should be NULL
+        # State.SUCCESS and State.FAILED are all ending state and should set end_date
+        # State.RUNNING set end_date back to NULL
+        session.add(dr)
+        session.commit()
+        self.assertIsNone(dr.end_date)
+
+        dr.set_state(State.SUCCESS)
+        session.merge(dr)
+        session.commit()
+
+        dr_database = session.query(DagRun).filter(
+            DagRun.run_id == 'test_dagrun_set_state_end_date'
+        ).one()
+        self.assertIsNotNone(dr_database.end_date)
+        self.assertEqual(dr.end_date, dr_database.end_date)
+
+        dr.set_state(State.RUNNING)
+        session.merge(dr)
+        session.commit()
+
+        dr_database = session.query(DagRun).filter(
+            DagRun.run_id == 'test_dagrun_set_state_end_date'
+        ).one()
+
+        self.assertIsNone(dr_database.end_date)
+
+        dr.set_state(State.FAILED)
+        session.merge(dr)
+        session.commit()
+        dr_database = session.query(DagRun).filter(
+            DagRun.run_id == 'test_dagrun_set_state_end_date'
+        ).one()
+
+        self.assertIsNotNone(dr_database.end_date)
+
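The `end_date` rule that the diff adds to `DagRun.set_state()` can be tried out without Airflow installed. The following is a minimal sketch, not Airflow's class: `SketchDagRun`, the string state constants and the `FINISHED` set are hypothetical stand-ins, and `FINISHED` is assumed to contain at least `success` and `failed` (the real `State.finished()` may list more states). It replays the transitions exercised by `test_dagrun_set_state_end_date`.

```python
from datetime import datetime, timezone
from typing import Optional

# Hypothetical string stand-ins for airflow.utils.state.State values.
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
# Assumption: plays the role of State.finished(); the real list may be longer.
FINISHED = {SUCCESS, FAILED}


class SketchDagRun:
    """Hypothetical minimal model of the end_date logic added to DagRun.set_state."""

    def __init__(self, state: str) -> None:
        self._state = state
        self.end_date: Optional[datetime] = None

    def set_state(self, state: str) -> None:
        # Only a real state change touches end_date, mirroring the
        # `if self._state != state:` guard in the patched method.
        if self._state != state:
            self._state = state
            # Finished states stamp the current UTC time; any other state
            # (e.g. RUNNING after a clear) resets end_date to None.
            self.end_date = (
                datetime.now(timezone.utc) if self._state in FINISHED else None
            )


if __name__ == "__main__":
    dr = SketchDagRun(RUNNING)
    assert dr.end_date is None          # initial end_date is NULL

    dr.set_state(SUCCESS)
    first_end = dr.end_date
    assert first_end is not None        # finishing sets end_date

    dr.set_state(SUCCESS)               # same state again: nothing changes
    assert dr.end_date == first_end

    dr.set_state(RUNNING)
    assert dr.end_date is None          # back to running clears it

    dr.set_state(FAILED)
    assert dr.end_date is not None      # failing also sets end_date
    print("end_date transitions behave as described")
```

Because the guard compares against the current state, re-asserting a finished state keeps the original `end_date` instead of moving it forward.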
[jira] [Commented] (AIRFLOW-2951) dag_run end_date Null after a dag is finished
[ https://issues.apache.org/jira/browse/AIRFLOW-2951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16636379#comment-16636379 ]

ASF GitHub Bot commented on AIRFLOW-2951:

YingboWang opened a new pull request #3990: [AIRFLOW-2951] Update dag_run table end_date when state change
URL: https://github.com/apache/incubator-airflow/pull/3990

Currently, Airflow only changes the `end_date` value in the dag_run table when a user terminates a DAG in the web UI. The `end_date` is not updated when Airflow itself detects that a DAG has finished and updates its state. This commit adds an `end_date` update to DagRun's `set_state` function to fix that problem.

Make sure you have checked _all_ steps below.

### Jira

- [x] My PR addresses the following [Airflow-2951](https://issues.apache.org/jira/browse/AIRFLOW-2951) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR"
  - https://issues.apache.org/jira/browse/AIRFLOW-2951
  - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\]; code changes always need a Jira issue.

### Description

- [x] Here are some details about my PR, including screenshots of any UI changes:
The same feature was previously merged into master; this PR backports it to v1-10-test.

### Tests

- [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
tests/models.py:DagRunTest.test_dagrun_set_state_end_date

### Commits

- [x] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
  1. Subject is separated from body by a blank line
  1. Subject is limited to 50 characters (not including Jira issue reference)
  1. Subject does not end with a period
  1. Subject uses the imperative mood ("add", not "adding")
  1. Body wraps at 72 characters
  1. Body explains "what" and "why", not "how"

### Documentation

- [ ] In case of new functionality, my PR adds documentation that describes how to use it.
  - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added.

### Code Quality

- [ ] Passes `git diff upstream/master -u -- "*.py" | flake8 --diff`

@ashb @KevinYang21

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> dag_run end_date Null after a dag is finished
>
> Key: AIRFLOW-2951
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2951
> Project: Apache Airflow
> Issue Type: Improvement
> Components: DagRun
> Reporter: Yingbo Wang
> Assignee: Yingbo Wang
> Priority: Major
> Fix For: 1.10.1
>
> The dag_run table should have its end_date updated when a DAG finishes.
> Currently, only a user-initiated DAG termination request from the UI can
> change the "end_date" in the dag_run table. All scheduled DAGs that Airflow
> runs automatically are left with a NULL end_date after they reach a
> "success" or "failed" state.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2951) dag_run end_date Null after a dag is finished
[ https://issues.apache.org/jira/browse/AIRFLOW-2951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16599403#comment-16599403 ]

ASF GitHub Bot commented on AIRFLOW-2951:

kaxil closed pull request #3798: [AIRFLOW-2951] Update dag_run table end_date when state change
URL: https://github.com/apache/incubator-airflow/pull/3798

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/airflow/api/common/experimental/mark_tasks.py b/airflow/api/common/experimental/mark_tasks.py
index 88c5275f5a..2fac1254cd 100644
--- a/airflow/api/common/experimental/mark_tasks.py
+++ b/airflow/api/common/experimental/mark_tasks.py
@@ -208,6 +208,7 @@ def _set_dag_run_state(dag_id, execution_date, state, session=None):
     dr.state = state
     if state == State.RUNNING:
         dr.start_date = timezone.utcnow()
+        dr.end_date = None
     else:
         dr.end_date = timezone.utcnow()
     session.commit()
diff --git a/airflow/models.py b/airflow/models.py
index 55badf4828..6c8031c18c 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4840,6 +4840,8 @@ def get_state(self):
     def set_state(self, state):
         if self._state != state:
             self._state = state
+            self.end_date = timezone.utcnow() if self._state in State.finished() else None
+
             if self.dag_id is not None:
                 # FIXME: Due to the scoped_session factor we we don't get a clean
                 # session here, so something really weird goes on:
@@ -5063,7 +5065,7 @@ def update_state(self, session=None):
         if (not unfinished_tasks and
                 any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)):
             self.log.info('Marking run %s failed', self)
-            self.state = State.FAILED
+            self.set_state(State.FAILED)
             dag.handle_callback(self, success=False, reason='task_failure',
                                 session=session)
 
@@ -5071,20 +5073,20 @@ def update_state(self, session=None):
         elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED)
                                           for r in roots):
             self.log.info('Marking run %s successful', self)
-            self.state = State.SUCCESS
+            self.set_state(State.SUCCESS)
             dag.handle_callback(self, success=True, reason='success', session=session)
 
         # if *all tasks* are deadlocked, the run failed
         elif (unfinished_tasks and none_depends_on_past and
               none_task_concurrency and no_dependencies_met):
             self.log.info('Deadlock; marking run %s failed', self)
-            self.state = State.FAILED
+            self.set_state(State.FAILED)
             dag.handle_callback(self, success=False, reason='all_tasks_deadlocked',
                                 session=session)
 
         # finally, if the roots aren't done, the dag is still running
         else:
-            self.state = State.RUNNING
+            self.set_state(State.RUNNING)
 
         # todo: determine we want to use with_for_update to make sure to lock the run
         session.merge(self)
diff --git a/tests/models.py b/tests/models.py
index a1fd1e9912..7adeb3acdd 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -896,6 +896,124 @@ def on_failure_callable(context):
         updated_dag_state = dag_run.update_state()
         self.assertEqual(State.FAILED, updated_dag_state)
 
+    def test_dagrun_set_state_end_date(self):
+        session = settings.Session()
+
+        dag = DAG(
+            'test_dagrun_set_state_end_date',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        dag.clear()
+
+        now = timezone.utcnow()
+        dr = dag.create_dagrun(run_id='test_dagrun_set_state_end_date',
+                               state=State.RUNNING,
+                               execution_date=now,
+                               start_date=now)
+
+        # Initial end_date should be NULL
+        # State.SUCCESS and State.FAILED are all ending state and should set end_date
+        # State.RUNNING set end_date back to NULL
+        session.add(dr)
+        session.commit()
+        self.assertIsNone(dr.end_date)
+
+        dr.set_state(State.SUCCESS)
+        session.merge(dr)
+        session.commit()
+
+        dr_database = session.query(DagRun).filter(
+            DagRun.run_id == 'test_dagrun_set_state_end_date'
+        ).one()
+        self.assertIsNotNone(dr_database.end_date)
+        self.assertEqual(dr.end_date, dr_database.end_date)
+
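The `update_state()` hunks in this diff replace direct `self.state = ...` assignments with `self.set_state(...)`, so every branch of the decision now passes through the code that maintains `end_date`. The decision itself can be sketched as a pure function. This is a simplification under stated assumptions: `decide_run_state` is hypothetical, the string constants stand in for Airflow's `State` values, and its single `deadlocked` flag stands in for the combined `none_depends_on_past`, `none_task_concurrency` and `no_dependencies_met` checks of the real method.

```python
from typing import Iterable

# Hypothetical string stand-ins for airflow.utils.state.State values.
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
SKIPPED = "skipped"
UPSTREAM_FAILED = "upstream_failed"


def decide_run_state(has_unfinished: bool, root_states: Iterable[str],
                     deadlocked: bool) -> str:
    """Return the state the patched branches would pass to set_state()."""
    roots = list(root_states)
    # All tasks done and at least one root failed: the run failed.
    if not has_unfinished and any(s in (FAILED, UPSTREAM_FAILED) for s in roots):
        return FAILED
    # All tasks done and every root succeeded or was skipped: success.
    if not has_unfinished and all(s in (SUCCESS, SKIPPED) for s in roots):
        return SUCCESS
    # Unfinished tasks whose dependencies can never be met: deadlock, failed.
    if has_unfinished and deadlocked:
        return FAILED
    # Otherwise the roots are not done yet and the run keeps running.
    return RUNNING


if __name__ == "__main__":
    print(decide_run_state(False, [SUCCESS, SKIPPED], deadlocked=False))          # success
    print(decide_run_state(False, [SUCCESS, UPSTREAM_FAILED], deadlocked=False))  # failed
    print(decide_run_state(True, [RUNNING], deadlocked=True))                     # failed
    print(decide_run_state(True, [RUNNING], deadlocked=False))                    # running
```

Keeping the decision separate from the state write makes the point of the patch visible: whichever value is chosen, the write goes through one place that can stamp or clear `end_date`.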
[jira] [Commented] (AIRFLOW-2951) dag_run end_date Null after a dag is finished
[ https://issues.apache.org/jira/browse/AIRFLOW-2951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16590912#comment-16590912 ]

ASF GitHub Bot commented on AIRFLOW-2951:

YingboWang opened a new pull request #3798: [AIRFLOW-2951] Update dag_run table end_date when state change
URL: https://github.com/apache/incubator-airflow/pull/3798

Currently, Airflow only changes the `end_date` value in the dag_run table when a user terminates a DAG in the web UI. The `end_date` is not updated when Airflow itself detects that a DAG has finished and updates its state. This commit adds an `end_date` update to DagRun's `set_state` function to fix that problem.

Make sure you have checked _all_ steps below.

### Jira

- [x] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOWAIRFLOW-2951/) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR"
  - https://issues.apache.org/jira/browse/AIRFLOW-2951
  - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\]; code changes always need a Jira issue.

### Description

- [x] Any finished DAG should have a valid `end_date` in the `dag_run` table, but this was not fully implemented: an `end_date` was only written when a user marked a DAG as success or failed from the UI. This PR changes the `DagRun.set_state()` function to set `end_date` when a `DagRun` state changes, and redirects the existing dag_run state assignments to `set_state()`. It also fixes the UI so that a cleared DAG resets `end_date` to NULL.

### Tests

- [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
tests/models.py:DagRunTest.test_dagrun_set_state_end_date

### Commits

- [x] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
  1. Subject is separated from body by a blank line
  1. Subject is limited to 50 characters (not including Jira issue reference)
  1. Subject does not end with a period
  1. Subject uses the imperative mood ("add", not "adding")
  1. Body wraps at 72 characters
  1. Body explains "what" and "why", not "how"

### Code Quality

- [x] Passes `git diff upstream/master -u -- "*.py" | flake8 --diff`

@yrqls21
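The UI-side fix described in PR #3798 lives in the `mark_tasks.py` hunk of its diff: when `_set_dag_run_state()` moves a run to `running`, it now clears `end_date` alongside setting a fresh `start_date`. A minimal sketch of that branch follows; `RunRecord` and `set_run_state` are hypothetical stand-ins for the SQLAlchemy `DagRun` row and the real function, and the database session handling is left out.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

RUNNING = "running"  # hypothetical stand-in for State.RUNNING


@dataclass
class RunRecord:
    """Hypothetical stand-in for a dag_run row."""
    state: str
    start_date: Optional[datetime] = None
    end_date: Optional[datetime] = None


def set_run_state(dr: RunRecord, state: str) -> None:
    """Sketch of the patched branch in _set_dag_run_state (no DB session)."""
    dr.state = state
    now = datetime.now(timezone.utc)
    if state == RUNNING:
        # Restarting a run: new start_date, and the old end_date no longer applies.
        dr.start_date = now
        dr.end_date = None
    else:
        # Any other target state is treated as finishing the run.
        dr.end_date = now


if __name__ == "__main__":
    run = RunRecord(state="success",
                    end_date=datetime(2018, 1, 1, tzinfo=timezone.utc))
    set_run_state(run, RUNNING)
    print(run.state, run.end_date)  # running None
```

Before the patch the `RUNNING` branch only refreshed `start_date`, so a cleared run kept the `end_date` of its previous attempt.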