Repository: incubator-airflow
Updated Branches:
  refs/heads/master 31f01b838 -> 10d70d9d7


Fix : Don't treat premature tasks as could_not_run tasks


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ab5d4459
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ab5d4459
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ab5d4459

Branch: refs/heads/master
Commit: ab5d445992617585a0ced1d81881a0728f49b13a
Parents: dddfd3b
Author: Siddharth Anand <san...@agari.com>
Authored: Thu May 12 03:37:51 2016 +0000
Committer: Siddharth Anand <san...@agari.com>
Committed: Fri May 13 01:39:39 2016 +0000

----------------------------------------------------------------------
 airflow/jobs.py   |  3 +++
 airflow/models.py | 11 ++++++++++-
 2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ab5d4459/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index b21a196..7244d84 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -535,7 +535,10 @@ class SchedulerJob(BaseJob):
             elif ti.is_runnable(flag_upstream_failed=True):
                 self.logger.debug('Queuing task: {}'.format(ti))
                 queue.put((ti.key, pickle_id))
+            elif ti.is_premature():
+                continue
             else:
+                self.logger.debug('Adding task: {} to the COULD_NOT_RUN 
set'.format(ti))
                 could_not_run.add(ti)
 
         # this type of deadlock happens when dagruns can't even start and so

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ab5d4459/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 7618bd5..0f664a0 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -898,7 +898,7 @@ class TaskInstance(Base):
         if self.execution_date > datetime.now():
             return False
         # is the task still in the retry waiting period?
-        elif self.state == State.UP_FOR_RETRY and not self.ready_for_retry():
+        elif self.is_premature():
             return False
         # does the task have an end_date prior to the execution date?
         elif self.task.end_date and self.execution_date > self.task.end_date:
@@ -920,6 +920,15 @@ class TaskInstance(Base):
         else:
             return False
 
+
+    def is_premature(self):
+        """
+        Returns whether a task is in UP_FOR_RETRY state and its retry interval
+        has elapsed.
+        """
+        # is the task still in the retry waiting period?
+        return self.state == State.UP_FOR_RETRY and not self.ready_for_retry()
+
     def is_runnable(
             self,
             include_queued=False,

Reply via email to