This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new f3c1298 [SPARK-34833][SQL][FOLLOWUP] Handle outer references in all the places f3c1298 is described below commit f3c129827986ba06c8a9ab00bd687e8d025103d1 Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Fri Mar 26 09:10:03 2021 +0900 [SPARK-34833][SQL][FOLLOWUP] Handle outer references in all the places ### What changes were proposed in this pull request? This is a follow-up of https://github.com/apache/spark/pull/31940 . This PR generalizes the matching of attributes and outer references, so that outer references are handled everywhere. Note that, currently correlated subquery has a lot of limitations in Spark, and the newly covered cases cannot happen. So this PR is a code refactor. ### Why are the changes needed? code cleanup ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests Closes #31959 from cloud-fan/follow. Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Takeshi Yamamuro <yamam...@apache.org> (cherry picked from commit 658e95c345d5aa2a98b8d2a854e003a5c77ed581) Signed-off-by: Takeshi Yamamuro <yamam...@apache.org> --- .../spark/sql/catalyst/analysis/Analyzer.scala | 67 +++++++++++++--------- 1 file changed, 41 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d490845..600a5af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -3919,6 +3919,14 @@ object UpdateOuterReferences extends Rule[LogicalPlan] { */ object ApplyCharTypePadding extends Rule[LogicalPlan] { + object AttrOrOuterRef { + def unapply(e: Expression): Option[Attribute] = e match { + case a: Attribute => Some(a) + case OuterReference(a: 
Attribute) => Some(a) + case _ => None + } + } + override def apply(plan: LogicalPlan): LogicalPlan = { plan.resolveOperatorsUp { case operator => operator.transformExpressionsUp { @@ -3926,27 +3934,17 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] { // String literal is treated as char type when it's compared to a char type column. // We should pad the shorter one to the longer length. - case b @ BinaryComparison(attr: Attribute, lit) if lit.foldable => - padAttrLitCmp(attr, attr.metadata, lit).map { newChildren => - b.withNewChildren(newChildren) - }.getOrElse(b) - - case b @ BinaryComparison(lit, attr: Attribute) if lit.foldable => - padAttrLitCmp(attr, attr.metadata, lit).map { newChildren => - b.withNewChildren(newChildren.reverse) - }.getOrElse(b) - - case b @ BinaryComparison(or @ OuterReference(attr: Attribute), lit) if lit.foldable => - padAttrLitCmp(or, attr.metadata, lit).map { newChildren => + case b @ BinaryComparison(e @ AttrOrOuterRef(attr), lit) if lit.foldable => + padAttrLitCmp(e, attr.metadata, lit).map { newChildren => b.withNewChildren(newChildren) }.getOrElse(b) - case b @ BinaryComparison(lit, or @ OuterReference(attr: Attribute)) if lit.foldable => - padAttrLitCmp(or, attr.metadata, lit).map { newChildren => + case b @ BinaryComparison(lit, e @ AttrOrOuterRef(attr)) if lit.foldable => + padAttrLitCmp(e, attr.metadata, lit).map { newChildren => b.withNewChildren(newChildren.reverse) }.getOrElse(b) - case i @ In(attr: Attribute, list) + case i @ In(e @ AttrOrOuterRef(attr), list) if attr.dataType == StringType && list.forall(_.foldable) => CharVarcharUtils.getRawType(attr.metadata).flatMap { case CharType(length) => @@ -3955,7 +3953,7 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] { val literalCharLengths = literalChars.map(_.numChars()) val targetLen = (length +: literalCharLengths).max Some(i.copy( - value = addPadding(attr, length, targetLen), + value = addPadding(e, length, targetLen), list = 
list.zip(literalCharLengths).map { case (lit, charLength) => addPadding(lit, charLength, targetLen) } ++ nulls.map(Literal.create(_, StringType)))) @@ -3963,19 +3961,36 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] { }.getOrElse(i) // For char type column or inner field comparison, pad the shorter one to the longer length. - case b @ BinaryComparison(left: Attribute, right: Attribute) => - b.withNewChildren(CharVarcharUtils.addPaddingInStringComparison(Seq(left, right))) - - case b @ BinaryComparison(OuterReference(left: Attribute), right: Attribute) => - b.withNewChildren(padOuterRefAttrCmp(left, right)) - - case b @ BinaryComparison(left: Attribute, OuterReference(right: Attribute)) => - b.withNewChildren(padOuterRefAttrCmp(right, left).reverse) + case b @ BinaryComparison(e1 @ AttrOrOuterRef(left), e2 @ AttrOrOuterRef(right)) + // For the same attribute, they must be the same length and no padding is needed. + if !left.semanticEquals(right) => + val outerRefs = (e1, e2) match { + case (_: OuterReference, _: OuterReference) => Seq(left, right) + case (_: OuterReference, _) => Seq(left) + case (_, _: OuterReference) => Seq(right) + case _ => Nil + } + val newChildren = CharVarcharUtils.addPaddingInStringComparison(Seq(left, right)) + if (outerRefs.nonEmpty) { + b.withNewChildren(newChildren.map(_.transform { + case a: Attribute if outerRefs.exists(_.semanticEquals(a)) => OuterReference(a) + })) + } else { + b.withNewChildren(newChildren) + } - case i @ In(attr: Attribute, list) if list.forall(_.isInstanceOf[Attribute]) => + case i @ In(e @ AttrOrOuterRef(attr), list) if list.forall(_.isInstanceOf[Attribute]) => val newChildren = CharVarcharUtils.addPaddingInStringComparison( attr +: list.map(_.asInstanceOf[Attribute])) - i.copy(value = newChildren.head, list = newChildren.tail) + if (e.isInstanceOf[OuterReference]) { + i.copy( + value = newChildren.head.transform { + case a: Attribute if a.semanticEquals(attr) => OuterReference(a) + }, + list = 
newChildren.tail) + } else { + i.copy(value = newChildren.head, list = newChildren.tail) + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org