Repository: incubator-airflow
Updated Branches:
  refs/heads/airbnb_rb1.7.1_3 [created] 77c7bc4ac


[AIRFLOW-52] Fix bottlenecks when working with many tasks

The DAG hash function tried (and failed) to hash the list of tasks, then fell
back on repr-ing the list, which was very slow for DAGs with many tasks.
Instead, hash tuple(task_dict.keys()). In addition, this replaces two slow list
comprehensions with much faster hash lookups (using the new task_dict).
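The hashing change can be illustrated with a small sketch. It models only the part of DAG.__hash__ touched by the patch. HypotheticalDag is an illustrative stand-in, not Airflow's DAG class. Only two of the real _comps entries are modeled, as a tuple rather than a set so that the iteration order is fixed. The except branch assumes the repr() fallback described in the commit message, since that branch is not visible in the diff.

```python
class HypotheticalDag(object):
    """Toy stand-in for airflow.models.DAG; only the hashing logic is modeled."""

    # Attributes that feed the hash. This mirrors two entries of the patched
    # self._comps set; a tuple is used here so the iteration order is fixed.
    _comps = ('dag_id', 'task_ids')

    def __init__(self, dag_id, task_ids):
        self.dag_id = dag_id
        # task_dict maps task_id -> task; placeholder objects stand in for tasks.
        self.task_dict = {task_id: object() for task_id in task_ids}

    @property
    def task_ids(self):
        # Like the real attribute (per the diff comment), this returns a list,
        # and lists are unhashable.
        return list(self.task_dict.keys())

    def __hash__(self):
        hash_components = [type(self)]
        for c in self._comps:
            if c == 'task_ids':
                # A tuple of the dict keys is hashable, so task_ids never
                # reaches the slow repr() fallback below.
                val = tuple(self.task_dict.keys())
            else:
                val = getattr(self, c, None)
            try:
                hash(val)
                hash_components.append(val)
            except TypeError:
                # Assumed fallback (per the commit message): repr() the value.
                hash_components.append(repr(val))
        return hash(tuple(hash_components))
```

The tuple follows the dict's iteration order, which is insertion order on Python 3.7+. As a result, two otherwise identical DAGs whose tasks were added in a different order may hash differently. A frozenset of the keys would be insensitive to order, if that mattered.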

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0ed36a14
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0ed36a14
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0ed36a14

Branch: refs/heads/airbnb_rb1.7.1_3
Commit: 0ed36a14d8047bbfac749c73a81581f293b43af6
Parents: f1ff65c
Author: Jeremiah Lowin <jlo...@users.noreply.github.com>
Authored: Sat May 7 07:20:47 2016 -0400
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Mon May 9 16:12:49 2016 -0700

----------------------------------------------------------------------
 airflow/models.py | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0ed36a14/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 911ec06..dda9bb9 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1873,7 +1873,7 @@ class BaseOperator(object):
         elif self.has_dag() and self.dag is not dag:
             raise AirflowException(
                 "The DAG assigned to {} can not be changed.".format(self))
-        elif self.task_id not in [t.task_id for t in dag.tasks]:
+        elif self.task_id not in dag.task_dict:
             dag.add_task(self)
             self._dag = dag
 
@@ -2420,7 +2420,7 @@ class DAG(LoggingMixin):
 
         self._comps = {
             'dag_id',
-            'tasks',
+            'task_ids',
             'parent_dag',
             'start_date',
             'schedule_interval',
@@ -2447,7 +2447,11 @@ class DAG(LoggingMixin):
     def __hash__(self):
         hash_components = [type(self)]
         for c in self._comps:
-            val = getattr(self, c, None)
+            # task_ids returns a list and lists can't be hashed
+            if c == 'task_ids':
+                val = tuple(self.task_dict.keys())
+            else:
+                val = getattr(self, c, None)
             try:
                 hash(val)
                 hash_components.append(val)
@@ -2932,7 +2936,7 @@ class DAG(LoggingMixin):
         if not task.start_date:
             task.start_date = self.start_date
 
-        if task.task_id in [t.task_id for t in self.tasks]:
+        if task.task_id in self.task_dict:
             raise AirflowException(
                 "Task id '{0}' has already been added "
                 "to the DAG ".format(task.task_id))

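The two membership checks replaced in the diff (in the BaseOperator dag setter and in DAG.add_task) follow one pattern, sketched below. ToyDag, Task, and the local AirflowException are hypothetical stand-ins so the example runs on its own. The sketch assumes task_dict maps task_id to task, as the diff's use of dag.task_dict suggests. The old check rebuilds a list of every task_id and scans it, so adding n tasks one at a time costs O(n^2) comparisons. The new check is a dict membership test, which is an O(1) hash lookup on average.

```python
class AirflowException(Exception):
    """Local stand-in for Airflow's AirflowException so the sketch runs alone."""


class Task(object):
    """Minimal task: only task_id matters for the duplicate check."""

    def __init__(self, task_id):
        self.task_id = task_id


class ToyDag(object):
    """Hypothetical DAG that stores its tasks in task_dict (task_id -> task)."""

    def __init__(self):
        self.task_dict = {}

    @property
    def tasks(self):
        return list(self.task_dict.values())

    def add_task_old(self, task):
        # Pre-patch style: build a list of every task_id, then scan it linearly.
        if task.task_id in [t.task_id for t in self.tasks]:
            raise AirflowException(
                "Task id '{0}' has already been added "
                "to the DAG ".format(task.task_id))
        self.task_dict[task.task_id] = task

    def add_task(self, task):
        # Patched style: average O(1) hash lookup on the dict's keys.
        if task.task_id in self.task_dict:
            raise AirflowException(
                "Task id '{0}' has already been added "
                "to the DAG ".format(task.task_id))
        self.task_dict[task.task_id] = task


# Usage: adding 1000 tasks performs 1000 constant-time duplicate checks.
dag = ToyDag()
for i in range(1000):
    dag.add_task(Task("task_{0}".format(i)))
```

Both methods reject duplicates in the same way. Only the cost of the check differs, and that cost dominates when a DAG is built from thousands of tasks.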