Repository: incubator-airflow Updated Branches: refs/heads/master 31f01b838 -> 10d70d9d7
Fix : Don't treat premature tasks as could_not_run tasks Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ab5d4459 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ab5d4459 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ab5d4459 Branch: refs/heads/master Commit: ab5d445992617585a0ced1d81881a0728f49b13a Parents: dddfd3b Author: Siddharth Anand <san...@agari.com> Authored: Thu May 12 03:37:51 2016 +0000 Committer: Siddharth Anand <san...@agari.com> Committed: Fri May 13 01:39:39 2016 +0000 ---------------------------------------------------------------------- airflow/jobs.py | 3 +++ airflow/models.py | 11 ++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ab5d4459/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index b21a196..7244d84 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -535,7 +535,10 @@ class SchedulerJob(BaseJob): elif ti.is_runnable(flag_upstream_failed=True): self.logger.debug('Queuing task: {}'.format(ti)) queue.put((ti.key, pickle_id)) + elif ti.is_premature(): + continue else: + self.logger.debug('Adding task: {} to the COULD_NOT_RUN set'.format(ti)) could_not_run.add(ti) # this type of deadlock happens when dagruns can't even start and so http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ab5d4459/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 7618bd5..0f664a0 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -898,7 +898,7 @@ class TaskInstance(Base): if self.execution_date > datetime.now(): return False # is the task still in the retry waiting period? - elif self.state == State.UP_FOR_RETRY and not self.ready_for_retry(): + elif self.is_premature(): return False # does the task have an end_date prior to the execution date? elif self.task.end_date and self.execution_date > self.task.end_date: @@ -920,6 +920,15 @@ class TaskInstance(Base): else: return False + + def is_premature(self): + """ + Returns whether a task is in UP_FOR_RETRY state and its retry interval + has elapsed. + """ + # is the task still in the retry waiting period? + return self.state == State.UP_FOR_RETRY and not self.ready_for_retry() + def is_runnable( self, include_queued=False,