This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:

     new 1324f7d1680 [SPARK-40297][SQL] CTE outer reference nested in CTE main body cannot be resolved

1324f7d1680 is described below

commit 1324f7d16802e684c04b3773c84be57667452a0b
Author: Maryann Xue <maryann....@gmail.com>
AuthorDate: Thu Sep 1 22:03:58 2022 +0800

    [SPARK-40297][SQL] CTE outer reference nested in CTE main body cannot be resolved

    ### What changes were proposed in this pull request?

    This PR fixes a bug where a CTE reference cannot be resolved if the reference occurs in an inner CTE definition that is nested in the FROM clause of the outer CTE's main body. For example:

    ```
    WITH cte_outer AS (
      SELECT 1
    )
    SELECT * FROM (
      WITH cte_inner AS (
        SELECT * FROM cte_outer
      )
      SELECT * FROM cte_inner
    )
    ```

    The fix changes the traversal order of `CTESubstitution` from `resolveOperatorsUpWithPruning` to `resolveOperatorsDownWithPruning`. It also calls `traverseAndSubstituteCTE` recursively on the CTE main body.

    ### Why are the changes needed?

    Bug fix. Without the fix, an `AnalysisException` is thrown for CTE queries like the one above.

    ### Does this PR introduce any user-facing change?

    No.

    ### How was this patch tested?

    Added unit tests.

    Closes #37751 from maryannxue/spark-40297.
Authored-by: Maryann Xue <maryann....@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/catalyst/analysis/CTESubstitution.scala | 30 ++-- .../test/resources/sql-tests/inputs/cte-nested.sql | 59 +++++++- .../resources/sql-tests/results/cte-legacy.sql.out | 80 +++++++++++ .../resources/sql-tests/results/cte-nested.sql.out | 79 ++++++++++ .../sql-tests/results/cte-nonlegacy.sql.out | 79 ++++++++++ .../org/apache/spark/sql/CTEInlineSuite.scala | 160 ++++++++++++++++++++- 6 files changed, 476 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala index 62ebfa83431..6a4562450b9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala @@ -56,7 +56,7 @@ object CTESubstitution extends Rule[LogicalPlan] { case _ => false } val cteDefs = ArrayBuffer.empty[CTERelationDef] - val (substituted, lastSubstituted) = + val (substituted, firstSubstituted) = LegacyBehaviorPolicy.withName(conf.getConf(LEGACY_CTE_PRECEDENCE_POLICY)) match { case LegacyBehaviorPolicy.EXCEPTION => assertNoNameConflictsInCTE(plan) @@ -68,12 +68,17 @@ object CTESubstitution extends Rule[LogicalPlan] { } if (cteDefs.isEmpty) { substituted - } else if (substituted eq lastSubstituted.get) { + } else if (substituted eq firstSubstituted.get) { WithCTE(substituted, cteDefs.toSeq) } else { var done = false substituted.resolveOperatorsWithPruning(_ => !done) { - case p if p eq lastSubstituted.get => + case p if p eq firstSubstituted.get => + // `firstSubstituted` is the parent of all other CTEs (if any). + done = true + WithCTE(p, cteDefs.toSeq) + case p if p.children.count(_.containsPattern(CTE)) > 1 => + // This is the first common parent of all CTEs. 
done = true WithCTE(p, cteDefs.toSeq) } @@ -181,21 +186,28 @@ object CTESubstitution extends Rule[LogicalPlan] { isCommand: Boolean, outerCTEDefs: Seq[(String, CTERelationDef)], cteDefs: ArrayBuffer[CTERelationDef]): (LogicalPlan, Option[LogicalPlan]) = { - var lastSubstituted: Option[LogicalPlan] = None - val newPlan = plan.resolveOperatorsUpWithPruning( + var firstSubstituted: Option[LogicalPlan] = None + val newPlan = plan.resolveOperatorsDownWithPruning( _.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) { case UnresolvedWith(child: LogicalPlan, relations) => val resolvedCTERelations = - resolveCTERelations(relations, isLegacy = false, isCommand, outerCTEDefs, cteDefs) - lastSubstituted = Some(substituteCTE(child, isCommand, resolvedCTERelations)) - lastSubstituted.get + resolveCTERelations(relations, isLegacy = false, isCommand, outerCTEDefs, cteDefs) ++ + outerCTEDefs + val substituted = substituteCTE( + traverseAndSubstituteCTE(child, isCommand, resolvedCTERelations, cteDefs)._1, + isCommand, + resolvedCTERelations) + if (firstSubstituted.isEmpty) { + firstSubstituted = Some(substituted) + } + substituted case other => other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) { case e: SubqueryExpression => e.withNewPlan(apply(e.plan)) } } - (newPlan, lastSubstituted) + (newPlan, firstSubstituted) } private def resolveCTERelations( diff --git a/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql b/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql index b5d7fa5687b..5f12388b9cb 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/cte-nested.sql @@ -146,4 +146,61 @@ WITH ) SELECT * FROM t3 ) -SELECT * FROM t2; \ No newline at end of file +SELECT * FROM t2; + +-- CTE nested in CTE main body FROM clause references outer CTE def +WITH cte_outer AS ( + SELECT 1 +) +SELECT * FROM ( + WITH cte_inner AS ( + SELECT * FROM cte_outer + ) + SELECT * FROM 
cte_inner +); + +-- CTE double nested in CTE main body FROM clause references outer CTE def +WITH cte_outer AS ( + SELECT 1 +) +SELECT * FROM ( + WITH cte_inner AS ( + SELECT * FROM ( + WITH cte_inner_inner AS ( + SELECT * FROM cte_outer + ) + SELECT * FROM cte_inner_inner + ) + ) + SELECT * FROM cte_inner +); + +-- Invalid reference to invisible CTE def nested CTE def +WITH cte_outer AS ( + WITH cte_invisible_inner AS ( + SELECT 1 + ) + SELECT * FROM cte_invisible_inner +) +SELECT * FROM ( + WITH cte_inner AS ( + SELECT * FROM cte_invisible_inner + ) + SELECT * FROM cte_inner +); + +-- Invalid reference to invisible CTE def nested CTE def (in FROM) +WITH cte_outer AS ( + SELECT * FROM ( + WITH cte_invisible_inner AS ( + SELECT 1 + ) + SELECT * FROM cte_invisible_inner + ) +) +SELECT * FROM ( + WITH cte_inner AS ( + SELECT * FROM cte_invisible_inner + ) + SELECT * FROM cte_inner +); \ No newline at end of file diff --git a/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out index db7d420a745..264b64ffe96 100644 --- a/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/cte-legacy.sql.out @@ -236,3 +236,83 @@ struct<> -- !query output org.apache.spark.sql.AnalysisException Table or view not found: t1; line 5 pos 20 + + +-- !query +WITH cte_outer AS ( + SELECT 1 +) +SELECT * FROM ( + WITH cte_inner AS ( + SELECT * FROM cte_outer + ) + SELECT * FROM cte_inner +) +-- !query schema +struct<1:int> +-- !query output +1 + + +-- !query +WITH cte_outer AS ( + SELECT 1 +) +SELECT * FROM ( + WITH cte_inner AS ( + SELECT * FROM ( + WITH cte_inner_inner AS ( + SELECT * FROM cte_outer + ) + SELECT * FROM cte_inner_inner + ) + ) + SELECT * FROM cte_inner +) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table or view not found: cte_outer; line 8 pos 22 + + +-- !query +WITH cte_outer AS ( + WITH 
cte_invisible_inner AS ( + SELECT 1 + ) + SELECT * FROM cte_invisible_inner +) +SELECT * FROM ( + WITH cte_inner AS ( + SELECT * FROM cte_invisible_inner + ) + SELECT * FROM cte_inner +) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table or view not found: cte_invisible_inner; line 9 pos 18 + + +-- !query +WITH cte_outer AS ( + SELECT * FROM ( + WITH cte_invisible_inner AS ( + SELECT 1 + ) + SELECT * FROM cte_invisible_inner + ) +) +SELECT * FROM ( + WITH cte_inner AS ( + SELECT * FROM cte_invisible_inner + ) + SELECT * FROM cte_inner +) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table or view not found: cte_invisible_inner; line 11 pos 18 diff --git a/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out index f714a11d1df..2c622de3f36 100644 --- a/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/cte-nested.sql.out @@ -243,3 +243,82 @@ SELECT * FROM t2 struct<1:int> -- !query output 1 + + +-- !query +WITH cte_outer AS ( + SELECT 1 +) +SELECT * FROM ( + WITH cte_inner AS ( + SELECT * FROM cte_outer + ) + SELECT * FROM cte_inner +) +-- !query schema +struct<1:int> +-- !query output +1 + + +-- !query +WITH cte_outer AS ( + SELECT 1 +) +SELECT * FROM ( + WITH cte_inner AS ( + SELECT * FROM ( + WITH cte_inner_inner AS ( + SELECT * FROM cte_outer + ) + SELECT * FROM cte_inner_inner + ) + ) + SELECT * FROM cte_inner +) +-- !query schema +struct<1:int> +-- !query output +1 + + +-- !query +WITH cte_outer AS ( + WITH cte_invisible_inner AS ( + SELECT 1 + ) + SELECT * FROM cte_invisible_inner +) +SELECT * FROM ( + WITH cte_inner AS ( + SELECT * FROM cte_invisible_inner + ) + SELECT * FROM cte_inner +) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table or view not found: cte_invisible_inner; line 9 pos 18 + 
+ +-- !query +WITH cte_outer AS ( + SELECT * FROM ( + WITH cte_invisible_inner AS ( + SELECT 1 + ) + SELECT * FROM cte_invisible_inner + ) +) +SELECT * FROM ( + WITH cte_inner AS ( + SELECT * FROM cte_invisible_inner + ) + SELECT * FROM cte_inner +) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table or view not found: cte_invisible_inner; line 11 pos 18 diff --git a/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out index 2ab13003d04..283f5a54a42 100644 --- a/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/cte-nonlegacy.sql.out @@ -235,3 +235,82 @@ SELECT * FROM t2 struct<1:int> -- !query output 1 + + +-- !query +WITH cte_outer AS ( + SELECT 1 +) +SELECT * FROM ( + WITH cte_inner AS ( + SELECT * FROM cte_outer + ) + SELECT * FROM cte_inner +) +-- !query schema +struct<1:int> +-- !query output +1 + + +-- !query +WITH cte_outer AS ( + SELECT 1 +) +SELECT * FROM ( + WITH cte_inner AS ( + SELECT * FROM ( + WITH cte_inner_inner AS ( + SELECT * FROM cte_outer + ) + SELECT * FROM cte_inner_inner + ) + ) + SELECT * FROM cte_inner +) +-- !query schema +struct<1:int> +-- !query output +1 + + +-- !query +WITH cte_outer AS ( + WITH cte_invisible_inner AS ( + SELECT 1 + ) + SELECT * FROM cte_invisible_inner +) +SELECT * FROM ( + WITH cte_inner AS ( + SELECT * FROM cte_invisible_inner + ) + SELECT * FROM cte_inner +) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +Table or view not found: cte_invisible_inner; line 9 pos 18 + + +-- !query +WITH cte_outer AS ( + SELECT * FROM ( + WITH cte_invisible_inner AS ( + SELECT 1 + ) + SELECT * FROM cte_invisible_inner + ) +) +SELECT * FROM ( + WITH cte_inner AS ( + SELECT * FROM cte_invisible_inner + ) + SELECT * FROM cte_inner +) +-- !query schema +struct<> +-- !query output 
+org.apache.spark.sql.AnalysisException +Table or view not found: cte_invisible_inner; line 11 pos 18 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala index 7d45102ac83..e758c6f8df5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.expressions.{And, GreaterThan, LessThan, Literal, Or} -import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project, RepartitionOperation, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.adaptive._ import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.internal.SQLConf @@ -481,6 +481,164 @@ abstract class CTEInlineSuiteBase } } } + + test("Make sure CTESubstitution places WithCTE back in the plan correctly.") { + withView("t") { + Seq((0, 1), (1, 2)).toDF("c1", "c2").createOrReplaceTempView("t") + + // CTE on both sides of join - WithCTE placed over first common parent, i.e., the join. + val df1 = sql( + s""" + |select count(v1.c3), count(v2.c3) from ( + | with + | v1 as ( + | select c1, c2, rand() c3 from t + | ) + | select * from v1 + |) v1 join ( + | with + | v2 as ( + | select c1, c2, rand() c3 from t + | ) + | select * from v2 + |) v2 on v1.c1 = v2.c1 + """.stripMargin) + checkAnswer(df1, Row(2, 2) :: Nil) + df1.queryExecution.analyzed match { + case Aggregate(_, _, WithCTE(_, cteDefs)) => assert(cteDefs.length == 2) + case other => fail(s"Expect pattern Aggregate(WithCTE(_)) but got $other") + } + + // CTE on one side of join - WithCTE placed back where it was. 
+ val df2 = sql( + s""" + |select count(v1.c3), count(v2.c3) from ( + | select c1, c2, rand() c3 from t + |) v1 join ( + | with + | v2 as ( + | select c1, c2, rand() c3 from t + | ) + | select * from v2 + |) v2 on v1.c1 = v2.c1 + """.stripMargin) + checkAnswer(df2, Row(2, 2) :: Nil) + df2.queryExecution.analyzed match { + case Aggregate(_, _, Join(_, SubqueryAlias(_, WithCTE(_, cteDefs)), _, _, _)) => + assert(cteDefs.length == 1) + case other => fail(s"Expect pattern Aggregate(Join(_, WithCTE(_))) but got $other") + } + + // CTE on one side of join and both sides of union - WithCTE placed on first common parent. + val df3 = sql( + s""" + |select count(v1.c3), count(v2.c3) from ( + | select c1, c2, rand() c3 from t + |) v1 join ( + | select * from ( + | with + | v1 as ( + | select c1, c2, rand() c3 from t + | ) + | select * from v1 + | ) + | union all + | select * from ( + | with + | v2 as ( + | select c1, c2, rand() c3 from t + | ) + | select * from v2 + | ) + |) v2 on v1.c1 = v2.c1 + """.stripMargin) + checkAnswer(df3, Row(4, 4) :: Nil) + df3.queryExecution.analyzed match { + case Aggregate(_, _, Join(_, SubqueryAlias(_, WithCTE(_: Union, cteDefs)), _, _, _)) => + assert(cteDefs.length == 2) + case other => fail( + s"Expect pattern Aggregate(Join(_, (WithCTE(Union(_, _))))) but got $other") + } + + // CTE on one side of join and one side of union - WithCTE placed back where it was. 
+ val df4 = sql( + s""" + |select count(v1.c3), count(v2.c3) from ( + | select c1, c2, rand() c3 from t + |) v1 join ( + | select * from ( + | with + | v1 as ( + | select c1, c2, rand() c3 from t + | ) + | select * from v1 + | ) + | union all + | select c1, c2, rand() c3 from t + |) v2 on v1.c1 = v2.c1 + """.stripMargin) + checkAnswer(df4, Row(4, 4) :: Nil) + df4.queryExecution.analyzed match { + case Aggregate(_, _, Join(_, SubqueryAlias(_, Union(children, _, _)), _, _, _)) + if children.head.find(_.isInstanceOf[WithCTE]).isDefined => + assert( + children.head.collect { + case w: WithCTE => w + }.head.cteDefs.length == 1) + case other => fail( + s"Expect pattern Aggregate(Join(_, (WithCTE(Union(_, _))))) but got $other") + } + + // CTE on both sides of join and one side of union - WithCTE placed on first common parent. + val df5 = sql( + s""" + |select count(v1.c3), count(v2.c3) from ( + | with + | v1 as ( + | select c1, c2, rand() c3 from t + | ) + | select * from v1 + |) v1 join ( + | select c1, c2, rand() c3 from t + | union all + | select * from ( + | with + | v2 as ( + | select c1, c2, rand() c3 from t + | ) + | select * from v2 + | ) + |) v2 on v1.c1 = v2.c1 + """.stripMargin) + checkAnswer(df5, Row(4, 4) :: Nil) + df5.queryExecution.analyzed match { + case Aggregate(_, _, WithCTE(_, cteDefs)) => assert(cteDefs.length == 2) + case other => fail(s"Expect pattern Aggregate(WithCTE(_)) but got $other") + } + + // CTE as root node - WithCTE placed back where it was. 
+ val df6 = sql( + s""" + |with + |v1 as ( + | select c1, c2, rand() c3 from t + |) + |select count(v1.c3), count(v2.c3) from + |v1 join ( + | with + | v2 as ( + | select c1, c2, rand() c3 from t + | ) + | select * from v2 + |) v2 on v1.c1 = v2.c1 + """.stripMargin) + checkAnswer(df6, Row(2, 2) :: Nil) + df6.queryExecution.analyzed match { + case WithCTE(_, cteDefs) => assert(cteDefs.length == 2) + case other => fail(s"Expect pattern WithCTE(_) but got $other") + } + } + } } class CTEInlineSuiteAEOff extends CTEInlineSuiteBase with DisableAdaptiveExecutionSuite --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org