Repository: incubator-airflow Updated Branches: refs/heads/master b7def7f1f -> 901e8f2a9
[AIRFLOW-68] Align start_date with the schedule_interval This particular issue arises because of an alignment issue between start_date and schedule_interval. This can only happen with cron-based schedule_intervals that describe absolute points in time (like “1am”), as opposed to time deltas (like “every hour”). In the past (and in the docs) we have simply said that users must make sure the two params agree. But this is counterintuitive: in these cases, start_date is more like telling the scheduler to “start paying attention” than saying “this is my first execution date”. This patch changes the behavior of the scheduler. The next run date of the dag will be treated as "start_date + interval" unless the start_date falls exactly on a schedule point (i.e. it is the previous schedule of the following run), in which case the start_date itself will be the next run date. Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f69eec3b Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f69eec3b Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f69eec3b Branch: refs/heads/master Commit: f69eec3b44cbb0a0fb46a17baec195f7f3baf50e Parents: 03ce4b9 Author: Bolke de Bruin <bo...@xs4all.nl> Authored: Tue Jun 7 09:52:05 2016 +0200 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Tue Jun 7 10:52:09 2016 +0200 ---------------------------------------------------------------------- airflow/jobs.py | 12 ++++++++++-- airflow/models.py | 15 +++++++++++++++ docs/faq.rst | 15 ++++++++++----- tests/jobs.py | 52 ++++++++++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 85 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f69eec3b/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 5aaab3b..3a2d97a 100644 --- 
a/airflow/jobs.py +++ b/airflow/jobs.py @@ -424,13 +424,21 @@ class SchedulerJob(BaseJob): # First run task_start_dates = [t.start_date for t in dag.tasks] if task_start_dates: - next_run_date = min(task_start_dates) + next_run_date = dag.normalize_schedule(min(task_start_dates)) + self.logger.debug("Next run date based on tasks {}" + .format(next_run_date)) else: next_run_date = dag.following_schedule(last_scheduled_run) # don't ever schedule prior to the dag's start_date if dag.start_date: - next_run_date = dag.start_date if not next_run_date else max(next_run_date, dag.start_date) + next_run_date = (dag.start_date if not next_run_date + else max(next_run_date, dag.start_date)) + if next_run_date == dag.start_date: + next_run_date = dag.normalize_schedule(dag.start_date) + + self.logger.debug("Dag start date: {}. Next run date: {}" + .format(dag.start_date, next_run_date)) # this structure is necessary to avoid a TypeError from concatenating # NoneType http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f69eec3b/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 08d0890..b6b7987 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -2567,6 +2567,21 @@ class DAG(LoggingMixin): elif isinstance(self._schedule_interval, timedelta): return dttm - self._schedule_interval + def normalize_schedule(self, dttm): + """ + Returns dttm + interval unless dttm is first interval then it returns dttm + """ + following = self.following_schedule(dttm) + + # in case of @once + if not following: + return dttm + + if self.previous_schedule(following) != dttm: + return following + + return dttm + @property def tasks(self): return list(self.task_dict.values()) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f69eec3b/docs/faq.rst ---------------------------------------------------------------------- diff --git a/docs/faq.rst b/docs/faq.rst index 21623fc..e61c1bf 
100644 --- a/docs/faq.rst +++ b/docs/faq.rst @@ -78,12 +78,17 @@ We recommend against using dynamic values as ``start_date``, especially once the period closes, and in theory an ``@hourly`` DAG would never get to an hour after now as ``now()`` moves along. -We also recommend using rounded ``start_date`` in relation to your -``schedule_interval``. This means an ``@hourly`` would be at ``00:00`` + +Previously we also recommended using rounded ``start_date`` in relation to your +``schedule_interval``. This meant an ``@hourly`` would be at ``00:00`` minutes:seconds, a ``@daily`` job at midnight, a ``@monthly`` job on the -first of the month. You can use any sensor or a ``TimeDeltaSensor`` to delay -the execution of tasks within that period. While ``schedule_interval`` -does allow specifying a ``datetime.timedelta`` +first of the month. This is no longer required. Airflow will now auto-align +the ``start_date`` and the ``schedule_interval`` by using the ``start_date`` +as the moment to start looking. + +You can use any sensor or a ``TimeDeltaSensor`` to delay +the execution of tasks within the schedule interval. +While ``schedule_interval`` does allow specifying a ``datetime.timedelta`` object, we recommend using the macros or cron expressions instead, as it enforces this idea of rounded schedules. 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f69eec3b/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index 4214e47..455716e 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -672,9 +672,7 @@ class SchedulerJobTest(unittest.TestCase): dr = scheduler.schedule_dag(dag) self.assertIsNotNone(dr) - print(dr.start_date) dr.start_date = datetime.datetime.now() - datetime.timedelta(days=1) - print(dr.start_date) session.merge(dr) session.commit() @@ -684,3 +682,53 @@ class SchedulerJobTest(unittest.TestCase): dr.refresh_from_db(session=session) self.assertEquals(dr.state, State.FAILED) + def test_scheduler_auto_align(self): + """ + Test if the schedule_interval will be auto aligned with the start_date + such that if the start_date coincides with the schedule the first + execution_date will be start_date, otherwise it will be start_date + + interval. + """ + dag = DAG( + dag_id='test_scheduler_auto_align_1', + start_date=datetime.datetime(2016, 1, 1, 10, 10, 0), + schedule_interval="4 5 * * *" + ) + dag_task1 = DummyOperator( + task_id='dummy', + dag=dag, + owner='airflow') + + session = settings.Session() + orm_dag = DagModel(dag_id=dag.dag_id) + session.merge(orm_dag) + session.commit() + + scheduler = SchedulerJob() + dag.clear() + + dr = scheduler.schedule_dag(dag) + self.assertIsNotNone(dr) + self.assertEquals(dr.execution_date, datetime.datetime(2016, 1, 2, 5, 4)) + + dag = DAG( + dag_id='test_scheduler_auto_align_2', + start_date=datetime.datetime(2016, 1, 1, 10, 10, 0), + schedule_interval="10 10 * * *" + ) + dag_task1 = DummyOperator( + task_id='dummy', + dag=dag, + owner='airflow') + + session = settings.Session() + orm_dag = DagModel(dag_id=dag.dag_id) + session.merge(orm_dag) + session.commit() + + scheduler = SchedulerJob() + dag.clear() + + dr = scheduler.schedule_dag(dag) + self.assertIsNotNone(dr) + self.assertEquals(dr.execution_date, datetime.datetime(2016, 1, 
1, 10, 10))