jchen5 commented on code in PR #42383:
URL: https://github.com/apache/spark/pull/42383#discussion_r1298613693
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -654,6 +654,25 @@ object DecorrelateInnerQuery extends PredicateHelper {
val newProject = Project(newProjectList ++ referencesToAdd,
newChild)
(newProject, joinCond, outerReferenceMap)
+ case w@Window(projectList, partitionSpec, orderSpec, child) =>
+ val outerReferences = collectOuterReferences(w.expressions)
+ assert(outerReferences.isEmpty, s"Correlated column is not allowed
in window " +
+ s"function: $w")
+ val newOuterReferences = parentOuterReferences ++ outerReferences
+ val (newChild, joinCond, outerReferenceMap) =
+ decorrelate(child, newOuterReferences, aggregated = true,
underSetOp)
Review Comment:
The `aggregated = true` argument probably deserves a comment. We should also update the existing doc comment
`// aggregated: a boolean flag indicating whether the result of the plan will be aggregated.`
to say that it also covers window functions.
Can we also add a test case that verifies this? Something like the following should exercise the behavior:
`SELECT *, (SELECT RANK() OVER(ORDER BY c) FROM t2 WHERE b >= d) FROM t1`
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -654,6 +654,25 @@ object DecorrelateInnerQuery extends PredicateHelper {
val newProject = Project(newProjectList ++ referencesToAdd,
newChild)
(newProject, joinCond, outerReferenceMap)
+ case w@Window(projectList, partitionSpec, orderSpec, child) =>
+ val outerReferences = collectOuterReferences(w.expressions)
+ assert(outerReferences.isEmpty, s"Correlated column is not allowed
in window " +
+ s"function: $w")
+ val newOuterReferences = parentOuterReferences ++ outerReferences
+ val (newChild, joinCond, outerReferenceMap) =
+ decorrelate(child, newOuterReferences, aggregated = true,
underSetOp)
+ val newProjectList = replaceOuterReferences(projectList,
outerReferenceMap)
+ val newPartitionSpec = replaceOuterReferences(partitionSpec,
outerReferenceMap)
+ val newOrderSpec = replaceOuterReferences(orderSpec,
outerReferenceMap)
+ val referencesToAdd = missingReferences(
+ newPartitionSpec.asInstanceOf[Seq[NamedExpression]],
Review Comment:
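What about the project list and order spec? Only the partition spec is passed to `missingReferences` here; shouldn't missing references from the project list and order spec be added as well?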
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -654,6 +654,25 @@ object DecorrelateInnerQuery extends PredicateHelper {
val newProject = Project(newProjectList ++ referencesToAdd,
newChild)
(newProject, joinCond, outerReferenceMap)
+ case w@Window(projectList, partitionSpec, orderSpec, child) =>
Review Comment:
nit: add spaces around the `@`, i.e. `case w @ Window(...)`?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala:
##########
@@ -654,6 +654,25 @@ object DecorrelateInnerQuery extends PredicateHelper {
val newProject = Project(newProjectList ++ referencesToAdd,
newChild)
(newProject, joinCond, outerReferenceMap)
+ case w@Window(projectList, partitionSpec, orderSpec, child) =>
+ val outerReferences = collectOuterReferences(w.expressions)
+ assert(outerReferences.isEmpty, s"Correlated column is not allowed
in window " +
+ s"function: $w")
+ val newOuterReferences = parentOuterReferences ++ outerReferences
+ val (newChild, joinCond, outerReferenceMap) =
+ decorrelate(child, newOuterReferences, aggregated = true,
underSetOp)
+ val newProjectList = replaceOuterReferences(projectList,
outerReferenceMap)
+ val newPartitionSpec = replaceOuterReferences(partitionSpec,
outerReferenceMap)
+ val newOrderSpec = replaceOuterReferences(orderSpec,
outerReferenceMap)
Review Comment:
Since CheckAnalysis already blocks outer references in the window itself, these replacements shouldn't be needed, right? It's fine to leave them as they are, but please add a comment explaining why.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]