Repository: incubator-airflow
Updated Branches:
  refs/heads/master 651e6063d -> ddc502694


[AIRFLOW-1349] Fix backfill to respect limits

Previously, if a backfill job was triggered that included a dag run
already in a RUNNING state, that dag run was counted against the
max_active_runs limit. In addition, a backfill job that generated
multiple dag runs could violate the max_active_runs limit by
executing all of its dag runs at once.

Now the limit is checked for each dag run to be created, and the
backfill job only runs the dag runs that fit within the limit. If
the max_active_runs limit has already been reached, the BackfillJob
waits in a loop and creates the remaining dag runs as soon as slots
within the limit become available, until all dag runs are completed.

These changes make the behavior consistent with the definition of
the max_active_runs limit and allow the user to run backfill jobs
over dag runs already in a RUNNING state, since those runs are
treated as part of the backfill rather than counted against the
limit.
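
In outline, the new behavior amounts to the following simplified
Python sketch (not the verbatim implementation from the diff below;
run_one_round is a hypothetical stand-in for _execute_for_run_dates,
and run_dates / delay_on_limit_secs are assumed to be in scope):

    import time

    # Process the backfill dates in rounds. Each round skips any date
    # that cannot get a dag run slot yet and records the dates it did
    # execute; between rounds we sleep until a slot frees up.
    executed_dates = set()
    while len(executed_dates) < len(run_dates):
        remaining = [d for d in run_dates if d not in executed_dates]
        executed_dates |= run_one_round(remaining)  # hypothetical helper
        if len(executed_dates) < len(run_dates):
            # max_active_runs reached; wait before retrying the rest
            time.sleep(delay_on_limit_secs)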

Closes #2454 from edgarRd/erod-fix-backfill-max


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ddc50269
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ddc50269
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ddc50269

Branch: refs/heads/master
Commit: ddc50269431d8715e0eaeed7be5f522fed5521da
Parents: 651e606
Author: Edgar Rodriguez <edgar.rodrig...@airbnb.com>
Authored: Fri Aug 4 14:08:17 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Fri Aug 4 14:08:20 2017 -0700

----------------------------------------------------------------------
 airflow/bin/cli.py |  13 ++++-
 airflow/jobs.py    | 144 +++++++++++++++++++++++++++++-------------------
 airflow/models.py  |  51 ++++++++++++++++-
 tests/jobs.py      | 140 +++++++++++++++++++++++++++++++++++++++++-----
 4 files changed, 271 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ddc50269/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index dc49bb7..077cb90 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -179,7 +179,8 @@ def backfill(args, dag=None):
                           conf.getboolean('core', 'donot_pickle')),
             ignore_first_depends_on_past=args.ignore_first_depends_on_past,
             ignore_task_deps=args.ignore_dependencies,
-            pool=args.pool)
+            pool=args.pool,
+            delay_on_limit_secs=args.delay_on_limit)
 
 
 def trigger_dag(args):
@@ -1234,6 +1235,14 @@ class CLIFactory(object):
                 "DO respect depends_on_past)."),
             "store_true"),
         'pool': Arg(("--pool",), "Resource pool to use"),
+        'delay_on_limit': Arg(
+            ("--delay_on_limit",),
+            help=("Amount of time in seconds to wait when the limit "
+                  "on maximum active dag runs (max_active_runs) has "
+                  "been reached before trying to execute a dag run "
+                  "again."),
+            type=float,
+            default=1.0),
         # list_tasks
         'tree': Arg(("-t", "--tree"), "Tree view", "store_true"),
         # list_dags
@@ -1491,7 +1500,7 @@ class CLIFactory(object):
                 'dag_id', 'task_regex', 'start_date', 'end_date',
                 'mark_success', 'local', 'donot_pickle', 'include_adhoc',
                 'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past',
-                'subdir', 'pool', 'dry_run')
+                'subdir', 'pool', 'delay_on_limit', 'dry_run')
         }, {
             'func': list_tasks,
             'help': "List the tasks within a DAG",

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ddc50269/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 668973e..d94a0e0 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1837,26 +1837,29 @@ class BackfillJob(BaseJob):
                      not_ready=None,
                      deadlocked=None,
                      active_runs=None,
+                     executed_dag_run_dates=None,
                      finished_runs=0,
                      total_runs=0,
                      ):
             """
             :param to_run: Tasks to run in the backfill
-            :type to_run: dict
+            :type to_run: dict[Tuple[String, String, DateTime], TaskInstance]
             :param started: Maps started task instance key to task instance object
-            :type started: dict
+            :type started: dict[Tuple[String, String, DateTime], TaskInstance]
             :param skipped: Tasks that have been skipped
-            :type skipped: set
+            :type skipped: set[Tuple[String, String, DateTime]]
             :param succeeded: Tasks that have succeeded so far
-            :type succeeded: set
+            :type succeeded: set[Tuple[String, String, DateTime]]
             :param failed: Tasks that have failed
-            :type failed: set
+            :type failed: set[Tuple[String, String, DateTime]]
             :param not_ready: Tasks not ready for execution
-            :type not_ready: set
+            :type not_ready: set[Tuple[String, String, DateTime]]
             :param deadlocked: Deadlocked tasks
-            :type deadlocked: set
-            :param active_runs: Active tasks at a certain point in time
-            :type active_runs: list
+            :type deadlocked: set[Tuple[String, String, DateTime]]
+            :param active_runs: Active dag runs at a certain point in time
+            :type active_runs: list[DagRun]
+            :param executed_dag_run_dates: Datetime objects for the executed dag runs
+            :type executed_dag_run_dates: set[Datetime]
             :param finished_runs: Number of finished runs so far
             :type finished_runs: int
             :param total_runs: Number of total dag runs able to run
@@ -1870,6 +1873,7 @@ class BackfillJob(BaseJob):
             self.not_ready = not_ready or set()
             self.deadlocked = deadlocked or set()
             self.active_runs = active_runs or list()
+            self.executed_dag_run_dates = executed_dag_run_dates or set()
             self.finished_runs = finished_runs
             self.total_runs = total_runs
 
@@ -1884,6 +1888,7 @@ class BackfillJob(BaseJob):
             ignore_first_depends_on_past=False,
             ignore_task_deps=False,
             pool=None,
+            delay_on_limit_secs=1.0,
             *args, **kwargs):
         self.dag = dag
         self.dag_id = dag.dag_id
@@ -1895,6 +1900,7 @@ class BackfillJob(BaseJob):
         self.ignore_first_depends_on_past = ignore_first_depends_on_past
         self.ignore_task_deps = ignore_task_deps
         self.pool = pool
+        self.delay_on_limit_secs = delay_on_limit_secs
         super(BackfillJob, self).__init__(*args, **kwargs)
 
     def _update_counters(self, ti_status):
@@ -1975,32 +1981,51 @@ class BackfillJob(BaseJob):
     def _get_dag_run(self, run_date, session=None):
         """
         Returns a dag run for the given run date, which will be matched to an existing
-        dag run if available or create a new dag run otherwise.
+        dag run if available or create a new dag run otherwise. If the max_active_runs
+        limit is reached, this function will return None.
         :param run_date: the execution date for the dag run
         :type run_date: datetime
         :param session: the database session object
         :type session: Session
-        :return: the dag run for the run date
+        :return: a DagRun in state RUNNING or None
         """
         run_id = BackfillJob.ID_FORMAT_PREFIX.format(run_date.isoformat())
 
+        # consider max_active_runs but ignore when running subdags
+        respect_dag_max_active_limit = (True
+                                        if (self.dag.schedule_interval and
+                                            not self.dag.is_subdag)
+                                        else False)
+
+        current_active_dag_count = self.dag.get_num_active_runs(external_trigger=False)
+
         # check if we are scheduling on top of a already existing dag_run
         # we could find a "scheduled" run instead of a "backfill"
         run = DagRun.find(dag_id=self.dag.dag_id,
                           execution_date=run_date,
                           session=session)
 
-        if run is None or len(run) == 0:
-            run = self.dag.create_dagrun(
-                run_id=run_id,
-                execution_date=run_date,
-                start_date=datetime.now(),
-                state=State.RUNNING,
-                external_trigger=False,
-                session=session
-            )
-        else:
+        if run is not None and len(run) > 0:
             run = run[0]
+            if run.state == State.RUNNING:
+                respect_dag_max_active_limit = False
+        else:
+            run = None
+
+        # enforce max_active_runs limit for dag, special cases already
+        # handled by respect_dag_max_active_limit
+        if (respect_dag_max_active_limit and
+                current_active_dag_count >= self.dag.max_active_runs):
+            return None
+
+        run = run or self.dag.create_dagrun(
+            run_id=run_id,
+            execution_date=run_date,
+            start_date=datetime.now(),
+            state=State.RUNNING,
+            external_trigger=False,
+            session=session
+        )
 
         # set required transient field
         run.dag = self.dag
@@ -2308,23 +2333,25 @@ class BackfillJob(BaseJob):
         :type start_date: datetime
         :param session: the current session object
         :type session: Session
-        :return: list of execution dates of the dag runs that were executed.
-        :rtype: list
         """
         for next_run_date in run_dates:
             dag_run = self._get_dag_run(next_run_date, session=session)
             tis_map = self._task_instances_for_dag_run(dag_run,
                                                        session=session)
+            if dag_run is None:
+                continue
+
             ti_status.active_runs.append(dag_run)
             ti_status.to_run.update(tis_map or {})
 
-        ti_status.total_runs = len(ti_status.active_runs)
+        processed_dag_run_dates = self._process_backfill_task_instances(
+            ti_status=ti_status,
+            executor=executor,
+            pickle_id=pickle_id,
+            start_date=start_date,
+            session=session)
 
-        return self._process_backfill_task_instances(ti_status=ti_status,
-                                                     executor=executor,
-                                                     pickle_id=pickle_id,
-                                                     start_date=start_date,
-                                                     session=session)
+        ti_status.executed_dag_run_dates.update(processed_dag_run_dates)
 
     def _execute(self):
         """
@@ -2334,22 +2361,6 @@ class BackfillJob(BaseJob):
         session = settings.Session()
         ti_status = BackfillJob._DagRunTaskStatus()
 
-        # consider max_active_runs but ignore when running subdags
-        # "parent.child" as a dag_id is by convention a subdag
-        if self.dag.schedule_interval and not self.dag.is_subdag:
-            all_active_runs = DagRun.find(
-                dag_id=self.dag.dag_id,
-                state=State.RUNNING,
-                external_trigger=False,
-                session=session
-            )
-
-            # return if already reached maximum active runs
-            if len(all_active_runs) >= self.dag.max_active_runs:
-                self.logger.info("Dag {} has reached maximum amount of {} dag runs"
-                                 .format(self.dag.dag_id, self.dag.max_active_runs))
-                return
-
         start_date = self.bf_start_date
 
         # Get intervals between the start/end dates, which will turn into dag runs
@@ -2372,20 +2383,37 @@ class BackfillJob(BaseJob):
         executor = self.executor
         executor.start()
 
-        self._execute_for_run_dates(run_dates=run_dates,
-                                    ti_status=ti_status,
-                                    executor=executor,
-                                    pickle_id=pickle_id,
-                                    start_date=start_date,
-                                    session=session)
+        ti_status.total_runs = len(run_dates)  # total dag runs in backfill
 
-        executor.end()
-        session.commit()
-        session.close()
-
-        err = self._collect_errors(ti_status=ti_status, session=session)
-        if err:
-            raise AirflowException(err)
+        try:
+            remaining_dates = ti_status.total_runs
+            while remaining_dates > 0:
+                dates_to_process = [run_date for run_date in run_dates
+                                    if run_date not in ti_status.executed_dag_run_dates]
+
+                self._execute_for_run_dates(run_dates=dates_to_process,
+                                            ti_status=ti_status,
+                                            executor=executor,
+                                            pickle_id=pickle_id,
+                                            start_date=start_date,
+                                            session=session)
+
+                remaining_dates = (
+                    ti_status.total_runs - len(ti_status.executed_dag_run_dates)
+                )
+                err = self._collect_errors(ti_status=ti_status, session=session)
+                if err:
+                    raise AirflowException(err)
+
+                if remaining_dates > 0:
+                    self.logger.info(("max_active_runs limit for dag {} has been reached "
+                                     " - waiting for other dag runs to finish")
+                                     .format(self.dag_id))
+                    time.sleep(self.delay_on_limit_secs)
+        finally:
+            executor.end()
+            session.commit()
+            session.close()
 
         self.logger.info("Backfill done. Exiting.")
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ddc50269/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 959d475..45bcdc7 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3054,7 +3054,7 @@ class DAG(BaseDag, LoggingMixin):
     @provide_session
     def get_active_runs(self, session=None):
         """
-        Returns a list of "running" tasks
+        Returns a list of dag run execution dates currently running
         :param session:
         :return: List of execution dates
         """
@@ -3067,6 +3067,25 @@ class DAG(BaseDag, LoggingMixin):
         return active_dates
 
     @provide_session
+    def get_num_active_runs(self, external_trigger=None, session=None):
+        """
+        Returns the number of active "running" dag runs
+        :param external_trigger: True for externally triggered active dag runs
+        :type external_trigger: bool
+        :param session:
+        :return: number greater than 0 for active dag runs
+        """
+        query = (session
+                 .query(DagRun)
+                 .filter(DagRun.dag_id == self.dag_id)
+                 .filter(DagRun.state == State.RUNNING))
+
+        if external_trigger is not None:
+            query = query.filter(DagRun.external_trigger == external_trigger)
+
+        return query.count()
+
+    @provide_session
     def get_dagrun(self, execution_date, session=None):
         """
         Returns the dag run for a given execution date if it exists, otherwise
@@ -3524,9 +3543,34 @@ class DAG(BaseDag, LoggingMixin):
             donot_pickle=configuration.getboolean('core', 'donot_pickle'),
             ignore_task_deps=False,
             ignore_first_depends_on_past=False,
-            pool=None):
+            pool=None,
+            delay_on_limit_secs=1.0):
         """
         Runs the DAG.
+        :param start_date: the start date of the range to run
+        :type start_date: datetime
+        :param end_date: the end date of the range to run
+        :type end_date: datetime
+        :param mark_success: True to mark jobs as succeeded without running them
+        :type mark_success: bool
+        :param include_adhoc: True to include dags with the adhoc parameter
+        :type include_adhoc: bool
+        :param local: True to run the tasks using the LocalExecutor
+        :type local: bool
+        :param executor: The executor instance to run the tasks
+        :type executor: BaseExecutor
+        :param donot_pickle: True to avoid pickling DAG object and send to workers
+        :type donot_pickle: bool
+        :param ignore_task_deps: True to skip upstream tasks
+        :type ignore_task_deps: bool
+        :param ignore_first_depends_on_past: True to ignore depends_on_past
+            dependencies for the first set of tasks only
+        :type ignore_first_depends_on_past: bool
+        :param pool: Resource pool to use
+        :type pool: string
+        :param delay_on_limit_secs: Time in seconds to wait before next attempt to run
+            dag run when max_active_runs limit has been reached
+        :type delay_on_limit_secs: float
         """
         from airflow.jobs import BackfillJob
         if not executor and local:
@@ -3543,7 +3587,8 @@ class DAG(BaseDag, LoggingMixin):
             donot_pickle=donot_pickle,
             ignore_task_deps=ignore_task_deps,
             ignore_first_depends_on_past=ignore_first_depends_on_past,
-            pool=pool)
+            pool=pool,
+            delay_on_limit_secs=delay_on_limit_secs)
         job.run()
 
     def cli(self):
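
The new DAG.get_num_active_runs helper used by BackfillJob._get_dag_run
above can also be queried directly. A minimal sketch, assuming dag is
an airflow.models.DAG whose runs are stored in the metadata database:

    # Count RUNNING dag runs that were not externally triggered -- the
    # same count _get_dag_run compares against dag.max_active_runs.
    active = dag.get_num_active_runs(external_trigger=False)
    if active >= dag.max_active_runs:
        print("max_active_runs reached; a backfill would wait for a slot")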

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ddc50269/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index fa27b46..1c0b5cc 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -24,6 +24,7 @@ import shutil
 import unittest
 import six
 import socket
+import threading
 from tempfile import mkdtemp
 
 from airflow import AirflowException, settings, models
@@ -303,6 +304,23 @@ class BackfillJobTest(unittest.TestCase):
         self.assertEqual(ti.state, State.SUCCESS)
         dag.clear()
 
+    def test_cli_receives_delay_arg(self):
+        """
+        Tests that the --delay argument is passed correctly to the BackfillJob
+        """
+        dag_id = 'example_bash_operator'
+        run_date = DEFAULT_DATE
+        args = [
+            'backfill',
+            dag_id,
+            '-s',
+            run_date.isoformat(),
+            '--delay_on_limit',
+            '0.5',
+        ]
+        parsed_args = self.parser.parse_args(args)
+        self.assertEqual(0.5, parsed_args.delay_on_limit)
+
     def _get_dag_test_max_active_limits(self, dag_id, max_active_runs=1):
         dag = DAG(
             dag_id=dag_id,
@@ -328,7 +346,7 @@ class BackfillJobTest(unittest.TestCase):
             'test_backfill_max_limit_check_within_limit',
             max_active_runs=16)
 
-        start_date = DEFAULT_DATE - datetime.timedelta(hours=3)
+        start_date = DEFAULT_DATE - datetime.timedelta(hours=1)
         end_date = DEFAULT_DATE
 
         executor = TestExecutor(do_update=True)
@@ -339,22 +357,91 @@ class BackfillJobTest(unittest.TestCase):
                           donot_pickle=True)
         job.run()
 
-        # dag run could not run since the max_active_runs has been reached
         dagruns = DagRun.find(dag_id=dag.dag_id)
-        self.assertEqual(4, len(dagruns))
+        self.assertEqual(2, len(dagruns))
         self.assertTrue(all([run.state == State.SUCCESS for run in dagruns]))
 
     def test_backfill_max_limit_check(self):
-        dag = self._get_dag_test_max_active_limits('test_backfill_max_limit_check')
+        dag_id = 'test_backfill_max_limit_check'
+        run_id = 'test_dagrun'
+        start_date = DEFAULT_DATE - datetime.timedelta(hours=1)
+        end_date = DEFAULT_DATE
+
+        dag_run_created_cond = threading.Condition()
+
+        def run_backfill(cond):
+            cond.acquire()
+            try:
+                dag = self._get_dag_test_max_active_limits(dag_id)
+
+                # this session object is different than the one in the main thread
+                thread_session = settings.Session()
+
+                # Existing dagrun that is not within the backfill range
+                dag.create_dagrun(
+                    run_id=run_id,
+                    state=State.RUNNING,
+                    execution_date=DEFAULT_DATE + datetime.timedelta(hours=1),
+                    start_date=DEFAULT_DATE,
+                )
+
+                thread_session.commit()
+                cond.notify()
+            finally:
+                cond.release()
+
+            executor = TestExecutor(do_update=True)
+            job = BackfillJob(dag=dag,
+                              start_date=start_date,
+                              end_date=end_date,
+                              executor=executor,
+                              donot_pickle=True)
+            job.run()
+
+            thread_session.close()
+
+        backfill_job_thread = threading.Thread(target=run_backfill,
+                                               name="run_backfill",
+                                               args=(dag_run_created_cond,))
+
+        dag_run_created_cond.acquire()
+        session = settings.Session()
+        backfill_job_thread.start()
+        try:
+            # at this point backfill can't run since the max_active_runs has been
+            # reached, so it is waiting
+            dag_run_created_cond.wait(timeout=1.5)
+            dagruns = DagRun.find(dag_id=dag_id)
+            dr = dagruns[0]
+            self.assertEqual(1, len(dagruns))
+            self.assertEqual(dr.run_id, run_id)
+
+            # allow the backfill to execute by setting the existing dag run to SUCCESS,
+            # backfill will execute dag runs 1 by 1
+            dr.set_state(State.SUCCESS)
+            session.merge(dr)
+            session.commit()
+            session.close()
+
+            backfill_job_thread.join()
 
-        start_date = DEFAULT_DATE - datetime.timedelta(hours=3)
+            dagruns = DagRun.find(dag_id=dag_id)
+            self.assertEqual(3, len(dagruns))  # 2 from backfill + 1 existing
+            self.assertEqual(dagruns[-1].run_id, dr.run_id)
+        finally:
+            dag_run_created_cond.release()
+
+    def test_backfill_max_limit_check_no_count_existing(self):
+        dag = self._get_dag_test_max_active_limits(
+            'test_backfill_max_limit_check_no_count_existing')
+        start_date = DEFAULT_DATE
         end_date = DEFAULT_DATE
 
-        # Existing dagrun that is not within the backfill range
-        dr = dag.create_dagrun(run_id="test_dagrun",
-                               state=State.RUNNING,
-                               execution_date=DEFAULT_DATE + datetime.timedelta(hours=1),
-                               start_date=DEFAULT_DATE)
+        # Existing dagrun that is within the backfill range
+        dag.create_dagrun(run_id="test_existing_backfill",
+                          state=State.RUNNING,
+                          execution_date=DEFAULT_DATE,
+                          start_date=DEFAULT_DATE)
 
         executor = TestExecutor(do_update=True)
         job = BackfillJob(dag=dag,
@@ -364,10 +451,35 @@ class BackfillJobTest(unittest.TestCase):
                           donot_pickle=True)
         job.run()
 
-        # dag run could not run since the max_active_runs has been reached
+        # BackfillJob will run since the existing DagRun does not count for the max
+        # active limit since it's within the backfill date range.
         dagruns = DagRun.find(dag_id=dag.dag_id)
+        # will only be able to run 1 (the existing one) since there's just
+        # one dag run slot left given the max_active_runs limit
         self.assertEqual(1, len(dagruns))
-        self.assertEqual(dagruns[0].run_id, dr.run_id)
+        self.assertEqual(State.SUCCESS, dagruns[0].state)
+
+    def test_backfill_max_limit_check_complete_loop(self):
+        dag = self._get_dag_test_max_active_limits(
+            'test_backfill_max_limit_check_complete_loop')
+        start_date = DEFAULT_DATE - datetime.timedelta(hours=1)
+        end_date = DEFAULT_DATE
+
+        # Given the max limit to be 1 in active dag runs, we need to run the
+        # backfill job 3 times
+        success_expected = 2
+        executor = TestExecutor(do_update=True)
+        job = BackfillJob(dag=dag,
+                          start_date=start_date,
+                          end_date=end_date,
+                          executor=executor,
+                          donot_pickle=True)
+        job.run()
+
+        success_dagruns = len(DagRun.find(dag_id=dag.dag_id, state=State.SUCCESS))
+        running_dagruns = len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING))
+        self.assertEqual(success_expected, success_dagruns)
+        self.assertEqual(0, running_dagruns)  # no dag_runs in running state are left
 
     def test_sub_set_subdag(self):
         dag = DAG(
@@ -389,7 +501,7 @@ class BackfillJobTest(unittest.TestCase):
 
         dag.clear()
         dr = dag.create_dagrun(run_id="test",
-                               state=State.SUCCESS,
+                               state=State.RUNNING,
                                execution_date=DEFAULT_DATE,
                                start_date=DEFAULT_DATE)
 
@@ -433,7 +545,7 @@ class BackfillJobTest(unittest.TestCase):
 
         dag.clear()
         dr = dag.create_dagrun(run_id='test',
-                               state=State.SUCCESS,
+                               state=State.RUNNING,
                                execution_date=DEFAULT_DATE,
                                start_date=DEFAULT_DATE)
         executor = TestExecutor(do_update=True)
