This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new f5bc48b0988 [SPARK-39548][SQL] CreateView Command with a window clause query hit a wrong window definition not found issue f5bc48b0988 is described below commit f5bc48b0988dbf6561ace14a87faa9f27974678e Author: Rui Wang <rui.w...@databricks.com> AuthorDate: Thu Jun 23 13:30:34 2022 +0800 [SPARK-39548][SQL] CreateView Command with a window clause query hit a wrong window definition not found issue 1. In the inline CTE code path, fix a bug where the top-down check for unresolved window expressions misclassified a defined window as undefined. 2. Move the unresolved window expression check for `Project` nodes into `CheckAnalysis`. Why are the changes needed? This bug causes a valid query to fail. Does this PR introduce any user-facing change? No. How was this patch tested? Unit test (UT). Closes #36947 from amaliujia/improvewindow. Authored-by: Rui Wang <rui.w...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit 4718d59c6c4e201bf940303a4311dfb753372395) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/analysis/Analyzer.scala | 10 +--------- .../sql/catalyst/analysis/CheckAnalysis.scala | 10 +++++++++- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 23 ++++++++++++++++++++++ 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d7bf9f2571c..7e30b93c65b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -444,7 +444,7 @@ class Analyzer(override val catalogManager: CatalogManager) * Substitute child plan with WindowSpecDefinitions. 
*/ object WindowsSubstitution extends Rule[LogicalPlan] { - def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDownWithPruning( + def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( _.containsAnyPattern(WITH_WINDOW_DEFINITION, UNRESOLVED_WINDOW_EXPRESSION), ruleId) { // Lookup WindowSpecDefinitions. This rule works with unresolved children. case WithWindowDefinition(windowDefinitions, child) => child.resolveExpressions { @@ -453,14 +453,6 @@ class Analyzer(override val catalogManager: CatalogManager) throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowName)) WindowExpression(c, windowSpecDefinition) } - - case p @ Project(projectList, _) => - projectList.foreach(_.transformDownWithPruning( - _.containsPattern(UNRESOLVED_WINDOW_EXPRESSION), ruleId) { - case UnresolvedWindowExpression(_, windowSpec) => - throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowSpec.name) - }) - p } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index caf9514ab94..cba82b8ac2f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, DecorrelateInnerQuery, InlineCTE} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_WINDOW_EXPRESSION import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils} import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsPartitionManagement} import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors} @@ -202,7 +203,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { failAnalysis("grouping_id() can only be used with GroupingSets/Cube/Rollup") case e: Expression if e.children.exists(_.isInstanceOf[WindowFunction]) && - !e.isInstanceOf[WindowExpression] => + !e.isInstanceOf[WindowExpression] && e.resolved => val w = e.children.find(_.isInstanceOf[WindowFunction]).get failAnalysis(s"Window function $w requires an OVER clause.") @@ -477,6 +478,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { s"""Only a single table generating function is allowed in a SELECT clause, found: | ${exprs.map(_.sql).mkString(",")}""".stripMargin) + case p @ Project(projectList, _) => + projectList.foreach(_.transformDownWithPruning( + _.containsPattern(UNRESOLVED_WINDOW_EXPRESSION)) { + case UnresolvedWindowExpression(_, windowSpec) => + throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowSpec.name) + }) + case j: Join if !j.duplicateResolved => val conflictingAttributes = j.left.outputSet.intersect(j.right.outputSet) failAnalysis( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 581108b81b6..13b47a65dc3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -4236,6 +4236,29 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark sql("SELECT * FROM testData, LATERAL (SELECT * FROM testData)").collect() } } + + test("SPARK-39548: CreateView will make queries go into inline CTE code path thus" + + "trigger a mis-clarified `window definition not found` issue") { + sql( + """ + |create or replace temporary view test_temp_view as + |with step_1 as ( + |select * , min(a) over w2 as min_a_over_w2 from + |(select 1 as a, 2 as b, 3 as c) window w2 as (partition by b order by c)) , step_2 as + |( + 
|select *, max(e) over w1 as max_a_over_w1 + |from (select 1 as e, 2 as f, 3 as g) + |join step_1 on true + |window w1 as (partition by f order by g) + |) + |select * + |from step_2 + |""".stripMargin) + + checkAnswer( + sql("select * from test_temp_view"), + Row(1, 2, 3, 1, 2, 3, 1, 1)) + } } case class Foo(bar: Option[String]) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org