This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 3b4eb1f  [SPARK-37379][SQL] Add tree pattern pruning to CTESubstitution rule
3b4eb1f is described below

commit 3b4eb1fbd8a351c29a12bfd94ec4cdbee803f416
Author: Josh Rosen <joshro...@databricks.com>
AuthorDate: Fri Nov 19 15:24:52 2021 -0800

[SPARK-37379][SQL] Add tree pattern pruning to CTESubstitution rule

### What changes were proposed in this pull request?

This PR adds tree pattern pruning to the `CTESubstitution` analyzer rule. The rule will now exit early if the tree does not contain an `UnresolvedWith` node.

### Why are the changes needed?

Analysis is eagerly performed after every DataFrame transformation. If a user's program performs a long chain of _n_ transformations to construct a large query plan, this can lead to _O(n^2)_ performance costs from `CTESubstitution`: the rule is applied _n_ times and each application traverses the entire logical plan tree (which contains _O(n)_ nodes). In the case of chained `withColumn` calls (leading to stacked `Project` nodes) it's possible to see _O(n^3)_ slowdowns where _n_ [...]

Very large DataFrame plans typically do not use CTEs because there is no DataFrame syntax for them (although they might appear in the plan if `sql(someQueryWithCTE)` is used). As a result, this PR's proposed optimization to skip `CTESubstitution` can greatly reduce the analysis cost for such plans.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I believe that optimizer correctness is covered by existing tests.

As a toy benchmark, I ran

```
import org.apache.spark.sql.DataFrame
org.apache.spark.sql.catalyst.rules.RuleExecutor.resetMetrics()
(1 to 600).foldLeft(spark.range(100).toDF)((df: DataFrame, i: Int) => df.withColumn(s"col$i", $"id" % i))
println(org.apache.spark.sql.catalyst.rules.RuleExecutor.dumpTimeSpent())
```

on my laptop before and after this PR's changes (simulating a _O(n^3)_ case).
Skipping `CTESubstitution` cut the running time from ~28.4 seconds to ~15.5 seconds. The bulk of the remaining time comes from `DeduplicateRelations`, for which I plan to submit a separate optimization PR.

Closes #34658 from JoshRosen/CTESubstitution-tree-pattern-pruning.

Authored-by: Josh Rosen <joshro...@databricks.com>
Signed-off-by: Josh Rosen <joshro...@databricks.com>
---
 .../scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala | 3 +++
 .../spark/sql/catalyst/plans/logical/basicLogicalOperators.scala       | 2 ++
 .../main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala  | 1 +
 3 files changed, 6 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
index ec3d957..2e2d415 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -48,6 +48,9 @@ import org.apache.spark.sql.internal.SQLConf.{LEGACY_CTE_PRECEDENCE_POLICY, Lega
  */
 object CTESubstitution extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!plan.containsPattern(UNRESOLVED_WITH)) {
+      return plan
+    }
     val isCommand = plan.find {
       case _: Command | _: ParsedStatement | _: InsertIntoDir => true
       case _ => false
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index f1b954d..e8a632d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -626,6 +626,8 @@ object View {
 case class UnresolvedWith(
     child: LogicalPlan,
     cteRelations: Seq[(String, SubqueryAlias)]) extends UnaryNode {
+  final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_WITH)
+
   override def output: Seq[Attribute] = child.output

   override def simpleString(maxFields: Int): String = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index 6c1b64d..aad90ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -111,6 +111,7 @@ object TreePattern extends Enumeration {
   val REPARTITION_OPERATION: Value = Value
   val UNION: Value = Value
   val UNRESOLVED_RELATION: Value = Value
+  val UNRESOLVED_WITH: Value = Value
   val TYPED_FILTER: Value = Value
   val WINDOW: Value = Value
   val WITH_WINDOW_DEFINITION: Value = Value
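The mechanism the diff relies on can be illustrated with a minimal, self-contained sketch. The toy classes below (`Node`, `Project`, `With`, `substituteCtes`) are hypothetical stand-ins, not Spark's actual `TreeNode`/`LogicalPlan` API; they only show the caching idea: each node declares its own patterns, the union of patterns over a subtree is computed once and memoized, and a rule can then check `containsPattern` in O(1) instead of traversing the whole tree on every application.

```scala
object TreePatternPruningDemo {
  // Analogous to Spark's TreePattern enumeration.
  object Pattern extends Enumeration {
    val PROJECT, UNRESOLVED_WITH = Value
  }
  import Pattern._

  abstract class Node(val children: Seq[Node]) {
    // Patterns contributed by this node alone (cf. nodePatterns in the diff).
    def nodePatterns: Seq[Pattern.Value]

    // Union of patterns over the whole subtree, computed once and cached,
    // so repeated containsPattern checks do not re-traverse the tree.
    lazy val treePatterns: Set[Pattern.Value] =
      nodePatterns.toSet ++ children.flatMap(_.treePatterns)

    def containsPattern(p: Pattern.Value): Boolean = treePatterns.contains(p)
  }

  case object Leaf extends Node(Nil) {
    def nodePatterns = Nil
  }
  case class Project(child: Node) extends Node(Seq(child)) {
    def nodePatterns = Seq(PROJECT)
  }
  case class With(child: Node) extends Node(Seq(child)) {
    def nodePatterns = Seq(UNRESOLVED_WITH)
  }

  // Like CTESubstitution after this change: bail out immediately when the
  // tree cannot contain the node kind this rule rewrites.
  def substituteCtes(plan: Node): Node = {
    if (!plan.containsPattern(UNRESOLVED_WITH)) {
      return plan // early exit: no traversal, no rewriting work
    }
    // ... real CTE substitution work would go here ...
    plan
  }

  def main(args: Array[String]): Unit = {
    val noCte = Project(Project(Leaf))   // stacked Projects, as from withColumn
    val withCte = Project(With(Leaf))
    println(noCte.containsPattern(UNRESOLVED_WITH))   // false: rule skipped
    println(withCte.containsPattern(UNRESOLVED_WITH)) // true: rule runs
  }
}
```

Because `treePatterns` is a `lazy val`, the per-subtree pattern set is paid for once when the node is constructed into a plan and then reused by every rule that checks it, which is what turns the per-application O(n) scan into an O(1) lookup.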