Repository: incubator-airflow Updated Branches: refs/heads/master 9a61a5bd5 -> 8b86ee6a7
[AIRFLOW-297] support exponential backoff option for retry delay Closes #1639 from jgao54/support-retry-backoff Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8b86ee6a Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8b86ee6a Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8b86ee6a Branch: refs/heads/master Commit: 8b86ee6a727090acd151ed0a312b627412ddd5eb Parents: 9a61a5b Author: Joy Gao <j...@wepay.com> Authored: Wed Jul 6 08:57:47 2016 -0700 Committer: Chris Riccomini <chr...@wepay.com> Committed: Wed Jul 6 08:57:57 2016 -0700 ---------------------------------------------------------------------- airflow/models.py | 31 ++++++++++++++++++++++++++++--- tests/models.py | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8b86ee6a/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index e514e9c..60de65f 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -1145,13 +1145,26 @@ class TaskInstance(Base): "{ti.execution_date} [{ti.state}]>" ).format(ti=self) + def next_retry_datetime(self): + """ + Get datetime of the next retry if the task instance fails. For exponential + backoff, retry_delay is used as base and will be converted to seconds. + """ + delay = self.task.retry_delay + if self.task.retry_exponential_backoff: + delay_backoff_in_seconds = delay.total_seconds() ** self.try_number + delay = timedelta(seconds=delay_backoff_in_seconds) + if self.task.max_retry_delay: + delay = min(self.task.max_retry_delay, delay) + return self.end_date + delay + + def ready_for_retry(self): """ Checks on whether the task instance is in the right state and timeframe to be retried. """ - return self.state == State.UP_FOR_RETRY and \ - self.end_date + self.task.retry_delay < datetime.now() + return self.state == State.UP_FOR_RETRY and self.next_retry_datetime() < datetime.now() @provide_session def pool_full(self, session): @@ -1239,7 +1252,7 @@ class TaskInstance(Base): # todo: move this to the scheduler self.state == State.UP_FOR_RETRY and not self.ready_for_retry()): - next_run = (self.end_date + task.retry_delay).isoformat() + next_run = self.next_retry_datetime().isoformat() logging.info( "Not ready for retry yet. " + "Next run after {0}".format(next_run) @@ -1692,6 +1705,12 @@ class BaseOperator(object): :type retries: int :param retry_delay: delay between retries :type retry_delay: timedelta + :param retry_exponential_backoff: allow progressive longer waits between + retries by using exponential backoff algorithm on retry delay (delay + will be converted into seconds) + :type retry_exponential_backoff: bool + :param max_retry_delay: maximum delay interval between retries + :type max_retry_delay: timedelta :param start_date: The ``start_date`` for the task, determines the ``execution_date`` for the first task instance. The best practice is to have the start_date rounded @@ -1789,6 +1808,8 @@ class BaseOperator(object): email_on_failure=True, retries=0, retry_delay=timedelta(seconds=300), + retry_exponential_backoff=False, + max_retry_delay=None, start_date=None, end_date=None, schedule_interval=None, # not hooked as of now @@ -1864,6 +1885,8 @@ class BaseOperator(object): else: logging.debug("retry_delay isn't timedelta object, assuming secs") self.retry_delay = timedelta(seconds=retry_delay) + self.retry_exponential_backoff = retry_exponential_backoff + self.max_retry_delay = max_retry_delay self.params = params or {} # Available in templates! self.adhoc = adhoc self.priority_weight = priority_weight @@ -1884,6 +1907,8 @@ class BaseOperator(object): 'email', 'email_on_retry', 'retry_delay', + 'retry_exponential_backoff', + 'max_retry_delay', 'start_date', 'schedule_interval', 'depends_on_past', http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8b86ee6a/tests/models.py ---------------------------------------------------------------------- diff --git a/tests/models.py b/tests/models.py index 3549e8f..49b33d1 100644 --- a/tests/models.py +++ b/tests/models.py @@ -421,6 +421,38 @@ class TaskInstanceTest(unittest.TestCase): self.assertEqual(ti.state, State.FAILED) self.assertEqual(ti.try_number, 4) + def test_next_retry_datetime(self): + delay = datetime.timedelta(seconds=3) + delay_squared = datetime.timedelta(seconds=9) + max_delay = datetime.timedelta(seconds=10) + + dag = models.DAG(dag_id='fail_dag') + task = BashOperator( + task_id='task_with_exp_backoff_and_max_delay', + bash_command='exit 1', + retries=3, + retry_delay=delay, + retry_exponential_backoff=True, + max_retry_delay=max_delay, + dag=dag, + owner='airflow', + start_date=datetime.datetime(2016, 2, 1, 0, 0, 0)) + ti = TI( + task=task, execution_date=datetime.datetime.now()) + ti.end_date = datetime.datetime.now() + + ti.try_number = 1 + dt = ti.next_retry_datetime() + self.assertEqual(dt, ti.end_date+delay) + + ti.try_number = 2 + dt = ti.next_retry_datetime() + self.assertEqual(dt, ti.end_date+delay_squared) + + ti.try_number = 3 + dt = ti.next_retry_datetime() + self.assertEqual(dt, ti.end_date+max_delay) + def test_depends_on_past(self): dagbag = models.DagBag() dag = dagbag.get_dag('test_depends_on_past')