ashb closed pull request #4322: [AIRFLOW-3518] Performance fixes for 
topological_sort
URL: https://github.com/apache/incubator-airflow/pull/4322
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/models.py b/airflow/models.py
index 74555902a5..47ebd9713c 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -23,7 +23,7 @@
 from __future__ import unicode_literals
 
 import copy
-from collections import defaultdict, namedtuple
+from collections import defaultdict, namedtuple, OrderedDict
 
 from builtins import ImportError as BuiltinImportError, bytes, object, str
 from future.standard_library import install_aliases
@@ -2662,10 +2662,10 @@ def __init__(
         }
 
     def __eq__(self, other):
-        return (
-            type(self) == type(other) and
-            all(self.__dict__.get(c, None) == other.__dict__.get(c, None)
-                for c in self._comps))
+        if (type(self) == type(other) and
+                self.task_id == other.task_id):
+            return all(self.__dict__.get(c, None) == other.__dict__.get(c, 
None) for c in self._comps)
+        return False
 
     def __ne__(self, other):
         return not self == other
@@ -3443,12 +3443,13 @@ def __repr__(self):
         return "<DAG: {self.dag_id}>".format(self=self)
 
     def __eq__(self, other):
-        return (
-            type(self) == type(other) and
+        if (type(self) == type(other) and
+                self.dag_id == other.dag_id):
+
             # Use getattr() instead of __dict__ as __dict__ doesn't return
             # correct values for properties.
-            all(getattr(self, c, None) == getattr(other, c, None)
-                for c in self._comps))
+            return all(getattr(self, c, None) == getattr(other, c, None) for c 
in self._comps)
+        return False
 
     def __ne__(self, other):
         return not self == other
@@ -3904,8 +3905,8 @@ def topological_sort(self):
         :return: list of tasks in topological order
         """
 
-        # copy the the tasks so we leave it unmodified
-        graph_unsorted = self.tasks[:]
+        # convert into an OrderedDict to speedup lookup while keeping order 
the same
+        graph_unsorted = OrderedDict((task.task_id, task) for task in 
self.tasks)
 
         graph_sorted = []
 
@@ -3928,14 +3929,14 @@ def topological_sort(self):
             # not, we need to bail out as the graph therefore can't be
             # sorted.
             acyclic = False
-            for node in list(graph_unsorted):
+            for node in list(graph_unsorted.values()):
                 for edge in node.upstream_list:
-                    if edge in graph_unsorted:
+                    if edge.task_id in graph_unsorted:
                         break
                 # no edges in upstream tasks
                 else:
                     acyclic = True
-                    graph_unsorted.remove(node)
+                    del graph_unsorted[node.task_id]
                     graph_sorted.append(node)
 
             if not acyclic:


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to