Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/12306#discussion_r59941599
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
---
@@ -852,25 +855,148 @@ class Analyzer(
}
/**
- * This rule resolve subqueries inside expressions.
+ * This rule resolves sub-queries inside expressions.
*
- * Note: CTE are handled in CTESubstitution.
+ * Note: CTEs are handled in CTESubstitution.
*/
object ResolveSubquery extends Rule[LogicalPlan] with PredicateHelper {
- private def hasSubquery(e: Expression): Boolean = {
- e.find(_.isInstanceOf[SubqueryExpression]).isDefined
- }
-
- private def hasSubquery(q: LogicalPlan): Boolean = {
- q.expressions.exists(hasSubquery)
+ /**
+ * Resolve the correlated predicates in a sub-queries [[Filter]]s
(WHERE or HAVING) by using the
+ * plan the predicates should be correlated to.
+ */
+ private def resolveCorrelatedPredicates(q: LogicalPlan, p:
LogicalPlan): LogicalPlan = {
+ q transformUp {
+ case f @ Filter(cond, child) if child.resolved && !f.resolved =>
+ val newCond = resolveExpression(cond, p, throws = false)
+ if (!cond.fastEquals(newCond)) {
+ Filter(newCond, child)
+ } else {
+ f
+ }
+ }
}
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
- case q: LogicalPlan if q.childrenResolved && hasSubquery(q) =>
+ case q: LogicalPlan if q.childrenResolved =>
q transformExpressions {
case e: SubqueryExpression if !e.query.resolved =>
- e.withNewPlan(execute(e.query))
+ // First resolve as much of the sub-query as possible. After
that we use the children of
+ // this plan to resolve the remaining correlated predicates.
+
e.withNewPlan(q.children.foldLeft(execute(e.query))(resolveCorrelatedPredicates))
+ }
+ }
+ }
+
+ /**
+ * This rule rewrites filtering sub-queries into left semi/anti joins.
The following predicates
+ * are supported:
+ * a. EXISTS/NOT EXISTS will be rewritten as semi/anti join, unresolved
conditions in Filter
+ * will be pulled out as the join conditions.
+ * b. IN/NOT IN will be rewritten as semi/anti join, unresolved
conditions in the Filter will
+ * be pulled out as join conditions, value = selected column will
also be used as join
+ * condition.
+ */
+ object RewriteSubquery extends Rule[LogicalPlan] with PredicateHelper {
+ private def hasCorrelatedSubquery(e: Expression): Boolean = {
+ e.find(_.isInstanceOf[CorrelatedSubqueryExpression]).isDefined
+ }
+
+ /**
+ * Extract all correlated predicates from a given sub-query. The
sub-query will be rewritten
+ * without the correlated predicates. The predicates will be combined
using AND. This method
+ * returns the rewritten sub-query and the resulting extracted
predicate.
+ */
+ private def extractCorrelatedPredicates(
+ q: LogicalPlan,
+ p: LogicalPlan): (LogicalPlan, Option[Expression]) = {
+ val fs = mutable.Set.empty[LogicalPlan]
+ val references: Set[Expression] = p.output.toSet
+ val predicates = ArrayBuffer[Expression]()
+ val transformed = q transform {
+ case f @ Filter(cond, child) =>
+ val (correlated, local) =
splitConjunctivePredicates(cond).partition { e =>
+ e.find(references.contains).isDefined
+ }
+ predicates ++= correlated
+ correlated match {
+ case Nil =>
+ f
+ case xs if local.nonEmpty =>
+ val newFilter = Filter(local.reduce(And), child)
+ fs += newFilter
+ newFilter
+ case xs =>
+ fs += child
+ child
+ }
+ case j: Join if j.joinType != Inner && fs.nonEmpty &&
j.find(fs.contains).isDefined =>
+ failAnalysis("accessing columns of outer query inside join is
not supported")
+ }
+ (transformed, predicates.reduceOption(And))
+ }
+
+ /**
+ * Returns a resolved subquery and predicate that will be used to
rewrite the IN subquery as
+ * semi join (predicate will be used as join condition).
+ */
+ private def rewriteInSubquery(
+ value: Expression,
+ subquery: LogicalPlan,
+ query: LogicalPlan): (LogicalPlan, Expression) = {
+ val (resolved, joinCondition) =
extractCorrelatedPredicates(subquery, query)
+
+ // Extract the columns on the expression side.
+ val columns = value match {
+ case CreateStruct(cs) => cs
+ case c => Seq(c)
+ }
+
+ // The number of left and right expressions must be equal.
+ if (columns.length != resolved.output.length) {
+ throw new AnalysisException(s"the number of fields in value
(${columns.length}) does" +
+ s" not match with the number of columns in subquery
(${resolved.output.length})")
+ }
+ val conditions = joinCondition.toSeq ++
columns.zip(resolved.output).map {
+ case (e, a) =>
+ // Check the left and right dataTypes.
+ if (e.dataType != a.dataType) {
+ throw new AnalysisException(
+ s"data type of value (${e.dataType}) does not match with
subquery (${a.dataType})")
+ }
+ EqualTo(e, a)
+ }
+
+ (resolved, conditions.reduceLeft(And))
+ }
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ case f @ Filter(condition, child) if f.resolved =>
+ val (withSubquery, withoutSubquery) =
+
splitConjunctivePredicates(condition).partition(hasCorrelatedSubquery)
+
+ // Construct the pruned filter condition.
+ val newFilter: LogicalPlan = withoutSubquery match {
+ case Nil => child
+ case conditions => Filter(conditions.reduce(And), child)
+ }
+
+ // Filter the plan by applying left semi and left anti joins.
+ withSubquery.foldLeft(newFilter) {
+ case (p, Exists(sub)) =>
+ val (resolved, joinCondition) =
extractCorrelatedPredicates(sub, p)
+ Join(p, resolved, LeftSemi, joinCondition)
+ case (p, Not(Exists(sub))) =>
+ val (resolved, joinCondition) =
extractCorrelatedPredicates(sub, p)
+ Join(p, resolved, LeftAnti, joinCondition)
+ case (p, InSubQuery(value, sub)) =>
+ val (resolved, cond) = rewriteInSubquery(value, sub, p)
+ Join(p, resolved, LeftSemi, Some(cond))
+ case (p, Not(InSubQuery(value, sub))) =>
--- End diff --
For `A NOT IN (B, C)`, we need to consider the nullability of A and B. For
example, if B or C is null, `A IN (B, null)` will be null, so
`A NOT IN (B, null)` should also be null. This requires a special join
(usually called NullAwareAntiJoin).
We cannot support that in this PR.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]