Github user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/17713#discussion_r112804573
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
---
@@ -414,4 +352,269 @@ trait CheckAnalysis extends PredicateHelper {
plan.foreach(_.setAnalyzed())
}
+
+  /**
+   * Validates a subquery expression `expr` hosted by the operator `plan`.
+   *
+   * The method returns `Unit`; failures are reported by calling `failAnalysis`
+   * (expected to throw a user-facing analysis error) rather than being returned.
+   *
+   * Checks performed, in order:
+   *  - the correlations inside the subquery plan are allowed (`checkCorrelationsInSubquery`);
+   *  - a scalar subquery returns exactly one column, and when it is correlated (non-empty
+   *    `conditions`) it is an aggregate grouped only by correlated columns and is hosted
+   *    by a Filter, Aggregate or Project;
+   *  - every other subquery expression (IN/EXISTS predicate subqueries) is hosted by a Filter;
+   *  - the subquery plan itself passes `checkAnalysis`.
+   *
+   * @param plan the operator whose expressions contain `expr`.
+   * @param expr the subquery expression to validate.
+   */
+  private def checkSubqueryExpression(plan: LogicalPlan, expr: SubqueryExpression): Unit = {
+    // Makes sure a correlated scalar subquery contains one row for every outer row by
+    // requiring it to be an aggregate with at least one aggregate expression whose
+    // GROUP BY columns all appear in its correlated predicates.
+    def checkAggregate(conditions: Seq[Expression], query: LogicalPlan, agg: Aggregate): Unit = {
+      val aggregates = agg.expressions.flatMap(_.collect {
+        case a: AggregateExpression => a
+      })
+      if (aggregates.isEmpty) {
+        failAnalysis("The output of a correlated scalar subquery must be aggregated")
+      }
+
+      // SPARK-18504/SPARK-18814: Block cases where GROUP BY columns
+      // are not part of the correlated columns.
+      val groupByCols = AttributeSet(agg.groupingExpressions.flatMap(_.references))
+      // Collect the local references of the correlated predicates in the subquery, i.e.
+      // everything they reference except the attributes referenced by `conditions`.
+      val conditionRefs = conditions.flatMap(_.references)
+      val subqueryColumns =
+        getCorrelatedPredicates(query).flatMap(_.references).filterNot(conditionRefs.contains)
+      val correlatedCols = AttributeSet(subqueryColumns)
+      val invalidCols = groupByCols -- correlatedCols
+      // GROUP BY columns must be a subset of the columns in the correlated predicates.
+      if (invalidCols.nonEmpty) {
+        failAnalysis(
+          "A GROUP BY clause in a scalar correlated subquery " +
+            "cannot contain non-correlated columns: " +
+            invalidCols.mkString(","))
+      }
+    }
+
+    // Strips SubqueryAlias and Project nodes off the top of a plan so that the operator
+    // underneath can be inspected. No attribute mapping is performed.
+    def cleanQuery(p: LogicalPlan): LogicalPlan = p match {
+      case alias: SubqueryAlias => cleanQuery(alias.child)
+      case proj: Project => cleanQuery(proj.child)
+      case child => child
+    }
+
+    // Validate that the correlations appearing in the subquery are valid and
+    // allowed by Spark.
+    checkCorrelationsInSubquery(expr.plan)
+
+    expr match {
+      case ScalarSubquery(query, conditions, _) =>
+        // A scalar subquery must return exactly one column.
+        if (query.output.size != 1) {
+          failAnalysis(
+            s"Scalar subquery must return only one column, but got ${query.output.size}")
+        }
+
+        // Non-empty `conditions` marks the scalar subquery as correlated.
+        if (conditions.nonEmpty) {
+          cleanQuery(query) match {
+            case a: Aggregate => checkAggregate(conditions, query, a)
+            case Filter(_, a: Aggregate) => checkAggregate(conditions, query, a)
+            case fail => failAnalysis(s"Correlated scalar subqueries must be Aggregated: $fail")
+          }
+
+          // Only certain operators are allowed to host a subquery expression containing
+          // outer references.
+          plan match {
+            case _: Filter | _: Aggregate | _: Project => // Ok
+            case _ => failAnalysis(
+              "Correlated scalar sub-queries can only be used in a " +
+                s"Filter/Aggregate/Project: $plan")
+          }
+        }
+
+      case _ =>
+        // Every other subquery expression (IN/EXISTS predicate subqueries) may only be
+        // hosted by a Filter.
+        plan match {
+          case _: Filter => // Ok
+          case _ => failAnalysis(s"Predicate sub-queries can only be used in a Filter: $plan")
+        }
+    }
+
+    // Finally, validate the subquery plan itself.
+    checkAnalysis(expr.plan)
+  }
+
+  /**
+   * Validates that the outer references appearing inside the subquery plan `sub` are
+   * allowed.
+   *
+   * The method returns `Unit`; violations are reported by calling `failAnalysis`.
+   *
+   * @param sub the subquery plan to validate; it is passed through `BooleanSimplification`
+   *            before being walked with `foreachUp`.
+   */
+  private def checkCorrelationsInSubquery(sub: LogicalPlan): Unit = {
+    // Validate that correlated aggregate expressions do not contain a mixture
+    // of outer and local references.
+    def checkMixedReferencesInsideAggregateExpr(expr: Expression): Unit = {
+      expr.foreach {
+        case a: AggregateExpression if containsOuter(a) =>
+          val outer = a.collect { case OuterReference(e) => e.toAttribute }
+          val local = a.references -- outer
+          if (local.nonEmpty) {
+            val msg =
+              s"""
+                 |Found an aggregate expression in a correlated predicate that has both
+                 |outer and local references, which is not supported yet.
+                 |Aggregate expression: ${SubExprUtils.stripOuterReference(a).sql},
+                 |Outer references: ${outer.map(_.sql).mkString(", ")},
+                 |Local references: ${local.map(_.sql).mkString(", ")}.
+               """.stripMargin.replace("\n", " ").trim()
+            failAnalysis(msg)
+          }
+        case _ =>
+      }
+    }
+
+    // Make sure a plan's subtree does not contain outer references.
+    def failOnOuterReferenceInSubTree(p: LogicalPlan): Unit = {
+      if (hasOuterReferences(p)) {
+        failAnalysis(s"Accessing outer query column is not allowed in:\n$p")
+      }
+    }
+
+    // Make sure a plan's expressions do not contain:
+    // 1. Aggregate expressions that have a mixture of outer and local references.
+    // 2. Expressions containing outer references on plan nodes other than Filter.
+    def failOnInvalidOuterReference(p: LogicalPlan): Unit = {
+      p.expressions.foreach(checkMixedReferencesInsideAggregateExpr)
+      val hostsForbiddenOuterReference = p match {
+        case _: Filter => false
+        case _ => p.expressions.exists(containsOuter)
+      }
+      if (hostsForbiddenOuterReference) {
+        failAnalysis(
+          "Expressions referencing the outer query are not supported outside of WHERE/HAVING " +
+            s"clauses:\n$p")
+      }
+    }
+
+    // SPARK-17348: A potential incorrect result case.
+    // When a correlated predicate is a non-equality predicate,
+    // certain operators are not permitted from the operator
+    // hosting the correlated predicate up to the operator on the outer table.
+    // Otherwise, pulling up the correlated predicate
+    // would generate a plan with different semantics,
+    // which could return incorrect results.
+    // Aggregate is checked here through this helper. Window is not matched explicitly
+    // below, so it falls under Category 4, which rejects any outer reference in its subtree.
+    //
+    // Below is an example of a Logical Plan during the Analyzer phase that
+    // shows this problem. Pulling the correlated predicate [outer(c2#77) >= ..]
+    // through the Aggregate (or Window) operator could alter the result of
+    // the Aggregate.
+    //
+    // Project [c1#76]
+    // +- Project [c1#87, c2#88]
+    // :  (Aggregate or Window operator)
+    // :  +- Filter [outer(c2#77) >= c2#88)]
+    // :     +- SubqueryAlias t2, `t2`
+    // :        +- Project [_1#84 AS c1#87, _2#85 AS c2#88]
+    // :           +- LocalRelation [_1#84, _2#85]
+    // +- SubqueryAlias t1, `t1`
+    //    +- Project [_1#73 AS c1#76, _2#74 AS c2#77]
+    //       +- LocalRelation [_1#73, _2#74]
+    def failOnNonEqualCorrelatedPredicate(found: Boolean, p: LogicalPlan): Unit = {
+      if (found) {
+        // Report a non-supported case as an exception.
+        failAnalysis(s"Correlated column is not allowed in a non-equality predicate:\n$p")
+      }
+    }
+
+    // Set to true once a Filter with a non-equality correlated predicate has been visited
+    // during the `foreachUp` traversal below; read by the Aggregate case.
+    // NOTE(review): this flag is shared by the whole traversal rather than tracked per
+    // correlation path, so it may also affect an Aggregate that is not on the path of the
+    // predicate that set it (e.g. in a sibling join branch) -- confirm this is intended.
+    var foundNonEqualCorrelatedPred: Boolean = false
+
+    // Simplify the predicates before validating any unsupported correlation patterns
+    // in the plan.
+    BooleanSimplification(sub).foreachUp {
+      // Whitelist of operators allowed in a correlated subquery. There are 4 categories:
+      // 1. Operators that are allowed anywhere in a correlated subquery and, by definition
+      //    of the operators, either do not contain any columns or cannot host outer
+      //    references.
+      // 2. Operators that are allowed anywhere in a correlated subquery so long as they
+      //    do not host outer references.
+      // 3. Operators that need special handling: Filter, Join, Aggregate, and Generate.
+      // 4. Any other operator. It is allowed in a correlated subquery only if it is not on
+      //    a correlation path; in other words, only under a correlation point.
+      //
+      // A correlation path is defined as the sub-tree of all the operators that are on
+      // the path from the operator hosting the correlated expressions up to the operator
+      // producing the correlated values.
+
+      // Category 1:
+      // BroadcastHint, Distinct, LeafNode, Repartition, and SubqueryAlias
+      case _: BroadcastHint | _: Distinct | _: LeafNode | _: Repartition | _: SubqueryAlias =>
+        // Ok
+
+      // Category 2:
+      // These operators can be anywhere in a correlated subquery,
+      // so long as they do not host outer references themselves.
+      case p @ (_: Project | _: Sort | _: RepartitionByExpression) =>
+        failOnInvalidOuterReference(p)
+
+      // Category 3:
+      // Filter is one of the two operators allowed to host correlated expressions.
+      // The other operator is Join. Filter can be anywhere in a correlated subquery.
+      case f: Filter =>
+        val correlated = splitConjunctivePredicates(f.condition).filter(containsOuter)
+
+        // Find any non-equality correlated predicates.
+        foundNonEqualCorrelatedPred = foundNonEqualCorrelatedPred || correlated.exists {
+          case _: EqualTo | _: EqualNullSafe => false
+          case _ => true
+        }
+        failOnInvalidOuterReference(f)
+
+      // Aggregate cannot host any correlated expressions.
+      // It can be on a correlation path if the correlation contains
+      // only equality correlated predicates.
+      // It cannot be on a correlation path if the correlation has
+      // non-equality correlated predicates.
+      case a: Aggregate =>
+        failOnInvalidOuterReference(a)
+        failOnNonEqualCorrelatedPredicate(foundNonEqualCorrelatedPred, a)
+
+      // Join can host correlated expressions.
+      case j @ Join(left, right, joinType, _) =>
+        joinType match {
+          // Inner join, like Filter, can be anywhere.
+          case _: InnerLike =>
+            failOnInvalidOuterReference(j)
+
+          // Left outer join's right operand cannot be on a correlation path.
+          // LeftAnti and ExistenceJoin are special cases of LeftOuter.
+          // Note that ExistenceJoin cannot be expressed externally in either SQL or the
+          // DataFrame API, so it should not show up here in the Analysis phase. This is
+          // just a safety net.
+          //
+          // LeftSemi does not allow output from the right operand.
+          // Any correlated references in the subplan
+          // of the right operand cannot be pulled up.
+          case LeftOuter | LeftSemi | LeftAnti | ExistenceJoin(_) =>
+            failOnInvalidOuterReference(j)
+            failOnOuterReferenceInSubTree(right)
+
+          // Likewise, Right outer join's left operand cannot be on a correlation path.
+          case RightOuter =>
+            failOnInvalidOuterReference(j)
+            failOnOuterReferenceInSubTree(left)
+
+          // Any other join types not explicitly listed above,
+          // including Full outer join, are treated as Category 4.
+          case _ =>
+            failOnOuterReferenceInSubTree(j)
+        }
+
+      // Generator with join=true, i.e., expressed with
+      // LATERAL VIEW [OUTER], is similar to inner join:
+      // it allows correlation underneath it
+      // but must not host any outer references itself.
+      // Note:
+      // Generator with join=false is treated as Category 4.
+      case g: Generate if g.join =>
+        failOnInvalidOuterReference(g)
+
+      // Category 4: Any other operators not in the above 3 categories
+      // cannot be on a correlation path; that is, they are allowed only
+      // under a correlation point, and neither they nor their descendant operators
+      // may have any correlated expressions.
+      case p =>
+        failOnOuterReferenceInSubTree(p)
+    }
+  }
--- End diff --
Was this just copied from Analyzer.scala? Please leave some comments to save
reviewers' time. Thanks!
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]