Repository: incubator-airflow Updated Branches: refs/heads/airbnb_rb1.7.1_3 [created] 77c7bc4ac
[AIRFLOW-52] Fix bottlenecks when working with many tasks. The DAG hash function tried (and failed) to hash the list of tasks, then fell back on calling repr() on the list, which took forever. Instead, hash tuple(task_dict.keys()). In addition, this replaces two slow list comprehensions with much faster dict lookups (using the new task_dict). Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0ed36a14 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0ed36a14 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0ed36a14 Branch: refs/heads/airbnb_rb1.7.1_3 Commit: 0ed36a14d8047bbfac749c73a81581f293b43af6 Parents: f1ff65c Author: Jeremiah Lowin <jlo...@users.noreply.github.com> Authored: Sat May 7 07:20:47 2016 -0400 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Mon May 9 16:12:49 2016 -0700 ---------------------------------------------------------------------- airflow/models.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0ed36a14/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 911ec06..dda9bb9 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -1873,7 +1873,7 @@ class BaseOperator(object): elif self.has_dag() and self.dag is not dag: raise AirflowException( "The DAG assigned to {} can not be changed.".format(self)) - elif self.task_id not in [t.task_id for t in dag.tasks]: + elif self.task_id not in dag.task_dict: dag.add_task(self) self._dag = dag @@ -2420,7 +2420,7 @@ class DAG(LoggingMixin): self._comps = { 'dag_id', - 'tasks', + 'task_ids', 'parent_dag', 'start_date', 'schedule_interval', @@ -2447,7 +2447,11 @@ class DAG(LoggingMixin): def __hash__(self): hash_components = [type(self)] 
for c in self._comps: - val = getattr(self, c, None) + # task_ids returns a list and lists can't be hashed + if c == 'task_ids': + val = tuple(self.task_dict.keys()) + else: + val = getattr(self, c, None) try: hash(val) hash_components.append(val) @@ -2932,7 +2936,7 @@ class DAG(LoggingMixin): if not task.start_date: task.start_date = self.start_date - if task.task_id in [t.task_id for t in self.tasks]: + if task.task_id in self.task_dict: raise AirflowException( "Task id '{0}' has already been added " "to the DAG ".format(task.task_id))