anchovYu commented on code in PR #42276:
URL: https://github.com/apache/spark/pull/42276#discussion_r1286512367
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAliasReference.scala:
##########
@@ -131,95 +131,97 @@ object ResolveLateralColumnAliasReference extends
Rule[LogicalPlan] {
(pList.exists(hasWindowExpression) && p.expressions.forall(_.resolved)
&& p.childrenResolved)
}
- override def apply(plan: LogicalPlan): LogicalPlan = {
- if (!conf.getConf(SQLConf.LATERAL_COLUMN_ALIAS_IMPLICIT_ENABLED)) {
- plan
- } else if (plan.containsAnyPattern(TEMP_RESOLVED_COLUMN,
UNRESOLVED_HAVING)) {
- // It should not change the plan if `TempResolvedColumn` or
`UnresolvedHaving` is present in
- // the query plan. These plans need certain plan shape to get recognized
and resolved by other
- // rules, such as Filter/Sort + Aggregate to be matched by
ResolveAggregateFunctions.
- // LCA resolution can break the plan shape, like adding Project above
Aggregate.
- plan
- } else {
- // phase 2: unwrap
- plan.resolveOperatorsUpWithPruning(
- _.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE), ruleId) {
- case p @ Project(projectList, child) if ruleApplicableOnOperator(p,
projectList)
- &&
projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
- var aliasMap = AttributeMap.empty[AliasEntry]
- val referencedAliases = collection.mutable.Set.empty[AliasEntry]
- def unwrapLCAReference(e: NamedExpression): NamedExpression = {
-
e.transformWithPruning(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
- case lcaRef: LateralColumnAliasReference if
aliasMap.contains(lcaRef.a) =>
- val aliasEntry = aliasMap.get(lcaRef.a).get
- // If there is no chaining of lateral column alias reference,
push down the alias
- // and unwrap the LateralColumnAliasReference to the
NamedExpression inside
- // If there is chaining, don't resolve and save to future
rounds
- if
(!aliasEntry.alias.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) {
- referencedAliases += aliasEntry
- lcaRef.ne
- } else {
- lcaRef
- }
- case lcaRef: LateralColumnAliasReference if
!aliasMap.contains(lcaRef.a) =>
- // It shouldn't happen, but restore to unresolved attribute to
be safe.
- UnresolvedAttribute(lcaRef.nameParts)
- }.asInstanceOf[NamedExpression]
- }
- val newProjectList = projectList.zipWithIndex.map {
- case (a: Alias, idx) =>
- val lcaResolved = unwrapLCAReference(a)
- // Insert the original alias instead of rewritten one to detect
chained LCA
- aliasMap += (a.toAttribute -> AliasEntry(a, idx))
- lcaResolved
- case (e, _) =>
- unwrapLCAReference(e)
- }
+ /** Internal application method. A hand-written bottom-up recursive
traverse. */
+ private def apply0(plan: LogicalPlan): LogicalPlan = {
+ plan match {
+ case p: LogicalPlan if
!p.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE) =>
+ p
- if (referencedAliases.isEmpty) {
- p
- } else {
- val outerProjectList = collection.mutable.Seq(newProjectList: _*)
- val innerProjectList =
-
collection.mutable.ArrayBuffer(child.output.map(_.asInstanceOf[NamedExpression]):
_*)
- referencedAliases.foreach { case AliasEntry(alias: Alias, idx) =>
- outerProjectList.update(idx, alias.toAttribute)
- innerProjectList += alias
- }
- p.copy(
- projectList = outerProjectList.toSeq,
- child = Project(innerProjectList.toSeq, child)
- )
- }
+ // It should not change the Aggregate (and thus the plan shape) if its
parent is an
+ // UnresolvedHaving, to avoid breaking the shape pattern
`UnresolvedHaving - Aggregate`
+ // matched by ResolveAggregateFunctions. See SPARK-42936 and SPARK-44714
for more details.
+ case u @ UnresolvedHaving(_, agg: Aggregate) =>
+ u.copy(child = agg.mapChildren(apply0))
- case agg @ Aggregate(groupingExpressions, aggregateExpressions, _)
- if ruleApplicableOnOperator(agg, aggregateExpressions)
- &&
aggregateExpressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE))
=>
+ case pOriginal: Project if ruleApplicableOnOperator(pOriginal,
pOriginal.projectList)
+ &&
pOriginal.projectList.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE))
=>
+ val p @ Project(projectList, child) = pOriginal.mapChildren(apply0)
Review Comment:
Bottom-up resolution. The rest of code is fully copied and has no change.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]