[
https://issues.apache.org/jira/browse/AIRFLOW-3518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722184#comment-16722184
]
ASF GitHub Bot commented on AIRFLOW-3518:
-
ashb closed pull request #4322: [AIRFLOW-3518] Performance fixes for
topological_sort
URL: https://github.com/apache/incubator-airflow/pull/4322
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/airflow/models.py b/airflow/models.py
index 74555902a5..47ebd9713c 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -23,7 +23,7 @@
from __future__ import unicode_literals
import copy
-from collections import defaultdict, namedtuple
+from collections import defaultdict, namedtuple, OrderedDict
from builtins import ImportError as BuiltinImportError, bytes, object, str
from future.standard_library import install_aliases
@@ -2662,10 +2662,10 @@ def __init__(
         }

     def __eq__(self, other):
-        return (
-            type(self) == type(other) and
-            all(self.__dict__.get(c, None) == other.__dict__.get(c, None)
-                for c in self._comps))
+        if (type(self) == type(other) and
+                self.task_id == other.task_id):
+            return all(self.__dict__.get(c, None) == other.__dict__.get(c, None) for c in self._comps)
+        return False

     def __ne__(self, other):
         return not self == other
@@ -3443,12 +3443,13 @@ def __repr__(self):
         return "<DAG: {self.dag_id}>".format(self=self)

     def __eq__(self, other):
-        return (
-            type(self) == type(other) and
+        if (type(self) == type(other) and
+                self.dag_id == other.dag_id):
+
             # Use getattr() instead of __dict__ as __dict__ doesn't return
             # correct values for properties.
-            all(getattr(self, c, None) == getattr(other, c, None)
-                for c in self._comps))
+            return all(getattr(self, c, None) == getattr(other, c, None) for c in self._comps)
+        return False

     def __ne__(self, other):
         return not self == other
@@ -3904,8 +3905,8 @@ def topological_sort(self):
         :return: list of tasks in topological order
         """

-        # copy the the tasks so we leave it unmodified
-        graph_unsorted = self.tasks[:]
+        # convert into an OrderedDict to speedup lookup while keeping order the same
+        graph_unsorted = OrderedDict((task.task_id, task) for task in self.tasks)

         graph_sorted = []

@@ -3928,14 +3929,14 @@ def topological_sort(self):
             # not, we need to bail out as the graph therefore can't be
             # sorted.
             acyclic = False
-            for node in list(graph_unsorted):
+            for node in list(graph_unsorted.values()):
                 for edge in node.upstream_list:
-                    if edge in graph_unsorted:
+                    if edge.task_id in graph_unsorted:
                         break
                 # no edges in upstream tasks
                 else:
                     acyclic = True
-                    graph_unsorted.remove(node)
+                    del graph_unsorted[node.task_id]
                     graph_sorted.append(node)

             if not acyclic:
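
For readers who want to try the patched loop outside Airflow, here is a minimal, self-contained sketch of the same approach. The Task class, the standalone topological_sort function, and the ValueError are hypothetical stand-ins for illustration only; they are not the Airflow classes or exceptions. Only the OrderedDict bookkeeping mirrors the diff above.

```python
from collections import OrderedDict


class Task(object):
    """Hypothetical stand-in for an operator: an id plus its upstream tasks."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream_list = []

    def set_upstream(self, other):
        self.upstream_list.append(other)


def topological_sort(tasks):
    """Return tasks ordered so that each task follows all of its upstream tasks."""
    # Keyed by task_id: membership tests and deletions are O(1) on average,
    # and insertion order is preserved.
    graph_unsorted = OrderedDict((task.task_id, task) for task in tasks)
    graph_sorted = []

    while graph_unsorted:
        acyclic = False
        # Iterate over a snapshot, because entries are deleted inside the loop.
        for node in list(graph_unsorted.values()):
            for edge in node.upstream_list:
                if edge.task_id in graph_unsorted:
                    break  # an upstream task has not been sorted yet
            else:
                # No unsorted upstream tasks remain, so this node can be emitted.
                acyclic = True
                del graph_unsorted[node.task_id]
                graph_sorted.append(node)
        if not acyclic:
            # Assumption: a plain ValueError stands in for the real error type.
            raise ValueError("A cyclic dependency occurred")
    return graph_sorted


a, b, c = Task("a"), Task("b"), Task("c")
b.set_upstream(a)
c.set_upstream(b)
order = [t.task_id for t in topological_sort([c, b, a])]
```

In this sketch, removing a node is a keyed deletion rather than a list.remove() scan, and the upstream check is a key lookup rather than a list membership test.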
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
> Toposort is very slow
> -
>
> Key: AIRFLOW-3518
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3518
> Project: Apache Airflow
> Issue Type: New Feature
> Components: scheduler
> Reporter: Niels Zeilemaker
> Assignee: Niels Zeilemaker
> Priority: Major
>
> At a client we've discovered that for larger DAGs toposort is very slow.
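
Judging from the diff in this PR, a plausible source of the slowdown is that a membership test on a list compares the probe against each element with __eq__, while a lookup keyed by task_id does not call the task's __eq__ at all. The sketch below counts those calls; CountingTask and the task_%d ids are hypothetical and exist only for this illustration.

```python
class CountingTask(object):
    """Hypothetical task whose __eq__ counts how often it is invoked."""

    eq_calls = 0

    def __init__(self, task_id):
        self.task_id = task_id

    def __eq__(self, other):
        CountingTask.eq_calls += 1
        return type(self) == type(other) and self.task_id == other.task_id

    def __ne__(self, other):
        return not self == other

    def __hash__(self):
        return hash(self.task_id)


tasks = [CountingTask("task_%d" % i) for i in range(1000)]
# Equal to the last element, but a distinct object, so identity checks do not help.
probe = CountingTask("task_999")

# List membership: one __eq__ call per element until a match is found.
CountingTask.eq_calls = 0
found_in_list = probe in tasks
list_eq_calls = CountingTask.eq_calls

# Dict keyed by task_id: only string keys are hashed and compared.
by_id = {task.task_id: task for task in tasks}
CountingTask.eq_calls = 0
found_in_dict = probe.task_id in by_id
dict_eq_calls = CountingTask.eq_calls
```

With the probe matching only the last of 1000 elements, the list test calls __eq__ 1000 times and the keyed lookup calls it zero times. Repeating the list test for every edge of every node is what makes the cost grow quickly on larger DAGs.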
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)