cloud-fan commented on code in PR #48649:
URL: https://github.com/apache/spark/pull/48649#discussion_r1837522751


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1260,8 +1264,32 @@ class AstBuilder extends DataTypeAstBuilder
 
     def createProject() = if (namedExpressions.nonEmpty) {
       val newProjectList: Seq[NamedExpression] = if (isPipeOperatorSelect) {
-        // If this is a pipe operator |> SELECT clause, add a [[PipeSelect]] expression wrapping
-        // each alias in the project list, so the analyzer can check invariants later.
+        // If this is a pipe operator |> SELECT clause,
+        // (1) validate all the window references after OVER are valid, and
+        val windowDefs = Option(windowClause)
+          .map(_.namedWindow.asScala.map(_.name.getText).toSet)
+          .getOrElse(collection.immutable.Set.empty[String])
+        // Collect all window names from UnresolvedWindowExpressions
+        val unresolvedWindowNames = namedExpressions.collect {
+          case Alias(wExpr: UnresolvedWindowExpression, _) => wExpr.windowSpec.name
+          case UnresolvedAlias(wExpr: UnresolvedWindowExpression, _) => wExpr.windowSpec.name
+        }
+        if (unresolvedWindowNames.nonEmpty) {
+          if (windowDefs.isEmpty) {
+            // No window definitions provided, throw error for the first unresolved window
+            throw QueryParsingErrors.cannotFindWindowReferenceError(unresolvedWindowNames.head, ctx)
+          } else {
+            // Find any unresolved window names not defined in windowDefs
+            unresolvedWindowNames.find(!windowDefs.contains(_)) match {

Review Comment:
   @Angryrou thanks for the detailed explanation!
   
   Looking at the analyzer rule `WindowsSubstitution`, we actually allow every
   node in the subtree under `WithWindowDefinition` to reference its window
   specification definitions. This makes sense for classic SQL, where the WINDOW
   clause applies to the entire query, but it's pretty weird for pipe SQL.
   
   It looks like we want a variant of `WithWindowDefinition` that only applies to
   its direct child node. How about we add a `forPipeSQL` flag to
   `WithWindowDefinition` and, when it is set, only resolve the window references
   in the direct child?
   
   ```scala
     object WindowsSubstitution extends Rule[LogicalPlan] {
       def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
         _.containsAnyPattern(WITH_WINDOW_DEFINITION, UNRESOLVED_WINDOW_EXPRESSION), ruleId) {
         // Lookup WindowSpecDefinitions. This rule works with unresolved children.
         case WithWindowDefinition(windowDefinitions, child) => child.resolveExpressions {
           case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
             val windowSpecDefinition = windowDefinitions.getOrElse(windowName,
               throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowName))
             WindowExpression(c, windowSpecDefinition)
         }
       }
     }
   ```
   
   `child.resolveExpressions` actually traverses the entire subtree rooted at
   `child`. When the `forPipeSQL` flag is true, we should call
   `child.transformExpressions` instead, which only rewrites the expressions of
   the `child` node itself.


