allisonwang-db commented on a change in pull request #32303:
URL: https://github.com/apache/spark/pull/32303#discussion_r622637714
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -2234,6 +2260,76 @@ class Analyzer(override val catalogManager:
CatalogManager)
}
}
+ /**
+ * This rule resolves lateral joins.
+ */
+ object ResolveLateralJoin extends Rule[LogicalPlan] {
+ import ResolveReferences._
+
+ /**
+ * Build a project list for Project/Aggregate and expand the star if
possible by first using
+ * the inner query plan. If failed, use the outer query plan to expand the
star and wrap all
+ * expanded attributes in [[OuterReference]]s.
+ */
+ private def expandOuterReference(
+ expressions: Seq[NamedExpression],
+ inner: LogicalPlan,
+ outer: LogicalPlan): Seq[NamedExpression] = {
+
+ // Expand the star expression using the inner plan.
+ def expandInner(star: Star): Seq[NamedExpression] =
+ star.expand(inner, resolver)
+
+ // Leave the star unchanged if the outer plan is unable to resolve the
star.
+ // Otherwise wrap the resolved attributes in outer references.
+ def expandOuter(star: Star): Seq[NamedExpression] = {
+ star.expand(outer, resolver).map {
+ case s: Star => s
+ case other => other
+ .transform { case a: Attribute => OuterReference(a) }
+ .asInstanceOf[NamedExpression]
+ }
+ }
+
+ buildExpandedProjectList(expressions, inner, expandInner) match {
+ case expanded if !containsStar(expanded) => expanded
+ // Expand the remaining star expressions using the outer query plan.
+ case other => buildExpandedProjectList(other, outer, expandOuter)
+ }
+ }
+
+ /**
+ * Resolve the right sub-tree by first using the right query plan itself,
and then try
+ * resolving the unresolved attributes and star expressions using the left
query plan.
+ */
+ private def resolveRightChild(right: LogicalPlan, left: LogicalPlan):
LogicalPlan = {
+ right.resolveOperators {
+ case p: LogicalPlan if !p.childrenResolved => p
+ case p: Project if containsStar(p.projectList) =>
+ p.copy(projectList = expandOuterReference(p.projectList, p.child,
left))
+ case a: Aggregate if containsStar(a.aggregateExpressions) =>
+ a.copy(aggregateExpressions =
+ expandOuterReference(a.aggregateExpressions, a.child, left))
+ case p: LogicalPlan if !p.resolved =>
+ p transformExpressions {
+ case u @ UnresolvedAttribute(nameParts) =>
+ withPosition(u) {
+ p.resolveChildren(nameParts, resolver)
+ .orElse(resolveLiteralFunction(nameParts))
+ .orElse(resolveOuterReference(nameParts, left))
+ .getOrElse(u)
+ }
+ }
+ }
+ }
+
+ override def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsUp {
+ case j @ Join(left, right, LateralJoin(_), _, _) if left.resolved &&
!right.resolved =>
Review comment:
This is trickier than expected. We can't simply check that `right`
doesn't contain an unresolved lateral join, because the left subtree of a
nested lateral join can contain outer references:
```sql
SELECT * FROM t1, LATERAL (SELECT * FROM (SELECT t1.c1 + 1 AS c1), LATERAL
(SELECT c1))
```
Plan
```scala
Join LateralJoin (1)
:- Relation T1
+- Join LateralJoin (2)
:- Project [unresolvedalias(t1.c1 + 1, c1)]
: +- OneRowRelation
+- Project ['c1]
+- OneRowRelation
```
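Here the outermost lateral join's right subtree gets resolved first:
join (2) does not match this rule because its left subtree references
`t1.c1` and is therefore still unresolved, while join (1) matches (its left
side is resolved and its right side is not). As a result, `'c1` is resolved
as the outer reference `t1.c1`. But in this case `c1` should be resolved as
`t1.c1 + 1 AS c1` from the inner lateral join's left subtree instead of
`t1.c1` (double-checked with Postgres). I will update the resolution logic
here.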
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]