Repository: incubator-airflow
Updated Branches:
  refs/heads/master 8f6364058 -> e97a1fc75


AIRFLOW-168 Correct evaluation of @once schedule

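If a DAG used the @once schedule together with a start_date, two DagRuns
would be created: once a run already existed, next_run_date stayed None
and was then compared against dag.start_date. This patch fixes that by
returning immediately if a DagRun has already occurred for an @once
schedule.

The first-run logic is also simplified. The first run date is now the
earliest task start_date for every schedule, including @once (which
previously used datetime.now()). The legacy migration path that made the
past five runs active based on existing TaskInstances is removed.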


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/19fa9852
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/19fa9852
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/19fa9852

Branch: refs/heads/master
Commit: 19fa9852e8ad57036fff12dd28f5b4e67642bce7
Parents: afcd4fc
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Thu May 26 21:05:39 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Sat May 28 13:51:37 2016 +0200

----------------------------------------------------------------------
 airflow/jobs.py | 30 ++++++++++--------------------
 tests/jobs.py   | 17 +++++++++++++++++
 2 files changed, 27 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/19fa9852/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 7a8eb33..71a7d30 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -407,28 +407,18 @@ class SchedulerJob(BaseJob):
                             # add % as a wildcard for the like query
                             DagRun.run_id.like(DagRun.ID_PREFIX+'%')))
             last_scheduled_run = qry.scalar()
+
+            # don't schedule @once again
+            if dag.schedule_interval == '@once' and last_scheduled_run:
+                return None
+
             next_run_date = None
-            if dag.schedule_interval == '@once' and not last_scheduled_run:
-                next_run_date = datetime.now()
-            elif not last_scheduled_run:
+            if not last_scheduled_run:
                 # First run
-                TI = models.TaskInstance
-                latest_run = (
-                    session.query(func.max(TI.execution_date))
-                    .filter_by(dag_id=dag.dag_id)
-                    .scalar()
-                )
-                if latest_run:
-                    # Migrating from previous version
-                    # make the past 5 runs active
-                    next_run_date = dag.date_range(latest_run, -5)[0]
-                else:
-                    task_start_dates = [t.start_date for t in dag.tasks]
-                    if task_start_dates:
-                        next_run_date = min(task_start_dates)
-                    else:
-                        next_run_date = None
-            elif dag.schedule_interval != '@once':
+                task_start_dates = [t.start_date for t in dag.tasks]
+                if task_start_dates:
+                    next_run_date = min(task_start_dates)
+            else:
                 next_run_date = dag.following_schedule(last_scheduled_run)
 
             # don't ever schedule prior to the dag's start_date

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/19fa9852/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 1d2c614..ff23460 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -398,3 +398,20 @@ class SchedulerJobTest(unittest.TestCase):
         session = settings.Session()
         self.assertEqual(
             len(session.query(TI).filter(TI.dag_id == dag_id).all()), 0)
+
+    def test_scheduler_dagrun_once(self):
+        """
+        Test if the scheduler does not create multiple dagruns
+        if a dag is scheduled with @once and a start_date
+        """
+        dag = DAG(
+            'test_scheduler_dagrun_once',
+            start_date=datetime.datetime(2015, 1, 1),
+            schedule_interval="@once")
+
+        scheduler = SchedulerJob()
+        dag.clear()
+        dr = scheduler.schedule_dag(dag)
+        self.assertIsNotNone(dr)
+        dr = scheduler.schedule_dag(dag)
+        self.assertIsNone(dr)

Reply via email to