Repository: incubator-airflow Updated Branches: refs/heads/master 8f6364058 -> e97a1fc75
AIRFLOW-168 Correct evaluation of @once schedule. If the schedule @once was used with a start_date, two dagruns would be created as next_run_date would be None and compared against dag.start_date. This patch fixes that by returning immediately if a dagrun has already occurred with an @once schedule. Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/19fa9852 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/19fa9852 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/19fa9852 Branch: refs/heads/master Commit: 19fa9852e8ad57036fff12dd28f5b4e67642bce7 Parents: afcd4fc Author: Bolke de Bruin <bo...@xs4all.nl> Authored: Thu May 26 21:05:39 2016 +0200 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Sat May 28 13:51:37 2016 +0200 ---------------------------------------------------------------------- airflow/jobs.py | 30 ++++++++++-------------------- tests/jobs.py | 17 +++++++++++++++++ 2 files changed, 27 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/19fa9852/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 7a8eb33..71a7d30 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -407,28 +407,18 @@ class SchedulerJob(BaseJob): # add % as a wildcard for the like query DagRun.run_id.like(DagRun.ID_PREFIX+'%'))) last_scheduled_run = qry.scalar() + + # don't schedule @once again + if dag.schedule_interval == '@once' and last_scheduled_run: + return None + next_run_date = None - if dag.schedule_interval == '@once' and not last_scheduled_run: - next_run_date = datetime.now() - elif not last_scheduled_run: + if not last_scheduled_run: # First run - TI = models.TaskInstance - latest_run = ( - session.query(func.max(TI.execution_date)) - 
.filter_by(dag_id=dag.dag_id) - .scalar() - ) - if latest_run: - # Migrating from previous version - # make the past 5 runs active - next_run_date = dag.date_range(latest_run, -5)[0] - else: - task_start_dates = [t.start_date for t in dag.tasks] - if task_start_dates: - next_run_date = min(task_start_dates) - else: - next_run_date = None - elif dag.schedule_interval != '@once': + task_start_dates = [t.start_date for t in dag.tasks] + if task_start_dates: + next_run_date = min(task_start_dates) + else: next_run_date = dag.following_schedule(last_scheduled_run) # don't ever schedule prior to the dag's start_date http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/19fa9852/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index 1d2c614..ff23460 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -398,3 +398,20 @@ class SchedulerJobTest(unittest.TestCase): session = settings.Session() self.assertEqual( len(session.query(TI).filter(TI.dag_id == dag_id).all()), 0) + + def test_scheduler_dagrun_once(self): + """ + Test if the scheduler does not create multiple dagruns + if a dag is scheduled with @once and a start_date + """ + dag = DAG( + 'test_scheduler_dagrun_once', + start_date=datetime.datetime(2015, 1, 1), + schedule_interval="@once") + + scheduler = SchedulerJob() + dag.clear() + dr = scheduler.schedule_dag(dag) + self.assertIsNotNone(dr) + dr = scheduler.schedule_dag(dag) + self.assertIsNone(dr)