viirya commented on a change in pull request #29107:
URL: https://github.com/apache/spark/pull/29107#discussion_r456742673
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
##########
@@ -337,6 +337,10 @@ trait CheckAnalysis extends PredicateHelper {
case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr)
+ case Union(_, byName, allowMissingCol) if byName || allowMissingCol
=>
+ failAnalysis("Union should not be with true `byName` or " +
+ "`allowMissingCol` flags after analysis phase.")
Review comment:
And, yes, prevent an unexpected bug during analysis.
##########
File path:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3676,3 +3678,63 @@ object UpdateOuterReferences extends Rule[LogicalPlan] {
}
}
}
+
+/**
+ * Resolves different children of Union to a common set of columns. Note that
this must be
+ * run before `TypeCoercion`, because `TypeCoercion` should be run on
correctly resolved
+ * column by name.
+ */
+object ResolveUnion extends Rule[LogicalPlan] {
+ private def unionTwoSides(
+ left: LogicalPlan,
+ right: LogicalPlan,
+ allowMissingCol: Boolean): LogicalPlan = {
+ val resolver = SQLConf.get.resolver
+ val leftOutputAttrs = left.output
+ val rightOutputAttrs = right.output
+
+ // Builds a project list for `right` based on `left` output names
+ val rightProjectList = leftOutputAttrs.map { lattr =>
+ rightOutputAttrs.find { rattr => resolver(lattr.name, rattr.name)
}.getOrElse {
+ if (allowMissingCol) {
+ Alias(Literal(null, lattr.dataType), lattr.name)()
+ } else {
+ throw new AnalysisException(
+ s"""Cannot resolve column name "${lattr.name}" among """ +
+ s"""(${rightOutputAttrs.map(_.name).mkString(", ")})""")
+ }
+ }
+ }
+
+ // Delegates failure checks to `CheckAnalysis`
+ val notFoundAttrs = rightOutputAttrs.diff(rightProjectList)
+ val rightChild = Project(rightProjectList ++ notFoundAttrs, right)
+
+ // Builds a project for `logicalPlan` based on `right` output names, if
allowing
+ // missing columns.
+ val leftChild = if (allowMissingCol) {
+ val missingAttrs = notFoundAttrs.map { attr =>
+ Alias(Literal(null, attr.dataType), attr.name)()
+ }
+ if (missingAttrs.nonEmpty) {
+ Project(leftOutputAttrs ++ missingAttrs, left)
+ } else {
+ left
+ }
+ } else {
+ left
+ }
+ Union(leftChild, rightChild)
+ }
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+ case e if !e.childrenResolved => e
+
+ case Union(children, byName, allowMissingCol)
+ if byName =>
Review comment:
ok
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]