Repository: incubator-airflow
Updated Branches:
  refs/heads/master b7def7f1f -> 901e8f2a9


[AIRFLOW-68] Align start_date with the schedule_interval

This issue arises from a misalignment between start_date and
schedule_interval. It can only happen with cron-based
schedule_intervals that describe absolute points in time (like “1am”), as
opposed to time deltas (like “every hour”).

In the past (and in the docs) we have simply said that users must make
sure the two params agree. But this is counterintuitive, because in these
cases start_date is more like telling the scheduler to
“start paying attention” than saying “this is my first execution
date”.

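This patch changes the behavior of the scheduler. The first run date of
the dag will be the first schedule point after start_date (for example,
a start_date of 2016-01-01 10:10 with schedule "4 5 * * *" gives a first
run of 2016-01-02 05:04), unless start_date itself falls exactly on the
schedule, in which case start_date will be the first run date.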


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f69eec3b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f69eec3b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f69eec3b

Branch: refs/heads/master
Commit: f69eec3b44cbb0a0fb46a17baec195f7f3baf50e
Parents: 03ce4b9
Author: Bolke de Bruin <bo...@xs4all.nl>
Authored: Tue Jun 7 09:52:05 2016 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Tue Jun 7 10:52:09 2016 +0200

----------------------------------------------------------------------
 airflow/jobs.py   | 12 ++++++++++--
 airflow/models.py | 15 +++++++++++++++
 docs/faq.rst      | 15 ++++++++++-----
 tests/jobs.py     | 52 ++++++++++++++++++++++++++++++++++++++++++++++++--
 4 files changed, 85 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f69eec3b/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 5aaab3b..3a2d97a 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -424,13 +424,21 @@ class SchedulerJob(BaseJob):
                 # First run
                 task_start_dates = [t.start_date for t in dag.tasks]
                 if task_start_dates:
-                    next_run_date = min(task_start_dates)
+                    next_run_date = 
dag.normalize_schedule(min(task_start_dates))
+                    self.logger.debug("Next run date based on tasks {}"
+                                      .format(next_run_date))
             else:
                 next_run_date = dag.following_schedule(last_scheduled_run)
 
             # don't ever schedule prior to the dag's start_date
             if dag.start_date:
-                next_run_date = dag.start_date if not next_run_date else 
max(next_run_date, dag.start_date)
+                next_run_date = (dag.start_date if not next_run_date
+                                 else max(next_run_date, dag.start_date))
+                if next_run_date == dag.start_date:
+                    next_run_date = dag.normalize_schedule(dag.start_date)
+
+                self.logger.debug("Dag start date: {}. Next run date: {}"
+                                  .format(dag.start_date, next_run_date))
 
             # this structure is necessary to avoid a TypeError from 
concatenating
             # NoneType

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f69eec3b/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 08d0890..b6b7987 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2567,6 +2567,21 @@ class DAG(LoggingMixin):
         elif isinstance(self._schedule_interval, timedelta):
             return dttm - self._schedule_interval
 
+    def normalize_schedule(self, dttm):
+        """
+        Returns dttm + interval unless dttm is first interval then it returns 
dttm
+        """
+        following = self.following_schedule(dttm)
+
+        # in case of @once
+        if not following:
+            return dttm
+
+        if self.previous_schedule(following) != dttm:
+            return following
+
+        return dttm
+
     @property
     def tasks(self):
         return list(self.task_dict.values())

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f69eec3b/docs/faq.rst
----------------------------------------------------------------------
diff --git a/docs/faq.rst b/docs/faq.rst
index 21623fc..e61c1bf 100644
--- a/docs/faq.rst
+++ b/docs/faq.rst
@@ -78,12 +78,17 @@ We recommend against using dynamic values as 
``start_date``, especially
 once the period closes, and in theory an ``@hourly`` DAG would never get to
 an hour after now as ``now()`` moves along.
 
-We also recommend using rounded ``start_date`` in relation to your
-``schedule_interval``. This means an ``@hourly`` would be at ``00:00``
+
+Previously we also recommended using rounded ``start_date`` in relation to your
+``schedule_interval``. This meant an ``@hourly`` would be at ``00:00``
 minutes:seconds, a ``@daily`` job at midnight, a ``@monthly`` job on the
-first of the month. You can use any sensor or a ``TimeDeltaSensor`` to delay
-the execution of tasks within that period. While ``schedule_interval``
-does allow specifying a ``datetime.timedelta``
+first of the month. This is no longer required. Airflow will now auto align
+the ``start_date`` and the ``schedule_interval`` by using the ``start_date``
+as the moment to start looking.
+
+You can use any sensor or a ``TimeDeltaSensor`` to delay
+the execution of tasks within the schedule interval.
+While ``schedule_interval`` does allow specifying a ``datetime.timedelta``
 object, we recommend using the macros or cron expressions instead, as
 it enforces this idea of rounded schedules.
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f69eec3b/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 4214e47..455716e 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -672,9 +672,7 @@ class SchedulerJobTest(unittest.TestCase):
 
         dr = scheduler.schedule_dag(dag)
         self.assertIsNotNone(dr)
-        print(dr.start_date)
         dr.start_date = datetime.datetime.now() - datetime.timedelta(days=1)
-        print(dr.start_date)
         session.merge(dr)
         session.commit()
 
@@ -684,3 +682,53 @@ class SchedulerJobTest(unittest.TestCase):
         dr.refresh_from_db(session=session)
         self.assertEquals(dr.state, State.FAILED)
 
+    def test_scheduler_auto_align(self):
+        """
+        Test if the schedule_interval will be auto aligned with the start_date
+        such that if the start_date coincides with the schedule the first
+        execution_date will be start_date, otherwise it will be start_date +
+        interval.
+        """
+        dag = DAG(
+            dag_id='test_scheduler_auto_align_1',
+            start_date=datetime.datetime(2016, 1, 1, 10, 10, 0),
+            schedule_interval="4 5 * * *"
+        )
+        dag_task1 = DummyOperator(
+            task_id='dummy',
+            dag=dag,
+            owner='airflow')
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        session.merge(orm_dag)
+        session.commit()
+
+        scheduler = SchedulerJob()
+        dag.clear()
+
+        dr = scheduler.schedule_dag(dag)
+        self.assertIsNotNone(dr)
+        self.assertEquals(dr.execution_date, datetime.datetime(2016, 1, 2, 5, 
4))
+
+        dag = DAG(
+            dag_id='test_scheduler_auto_align_2',
+            start_date=datetime.datetime(2016, 1, 1, 10, 10, 0),
+            schedule_interval="10 10 * * *"
+        )
+        dag_task1 = DummyOperator(
+            task_id='dummy',
+            dag=dag,
+            owner='airflow')
+
+        session = settings.Session()
+        orm_dag = DagModel(dag_id=dag.dag_id)
+        session.merge(orm_dag)
+        session.commit()
+
+        scheduler = SchedulerJob()
+        dag.clear()
+
+        dr = scheduler.schedule_dag(dag)
+        self.assertIsNotNone(dr)
+        self.assertEquals(dr.execution_date, datetime.datetime(2016, 1, 1, 10, 
10))

Reply via email to