[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1044807443

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala: ##
@@ -424,8 +424,51 @@ case class OuterReference(e: NamedExpression)
   override def qualifier: Seq[String] = e.qualifier
   override def exprId: ExprId = e.exprId
   override def toAttribute: Attribute = e.toAttribute
-  override def newInstance(): NamedExpression = OuterReference(e.newInstance())
+  override def newInstance(): NamedExpression =
+    OuterReference(e.newInstance()).setNameParts(nameParts)
   final override val nodePatterns: Seq[TreePattern] = Seq(OUTER_REFERENCE)
+
+  // optional field, the original name parts of UnresolvedAttribute before it is resolved to
+  // OuterReference. Used in rule ResolveLateralColumnAlias to convert OuterReference back to
+  // LateralColumnAliasReference.
+  var nameParts: Option[Seq[String]] = None
+  def setNameParts(newNameParts: Option[Seq[String]]): OuterReference = {

Review Comment:
As we discussed before, I don't think it is safe to do so, given that in the current solution in ResolveOuterReference each rule is applied only once. I made up a query (it can't run; it's just for demonstration):
```
SELECT *
FROM range(1, 7)
WHERE (
  SELECT id2
  FROM (SELECT dept * 2.0 AS id, id + 1 AS id2 FROM $testTable)) > 5
ORDER BY id
```
It is possible that `dept * 2.0` is not resolved yet because it needs a type conversion, so the LCA rule doesn't apply. The `id` in `id + 1 AS id2` then simply gets wrapped as an OuterReference.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
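To make the ordering hazard concrete, here is a toy Scala sketch. It assumes, following the resolution priority stated in the ResolveLateralColumnAlias doc comment in this thread, that a lateral alias outranks an outer reference. All types (`Unresolved`, `OuterRef`, `LateralRef`) and functions (`resolveOuterOnce`, `resolveLateral`) are hypothetical stand-ins, not Catalyst classes. A run-once outer pass claims `id` before `dept * 2.0 AS id` is resolved, and a later lateral-alias pass can only reclaim it if the original name parts were kept, which is the purpose the diff gives the new `nameParts` field.

```scala
// Toy model only: hypothetical stand-ins, not Spark's OuterReference or
// LateralColumnAliasReference classes.
object OuterRefOrderingSketch {
  sealed trait Expr
  final case class Unresolved(nameParts: Seq[String]) extends Expr
  // In this toy, an outer reference records only the outer attribute it binds to,
  // plus (optionally) the name parts it was originally written with.
  final case class OuterRef(outerAttrId: Int, nameParts: Option[Seq[String]]) extends Expr
  final case class LateralRef(aliasName: String) extends Expr

  /** A run-once outer-resolution pass: any name the outer scope knows is wrapped
    * immediately, even if a lateral alias could claim it once that alias resolves. */
  def resolveOuterOnce(
      e: Expr,
      outerScope: Map[String, Int],
      keepNameParts: Boolean): Expr = e match {
    case Unresolved(parts) if outerScope.contains(parts.mkString(".")) =>
      OuterRef(outerScope(parts.mkString(".")), if (keepNameParts) Some(parts) else None)
    case other => other
  }

  /** A later lateral-alias pass. Lateral aliases outrank outer references, so an
    * OuterRef is re-resolved when its original name matches a now-resolved alias.
    * Without the name parts there is nothing to match against. */
  def resolveLateral(e: Expr, resolvedAliases: Set[String]): Expr = e match {
    case Unresolved(parts) if resolvedAliases.contains(parts.mkString(".")) =>
      LateralRef(parts.mkString("."))
    case OuterRef(_, Some(parts)) if resolvedAliases.contains(parts.mkString(".")) =>
      LateralRef(parts.mkString("."))
    case other => other
  }
}
```

In the toy, `resolveOuterOnce(Unresolved(Seq("id")), Map("id" -> 1), keepNameParts = true)` yields an `OuterRef` that `resolveLateral(_, Set("id"))` turns into `LateralRef("id")`; with `keepNameParts = false`, the reference stays an `OuterRef` for good.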
[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1044744657

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala: ##
@@ -638,6 +638,14 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
           case UnresolvedWindowExpression(_, windowSpec) =>
             throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowSpec.name)
         })
+        // This should not happen, resolved Project or Aggregate should restore or resolve
+        // all lateral column alias references. Add check for extra safe.

Review Comment:
I intentionally didn't add it. I don't want attributes that actually can be resolved as LCAs to show up in the error message as UnresolvedAttribute. Also note that, unlike RemoveTempResolvedColumn, an LCARef can't be resolved directly to the NamedExpression inside it, because the resulting plan wouldn't be right: there is no alias push-down.
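The safety check mentioned in this diff only has to confirm that no lateral column alias reference survives analysis. Below is a minimal sketch of that idea, assuming a hypothetical toy plan and expression model (`Plan`, `Attr`, `Add`, `LateralRef`) rather than Catalyst's `LogicalPlan`, with `IllegalStateException` standing in for whatever error CheckAnalysis actually raises.

```scala
// Hypothetical toy model of a post-analysis check: once analysis has finished,
// no lateral-alias reference should remain anywhere in the plan.
object LateralRefCheckSketch {
  sealed trait Expr { def children: Seq[Expr] }
  final case class Attr(name: String) extends Expr { def children: Seq[Expr] = Nil }
  final case class LateralRef(name: String) extends Expr { def children: Seq[Expr] = Nil }
  final case class Add(left: Expr, right: Expr) extends Expr {
    def children: Seq[Expr] = Seq(left, right)
  }

  // A plan node: an operator name, its expressions, and its child plans.
  final case class Plan(operator: String, exprs: Seq[Expr], children: Seq[Plan])

  // Collects every lateral reference nested anywhere inside an expression.
  private def leftovers(e: Expr): Seq[LateralRef] = e match {
    case r: LateralRef => Seq(r)
    case other         => other.children.flatMap(leftovers)
  }

  /** Throws if any lateral reference survived analysis; returns the plan otherwise. */
  def checkNoLateralRefs(plan: Plan): Plan = {
    val found = plan.exprs.flatMap(leftovers)
    if (found.nonEmpty) {
      throw new IllegalStateException(
        s"${plan.operator} still contains lateral alias references: " +
          found.map(_.name).mkString(", "))
    }
    plan.children.foreach(checkNoLateralRefs)
    plan
  }
}
```

The sketch only reports leftovers and deliberately does not try to resolve them, in line with the point above that an LCA reference cannot simply be replaced by the expression it wraps without the alias push-down.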
[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1043652296

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala: ##
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, LateralColumnAliasReference, NamedExpression, OuterReference}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project}
+import org.apache.spark.sql.catalyst.rules.{Rule, UnknownRuleId}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LATERAL_COLUMN_ALIAS_REFERENCE, OUTER_REFERENCE, UNRESOLVED_ATTRIBUTE}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Resolve lateral column alias, which references the alias defined previously in the SELECT list.
+ * Plan-wise it handles two types of operators: Project and Aggregate.
+ * - in Project, pushing down the referenced lateral alias into a newly created Project, resolve
+ *   the attributes referencing these aliases
+ * - in Aggregate TODO.
+ *
+ * The whole process is generally divided into two phases:
+ * 1) recognize resolved lateral alias, wrap the attributes referencing them with
+ *    [[LateralColumnAliasReference]]
+ * 2) when the whole operator is resolved, unwrap [[LateralColumnAliasReference]].
+ *    For Project, it further resolves the attributes and push down the referenced lateral aliases.
+ *    For Aggregate, TODO
+ *
+ * Example for Project:
+ * Before rewrite:
+ * Project [age AS a, 'a + 1]
+ * +- Child
+ *
+ * After phase 1:
+ * Project [age AS a, lateralalias(a) + 1]
+ * +- Child
+ *
+ * After phase 2:
+ * Project [a, a + 1]
+ * +- Project [child output, age AS a]
+ *    +- Child
+ *
+ * Example for Aggregate TODO
+ *
+ *
+ * The name resolution priority:
+ * local table column > local lateral column alias > outer reference
+ *
+ * Because lateral column alias has higher resolution priority than outer reference, it will try
+ * to resolve an [[OuterReference]] using lateral column alias in phase 1, similar as an
+ * [[UnresolvedAttribute]]. If success, it strips [[OuterReference]] and also wraps it with
+ * [[LateralColumnAliasReference]].
+ */
+// TODO revisit resolving order: top down, or bottom up
+object ResolveLateralColumnAlias extends Rule[LogicalPlan] {
+  def resolver: Resolver = conf.resolver
+
+  private case class AliasEntry(alias: Alias, index: Int)
+  private def insertIntoAliasMap(
+      a: Alias,
+      idx: Int,
+      aliasMap: CaseInsensitiveMap[Seq[AliasEntry]]): CaseInsensitiveMap[Seq[AliasEntry]] = {
+    val prevAliases = aliasMap.getOrElse(a.name, Seq.empty[AliasEntry])
+    aliasMap + (a.name -> (prevAliases :+ AliasEntry(a, idx)))
+  }
+
+  /**
+   * Use the given the lateral alias candidate to resolve the name parts.
+   * @return The resolved attribute if succeeds. None if fails to resolve.
+   */
+  private def resolveByLateralAlias(
+      nameParts: Seq[String], lateralAlias: Alias): Option[NamedExpression] = {
+    val resolvedAttr = Analyzer.resolveExpressionByPlanOutput(
+      expr = UnresolvedAttribute(nameParts),
+      plan = Project(Seq(lateralAlias), OneRowRelation()),
+      resolver = resolver,
+      throws = false
+    ).asInstanceOf[NamedExpression]
+    if (resolvedAttr.resolved) Some(resolvedAttr) else None
+  }
+
+  private def rewriteLateralColumnAlias(plan: LogicalPlan): LogicalPlan = {

Review Comment:
In anticipation of @cloud-fan's future refactoring of column resolution, I will split this rule into two rules, one per phase, for now. After that refactoring, the wrapping phase will be fully embedded into the new, larger column resolution rule.
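The Project example in the doc comment can be traced with a toy model. The sketch below is a hypothetical simplification, not the PR's implementation: types are plain case classes, name matching is case-sensitive (the quoted code uses `CaseInsensitiveMap`), and an alias only becomes visible to later items once its own expression is fully resolved. `wrap` plays the role of phase 1 and `unwrap` the role of phase 2.

```scala
// Toy model (hypothetical types, not Catalyst's) of the two phases described in
// the ResolveLateralColumnAlias doc comment, covering the Project case only.
object LateralAliasProjectSketch {
  sealed trait Expr
  final case class Col(name: String) extends Expr        // resolved attribute
  final case class Unresolved(name: String) extends Expr // 'name
  final case class LateralRef(name: String) extends Expr // lateralalias(name)
  final case class Lit(value: Int) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr

  sealed trait Plan { def output: Seq[String] }
  final case class Relation(output: Seq[String]) extends Plan
  final case class Project(list: Seq[Expr], child: Plan) extends Plan {
    def output: Seq[String] = list.collect {
      case Alias(_, n) => n
      case Col(n)      => n
    }
  }

  // True if `e` still contains an unresolved name or a lateral reference.
  private def pending(e: Expr): Boolean = e match {
    case Unresolved(_) | LateralRef(_) => true
    case Add(l, r)                     => pending(l) || pending(r)
    case Alias(c, _)                   => pending(c)
    case _                             => false
  }

  /** Phase 1: wrap names that refer to a fully resolved alias defined earlier in
    * the list. Child columns take priority (local column > lateral alias). */
  def wrap(p: Project): Project = {
    val childCols = p.child.output.toSet
    def go(e: Expr, visible: Set[String]): Expr = e match {
      case Unresolved(n) if childCols(n) => Col(n)
      case Unresolved(n) if visible(n)   => LateralRef(n)
      case Add(l, r)                     => Add(go(l, visible), go(r, visible))
      case Alias(c, n)                   => Alias(go(c, visible), n)
      case other                         => other
    }
    val (newList, _) = p.list.foldLeft((Vector.empty[Expr], Set.empty[String])) {
      case ((acc, visible), e) =>
        val newE = go(e, visible)
        val nextVisible = newE match {
          case Alias(c, n) if !pending(c) => visible + n
          case _                          => visible
        }
        (acc :+ newE, nextVisible)
    }
    Project(newList, p.child)
  }

  /** Phase 2: once nothing unresolved is left, push the referenced aliases into a
    * new inner Project and turn every lateral reference into a plain column. */
  def unwrap(p: Project): Project = {
    def refs(e: Expr): Set[String] = e match {
      case LateralRef(n) => Set(n)
      case Add(l, r)     => refs(l) ++ refs(r)
      case Alias(c, _)   => refs(c)
      case _             => Set.empty
    }
    def hasUnresolved(e: Expr): Boolean = e match {
      case Unresolved(_) => true
      case Add(l, r)     => hasUnresolved(l) || hasUnresolved(r)
      case Alias(c, _)   => hasUnresolved(c)
      case _             => false
    }
    def strip(e: Expr): Expr = e match {
      case LateralRef(n) => Col(n)
      case Add(l, r)     => Add(strip(l), strip(r))
      case Alias(c, n)   => Alias(strip(c), n)
      case other         => other
    }
    val referenced = p.list.flatMap(refs).toSet
    if (referenced.isEmpty || p.list.exists(hasUnresolved)) p
    else {
      val pushed = p.list.collect { case a @ Alias(_, n) if referenced(n) => a }
      val inner = Project(p.child.output.map(Col(_)) ++ pushed, p.child)
      val outer = p.list.map {
        case Alias(_, n) if referenced(n) => Col(n)
        case e                            => strip(e)
      }
      Project(outer, inner)
    }
  }
}
```

For `Project [age AS a, 'a + 1]` over a child whose output is `age`, `wrap` produces `[Alias(Col(age), a), Add(LateralRef(a), Lit(1))]`, and `unwrap` then yields `Project [a, a + 1]` over `Project [age, age AS a]`, matching the plans in the doc comment. Because `wrap` checks the child's output first, a name that is also a child column is never turned into a lateral reference.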
[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1043649041

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala: ##
@@ -0,0 +1,222 @@
...
+  private def rewriteLateralColumnAlias(plan: LogicalPlan): LogicalPlan = {
+    // phase 1: wrap
+    val rewrittenPlan = plan.resolveOperatorsUpWithPruning(
+      _.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE), ruleId) {
+      case p @ Project(projectList, child) if p.childrenResolved
+        && !Analyzer.containsStar(projectList)
+        && projectList.exists(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) =>
+
+        var aliasMap = CaseInsensitiveMap(Map[String, Seq[AliasEntry]]())
+        def wrapLCAReference(e: NamedExpression): NamedExpression = {
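The phase 1 code quoted in this thread is cut off right where `wrapLCAReference` begins, so its body is not shown. As a hedged illustration of only the bookkeeping that is visible (`AliasEntry` and `insertIntoAliasMap` over a `CaseInsensitiveMap[Seq[AliasEntry]]`), here is a toy sketch. `AliasMapSketch`, `insert`, `lookupBefore`, and `build` are hypothetical names, and lower-casing with `Locale.ROOT` stands in for Spark's `CaseInsensitiveMap`.

```scala
import java.util.Locale

// Hypothetical sketch of phase 1 alias bookkeeping: each alias is recorded with
// its position in the SELECT list under a case-insensitive key.
object AliasMapSketch {
  final case class AliasEntry(name: String, index: Int)

  // Immutable map keyed by the lower-cased alias name; entries with the same
  // name accumulate in definition order, like `prevAliases :+ AliasEntry(a, idx)`.
  final case class AliasMap(entries: Map[String, Vector[AliasEntry]] = Map.empty) {
    private def key(name: String): String = name.toLowerCase(Locale.ROOT)

    def insert(name: String, index: Int): AliasMap = {
      val prev = entries.getOrElse(key(name), Vector.empty)
      AliasMap(entries + (key(name) -> (prev :+ AliasEntry(name, index))))
    }

    /** All aliases with this name (case-insensitive) defined strictly before `index`. */
    def lookupBefore(name: String, index: Int): Vector[AliasEntry] =
      entries.getOrElse(key(name), Vector.empty).filter(_.index < index)
  }

  /** Builds the map for a SELECT list given as one optional alias name per item. */
  def build(selectList: Seq[Option[String]]): AliasMap =
    selectList.zipWithIndex.foldLeft(AliasMap()) {
      case (m, (Some(alias), idx)) => m.insert(alias, idx)
      case (m, (None, _))          => m
    }
}
```

Building the map up front and filtering by index is a design choice of this toy; the quoted code instead grows `aliasMap` (a `var`) while it walks the project list, which gives the same "defined earlier" visibility. How several matches for one name should be handled is not visible in the truncated diff, so the toy simply returns them all.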
[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1042522410

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala: ##
@@ -0,0 +1,222 @@
...
+  private def rewriteLateralColumnAlias(plan: LogicalPlan): LogicalPlan = {

Review Comment:
The method is about 100 lines of code after removing the unnecessary TODO comments, but I can split it into two methods, one per phase.
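Splitting the method by phase keeps each piece small, and the result stays correct as long as both pieces keep running until the plan stops changing. Catalyst's analyzer runs its rules in batches, many of which iterate to a fixed point. The sketch below imitates that idea with a hypothetical, minimal executor; `Rule`, `rule`, and `runToFixedPoint` are illustrative names, not Catalyst's `RuleExecutor` API.

```scala
// Hypothetical sketch: two phase-specific rules run together in a fixed-point
// loop, loosely in the spirit of Catalyst's rule batches (not its real API).
object TwoPhaseBatchSketch {
  trait Rule[P] extends (P => P) { def name: String }

  // Wraps a plain function as a named rule.
  def rule[P](ruleName: String)(f: P => P): Rule[P] = new Rule[P] {
    val name: String = ruleName
    def apply(p: P): P = f(p)
  }

  /** Applies all rules in order, repeatedly, until one full iteration leaves the
    * plan unchanged or `maxIterations` is reached. Returns the plan and the
    * number of iterations performed. */
  @scala.annotation.tailrec
  def runToFixedPoint[P](plan: P, rules: Seq[Rule[P]], maxIterations: Int, done: Int = 0): (P, Int) =
    if (done >= maxIterations) (plan, done)
    else {
      val next = rules.foldLeft(plan)((p, r) => r(p))
      if (next == plan) (plan, done + 1)
      else runToFixedPoint(next, rules, maxIterations, done + 1)
    }
}
```

For example, with a wrap rule that rewrites the token `'a` to `lca(a)` and an unwrap rule that only fires once no `'`-prefixed token remains, `runToFixedPoint(List("'a", "b"), Seq(wrap, unwrap), 10)` returns `(List("a", "b"), 2)`: one productive iteration plus one that confirms nothing changed.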
[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1041325612

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala: ##
@@ -0,0 +1,222 @@
...
+        def wrapLCAReference(e: NamedExpression): NamedExpression = {
[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1041324238

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala: ##
@@ -0,0 +1,222 @@
...
+        def wrapLCAReference(e: NamedExpression): NamedExpression = {
[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1041275311

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala: ##
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, LateralColumnAliasReference, NamedExpression, OuterReference}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, Project}
+import org.apache.spark.sql.catalyst.rules.{Rule, UnknownRuleId}
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LATERAL_COLUMN_ALIAS_REFERENCE, OUTER_REFERENCE, UNRESOLVED_ATTRIBUTE}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Resolve lateral column alias, which references the alias defined previously in the SELECT list.
+ * Plan-wise it handles two types of operators: Project and Aggregate.
+ * - in Project, pushing down the referenced lateral alias into a newly created Project, resolve
+ *   the attributes referencing these aliases
+ * - in Aggregate TODO.
+ *
+ * The whole process is generally divided into two phases:
+ * 1) recognize resolved lateral alias, wrap the attributes referencing them with
+ *    [[LateralColumnAliasReference]]
+ * 2) when the whole operator is resolved, unwrap [[LateralColumnAliasReference]].
+ *    For Project, it further resolves the attributes and push down the referenced lateral aliases.
+ *    For Aggregate, TODO
+ *
+ * Example for Project:
+ * Before rewrite:
+ * Project [age AS a, 'a + 1]
+ * +- Child
+ *
+ * After phase 1:
+ * Project [age AS a, lateralalias(a) + 1]
+ * +- Child
+ *
+ * After phase 2:
+ * Project [a, a + 1]
+ * +- Project [child output, age AS a]
+ *    +- Child
+ *
+ * Example for Aggregate TODO
+ *
+ *
+ * The name resolution priority:
+ * local table column > local lateral column alias > outer reference
+ *
+ * Because lateral column alias has higher resolution priority than outer reference, it will try
+ * to resolve an [[OuterReference]] using lateral column alias in phase 1, similar as an
+ * [[UnresolvedAttribute]]. If success, it strips [[OuterReference]] and also wraps it with
+ * [[LateralColumnAliasReference]].
+ */
+// TODO revisit resolving order: top down, or bottom up
+object ResolveLateralColumnAlias extends Rule[LogicalPlan] {
+  def resolver: Resolver = conf.resolver
+
+  private case class AliasEntry(alias: Alias, index: Int)
+  private def insertIntoAliasMap(
+      a: Alias,
+      idx: Int,
+      aliasMap: CaseInsensitiveMap[Seq[AliasEntry]]): CaseInsensitiveMap[Seq[AliasEntry]] = {
+    val prevAliases = aliasMap.getOrElse(a.name, Seq.empty[AliasEntry])
+    aliasMap + (a.name -> (prevAliases :+ AliasEntry(a, idx)))
+  }
+
+  /**
+   * Use the given the lateral alias candidate to resolve the name parts.
+   * @return The resolved attribute if succeeds. None if fails to resolve.
+   */
+  private def resolveByLateralAlias(
+      nameParts: Seq[String], lateralAlias: Alias): Option[NamedExpression] = {
+    val resolvedAttr = Analyzer.resolveExpressionByPlanOutput(
+      expr = UnresolvedAttribute(nameParts),
+      plan = Project(Seq(lateralAlias), OneRowRelation()),
+      resolver = resolver,
+      throws = false
+    ).asInstanceOf[NamedExpression]
+    if (resolvedAttr.resolved) Some(resolvedAttr) else None
+  }
+
+  private def rewriteLateralColumnAlias(plan: LogicalPlan): LogicalPlan = {
+    // phase 1: wrap
+    val rewrittenPlan = plan.resolveOperatorsUpWithPruning(
+      _.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE), ruleId) {
+      case p @ Project(projectList, child) if p.childrenResolved
+        && !Analyzer.containsStar(projectList)
+        && projectList.exists(_.containsAnyPattern(UNRESOLVED_ATTRIBUTE, OUTER_REFERENCE)) =>
+
+        var aliasMap = CaseInsensitiveMap(Map[String, Seq[AliasEntry]]())
+        def wrapLCAReference(e: NamedExpression): NamedExpression = {
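The Scaladoc above describes a two-phase rewrite for Project. The sketch below models it on a toy expression tree so the before, after-phase-1 and after-phase-2 plans can be compared directly. It is a minimal sketch, not Spark's implementation. `Expr`, `Attr`, `Unresolved`, `LcaRef`, `Lit`, `Add`, `Alias`, `Relation`, `Project` and `ToyLateralAlias` are hypothetical stand-ins for the Catalyst classes. Alias names are matched case-insensitively by lower-casing them, and the sketch does not handle chained or duplicated alias names.

```scala
// Toy stand-ins for Catalyst expressions and plans (hypothetical, not Spark classes).
sealed trait Expr
final case class Attr(name: String) extends Expr        // resolved column reference
final case class Unresolved(name: String) extends Expr  // plays the role of 'a
final case class LcaRef(name: String) extends Expr      // plays the role of lateralalias(a)
final case class Lit(value: Int) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr
final case class Alias(child: Expr, name: String) extends Expr

sealed trait Plan
final case class Relation(output: Seq[String]) extends Plan
final case class Project(list: Seq[Expr], child: Plan) extends Plan

object ToyLateralAlias {
  def output(plan: Plan): Seq[String] = plan match {
    case Relation(out)    => out
    case Project(list, _) => list.collect { case Attr(n) => n; case Alias(_, n) => n }
  }

  // An alias is registered for lateral use only once its own child is resolved
  // (so chained aliases such as `a + 1 AS b, b + 1` are not handled here).
  private def fullyResolved(e: Expr): Boolean = e match {
    case _: Unresolved | _: LcaRef => false
    case Add(l, r)                 => fullyResolved(l) && fullyResolved(r)
    case Alias(c, _)               => fullyResolved(c)
    case _                         => true
  }

  // Phase 1: scan the list left to right, wrapping names that match an earlier alias.
  def wrap(p: Project): Project = {
    val local = output(p.child).map(_.toLowerCase).toSet
    var aliasMap = Map.empty[String, Seq[Alias]] // lower-cased alias name -> definitions so far

    def rewrite(e: Expr): Expr = e match {
      case Unresolved(n) if local(n.toLowerCase) => Attr(n) // local column wins
      case Unresolved(n) =>
        aliasMap.getOrElse(n.toLowerCase, Seq.empty[Alias]) match {
          case Seq()  => e // no lateral alias: left for later rules (e.g. outer reference)
          case Seq(_) => LcaRef(n)
          case _      => throw new IllegalArgumentException(s"Ambiguous lateral column alias: $n")
        }
      case Add(l, r)   => Add(rewrite(l), rewrite(r))
      case Alias(c, n) => Alias(rewrite(c), n)
      case other       => other
    }

    val newList = p.list.map { e =>
      val rewritten = rewrite(e)
      rewritten match {
        case a @ Alias(c, n) if fullyResolved(c) =>
          val key = n.toLowerCase
          aliasMap = aliasMap + (key -> (aliasMap.getOrElse(key, Seq.empty[Alias]) :+ a))
        case _ => ()
      }
      rewritten
    }
    Project(newList, p.child)
  }

  private def lcaNames(e: Expr): Set[String] = e match {
    case LcaRef(n)   => Set(n.toLowerCase)
    case Add(l, r)   => lcaNames(l) ++ lcaNames(r)
    case Alias(c, _) => lcaNames(c)
    case _           => Set.empty
  }

  private def stripLca(e: Expr): Expr = e match {
    case LcaRef(n)   => Attr(n)
    case Add(l, r)   => Add(stripLca(l), stripLca(r))
    case Alias(c, n) => Alias(stripLca(c), n)
    case other       => other
  }

  // Phase 2: push the referenced aliases into a new child Project and unwrap LcaRef.
  def unwrap(p: Project): Project = {
    val referenced = p.list.flatMap(lcaNames).toSet
    if (referenced.isEmpty) p
    else {
      val pushed = p.list.collect { case a @ Alias(_, n) if referenced(n.toLowerCase) => a }
      val inner  = Project(output(p.child).map(Attr(_)) ++ pushed, p.child)
      val outer  = p.list.map {
        case Alias(_, n) if referenced(n.toLowerCase) => Attr(n)
        case e                                        => stripLca(e)
      }
      Project(outer, inner)
    }
  }
}
```

Applying `wrap` and then `unwrap` to the Scaladoc's `Project [age AS a, 'a + 1]` over a relation with column `age` gives the same shape as its "After phase 2" plan. The result is an outer `Project [a, a + 1]` on top of an inner `Project [age, age AS a]`.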
[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1041319746

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveLateralColumnAlias.scala: ##
@@ -0,0 +1,222 @@
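The Scaladoc of `ResolveLateralColumnAlias` states the name resolution priority as local table column > local lateral column alias > outer reference. It also says phase 1 tries to re-resolve an existing `OuterReference` as a lateral alias. The sketch below shows that ordering, assuming names are plain strings compared case-insensitively. `Resolution`, `LocalColumn`, `LateralAlias`, `OuterRef`, `NotFound` and `ToyNamePriority` are hypothetical and are not Spark APIs.

```scala
// Hypothetical sketch of the documented priority:
// local table column > local lateral column alias > outer reference.
sealed trait Resolution
final case class LocalColumn(name: String) extends Resolution
final case class LateralAlias(name: String) extends Resolution
final case class OuterRef(name: String) extends Resolution
case object NotFound extends Resolution

object ToyNamePriority {
  // Case-insensitive lookup, matching Spark's default (spark.sql.caseSensitive=false).
  private def contains(scope: Set[String], name: String): Boolean =
    scope.exists(_.equalsIgnoreCase(name))

  def resolve(name: String, local: Set[String], lateral: Set[String], outer: Set[String]): Resolution =
    if (contains(local, name)) LocalColumn(name)
    else if (contains(lateral, name)) LateralAlias(name)
    else if (contains(outer, name)) OuterRef(name)
    else NotFound

  // A name may already have been turned into an outer reference by an earlier rule,
  // for example before the lateral alias itself was resolved. Because lateral aliases
  // outrank outer references, phase 1 revisits it.
  def revisit(r: Resolution, lateral: Set[String]): Resolution = r match {
    case OuterRef(n) if contains(lateral, n) => LateralAlias(n)
    case other                               => other
  }
}
```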
[GitHub] [spark] anchovYu commented on a diff in pull request #38776: [SPARK-27561][SQL] Support implicit lateral column alias resolution on Project and refactor Analyzer
anchovYu commented on code in PR #38776:
URL: https://github.com/apache/spark/pull/38776#discussion_r1040031835

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala: ##
@@ -258,6 +417,17 @@ class Analyzer(override val catalogManager: CatalogManager)
       TypeCoercion.typeCoercionRules
   }
 
+  private def resolveExpressionByPlanOutput(

Review Comment:
I added this alias in the class because the public method in the `Analyzer` object takes a `resolver` as a parameter.
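The comment describes two methods with the same name. The public one lives on the `Analyzer` object, so callers such as `ResolveLateralColumnAlias` can use it, and it takes the resolver explicitly. The private one lives in the `Analyzer` class and forwards to it, supplying the class's own resolver. A minimal sketch of that forwarding pattern follows. `ToyAnalyzer`, `resolveByOutput` and `lookup` are hypothetical names, not Spark code. `Resolver` is redefined locally as `(String, String) => Boolean`, the shape of Spark's `Resolver` type alias.

```scala
object ToyAnalyzer {
  type Resolver = (String, String) => Boolean
  val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)
  val caseSensitive: Resolver = (a, b) => a == b

  // Public entry point on the object: it has no access to an analyzer instance's
  // configuration, so callers must pass the resolver explicitly.
  def resolveByOutput(name: String, output: Seq[String], resolver: Resolver): Option[String] =
    output.find(col => resolver(col, name))
}

class ToyAnalyzer(caseSensitiveAnalysis: Boolean) {
  private def resolver: ToyAnalyzer.Resolver =
    if (caseSensitiveAnalysis) ToyAnalyzer.caseSensitive else ToyAnalyzer.caseInsensitive

  // Private "alias" in the class: same name, with the resolver filled in from this instance.
  private def resolveByOutput(name: String, output: Seq[String]): Option[String] =
    ToyAnalyzer.resolveByOutput(name, output, resolver)

  def lookup(name: String, output: Seq[String]): Option[String] = resolveByOutput(name, output)
}
```

Code inside the class keeps calling the short two-argument form. Code outside the class goes through the object and states which resolver it wants.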