ashb closed pull request #4322: [AIRFLOW-3518] Performance fixes for topological_sort URL: https://github.com/apache/incubator-airflow/pull/4322
This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign pull request (one opened from a fork) once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/airflow/models.py b/airflow/models.py index 74555902a5..47ebd9713c 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -23,7 +23,7 @@ from __future__ import unicode_literals import copy -from collections import defaultdict, namedtuple +from collections import defaultdict, namedtuple, OrderedDict from builtins import ImportError as BuiltinImportError, bytes, object, str from future.standard_library import install_aliases @@ -2662,10 +2662,10 @@ def __init__( } def __eq__(self, other): - return ( - type(self) == type(other) and - all(self.__dict__.get(c, None) == other.__dict__.get(c, None) - for c in self._comps)) + if (type(self) == type(other) and + self.task_id == other.task_id): + return all(self.__dict__.get(c, None) == other.__dict__.get(c, None) for c in self._comps) + return False def __ne__(self, other): return not self == other @@ -3443,12 +3443,13 @@ def __repr__(self): return "<DAG: {self.dag_id}>".format(self=self) def __eq__(self, other): - return ( - type(self) == type(other) and + if (type(self) == type(other) and + self.dag_id == other.dag_id): + # Use getattr() instead of __dict__ as __dict__ doesn't return # correct values for properties. 
- all(getattr(self, c, None) == getattr(other, c, None) - for c in self._comps)) + return all(getattr(self, c, None) == getattr(other, c, None) for c in self._comps) + return False def __ne__(self, other): return not self == other @@ -3904,8 +3905,8 @@ def topological_sort(self): :return: list of tasks in topological order """ - # copy the the tasks so we leave it unmodified - graph_unsorted = self.tasks[:] + # convert into an OrderedDict to speedup lookup while keeping order the same + graph_unsorted = OrderedDict((task.task_id, task) for task in self.tasks) graph_sorted = [] @@ -3928,14 +3929,14 @@ def topological_sort(self): # not, we need to bail out as the graph therefore can't be # sorted. acyclic = False - for node in list(graph_unsorted): + for node in list(graph_unsorted.values()): for edge in node.upstream_list: - if edge in graph_unsorted: + if edge.task_id in graph_unsorted: break # no edges in upstream tasks else: acyclic = True - graph_unsorted.remove(node) + del graph_unsorted[node.task_id] graph_sorted.append(node) if not acyclic: ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services