cloud-fan commented on code in PR #48649:
URL: https://github.com/apache/spark/pull/48649#discussion_r1837522751
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1260,8 +1264,32 @@ class AstBuilder extends DataTypeAstBuilder
def createProject() = if (namedExpressions.nonEmpty) {
val newProjectList: Seq[NamedExpression] = if (isPipeOperatorSelect) {
- // If this is a pipe operator |> SELECT clause, add a [[PipeSelect]] expression wrapping
- // each alias in the project list, so the analyzer can check invariants later.
+ // If this is a pipe operator |> SELECT clause,
+ // (1) validate all the window references after OVER are valid, and
+ val windowDefs = Option(windowClause)
+ .map(_.namedWindow.asScala.map(_.name.getText).toSet)
+ .getOrElse(collection.immutable.Set.empty[String])
+ // Collect all window names from UnresolvedWindowExpressions
+ val unresolvedWindowNames = namedExpressions.collect {
+ case Alias(wExpr: UnresolvedWindowExpression, _) => wExpr.windowSpec.name
+ case UnresolvedAlias(wExpr: UnresolvedWindowExpression, _) => wExpr.windowSpec.name
+ }
+ if (unresolvedWindowNames.nonEmpty) {
+ if (windowDefs.isEmpty) {
+ // No window definitions provided, throw error for the first unresolved window
+ throw QueryParsingErrors.cannotFindWindowReferenceError(unresolvedWindowNames.head, ctx)
+ } else {
+ // Find any unresolved window names not defined in windowDefs
+ unresolvedWindowNames.find(!windowDefs.contains(_)) match {
Review Comment:
@Angryrou thanks for the detailed explanation!
Looking at the analyzer rule `WindowsSubstitution`, we actually allow all the descendant nodes of `WithWindowDefinition`, not just its direct child, to reference the window definitions it holds. This makes sense for classic SQL, as the WINDOW clause applies to the entire query, but it's pretty weird for pipe SQL.
It looks like we want a variant of `WithWindowDefinition` that only applies to its direct child node. How about we add a flag `forPipeSQL` to `WithWindowDefinition`, and only resolve the window references in the direct child when it is set?
```scala
object WindowsSubstitution extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan =
    plan.resolveOperatorsUpWithPruning(
      _.containsAnyPattern(WITH_WINDOW_DEFINITION, UNRESOLVED_WINDOW_EXPRESSION), ruleId) {
      // Lookup WindowSpecDefinitions. This rule works with unresolved children.
      case WithWindowDefinition(windowDefinitions, child) =>
        child.resolveExpressions {
          case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
            val windowSpecDefinition = windowDefinitions.getOrElse(windowName,
              throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowName))
            WindowExpression(c, windowSpecDefinition)
        }
    }
}
```
`child.resolveExpressions` actually traverses the entire plan tree below `child`, so when the `forPipeSQL` flag is true we should call `child.transformExpressions` instead, which only resolves expressions in the current node.
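To make the difference concrete, here is a minimal self-contained toy model in Scala. It is not Spark's code: `Expr`, `WindowRef`, `ResolvedWindow`, `Node`, `rewriteHere`, `rewriteDeep` and `substituteWindows` are hypothetical names, and `forPipeSQL` is only the flag suggested in this comment. `rewriteDeep` stands in for how `resolveExpressions` walks the whole subtree, while `rewriteHere` stands in for `transformExpressions` touching only the current node.

```scala
// Hypothetical toy model (not Spark's real classes).
sealed trait Expr
// A reference to a named window, like `UnresolvedWindowExpression(_, WindowSpecReference(name))`.
final case class WindowRef(name: String) extends Expr
// A reference replaced by its definition, like `WindowExpression(_, windowSpecDefinition)`.
final case class ResolvedWindow(name: String, spec: String) extends Expr

// A plan node holding its own expressions plus child nodes.
final case class Node(exprs: Seq[Expr], children: Seq[Node] = Nil) {
  // Rewrites only this node's expressions; children are left untouched.
  def rewriteHere(f: Expr => Expr): Node = copy(exprs = exprs.map(f))

  // Rewrites this node's expressions and those of every descendant.
  def rewriteDeep(f: Expr => Expr): Node =
    Node(exprs.map(f), children.map(_.rewriteDeep(f)))
}

// Hypothetical stand-in for `WithWindowDefinition` carrying the suggested flag.
final case class WithWindowDefinition(
    windowDefinitions: Map[String, String],
    child: Node,
    forPipeSQL: Boolean)

def substituteWindows(w: WithWindowDefinition): Node = {
  val resolve: Expr => Expr = {
    case WindowRef(name) =>
      val spec = w.windowDefinitions.getOrElse(
        name, throw new IllegalArgumentException(s"Window specification $name is not defined."))
      ResolvedWindow(name, spec)
    case other => other
  }
  // Pipe SQL: the WINDOW clause only covers the directly wrapped node.
  // Classic SQL: it covers the whole subtree.
  if (w.forPipeSQL) w.child.rewriteHere(resolve) else w.child.rewriteDeep(resolve)
}

val spec = "PARTITION BY a ORDER BY b"
// An earlier `|> SELECT ... OVER w` step with no WINDOW clause of its own.
val earlier = Node(Seq(WindowRef("w")), Seq(Node(Nil)))
// A later `|> SELECT ... OVER w WINDOW w AS (...)` step stacked on top of it.
val later = Node(Seq(WindowRef("w")), Seq(earlier))

val classic = substituteWindows(WithWindowDefinition(Map("w" -> spec), later, forPipeSQL = false))
val pipe = substituteWindows(WithWindowDefinition(Map("w" -> spec), later, forPipeSQL = true))
```

With the flag off, the later step's WINDOW clause leaks into the earlier step and resolves its `OVER w`; with the flag on, the earlier reference stays unresolved in this model, so it would need its own WINDOW clause or be reported by a later check.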