GitHub user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/12306#discussion_r59798253
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
---
@@ -852,26 +852,150 @@ class Analyzer(
}
/**
- * This rule resolve subqueries inside expressions.
+ * This rule resolves subqueries inside expressions, rewrites correlated
subqueries or
+ * scalar subqueries.
+ *
+ * It works as following:
+ * 1. For each logical plan, find out the subqueries from expressions,
try to resolve them,
+ * update the SubQueryExpression with resolved logical plan.
+ * 2. For Filter, the condition will be splitted by AND, each
sub-condition that has subqueries
+ * will be rewritten as following:
+ * a. EXISTS/NOT EXISTS will be rewritten as semi/anti join,
unresolved condition in Filter
+ * will be pulled out as join conditions.
+ * b. IN/NOT IN will be rewritten as semi/anti join, unresolved
conditions in the Filter will be
+ * pulled out as join conditions, value = selected column will also
be used as join condition.
*
* Note: CTE are handled in CTESubstitution.
*/
object ResolveSubquery extends Rule[LogicalPlan] with PredicateHelper {
- private def hasSubquery(e: Expression): Boolean = {
- e.find(_.isInstanceOf[SubqueryExpression]).isDefined
+ private def hasCorrelatedSubquery(e: Expression): Boolean = {
+ e.find(_.isInstanceOf[CorrelatedSubqueryExpression]).isDefined
}
private def hasSubquery(q: LogicalPlan): Boolean = {
- q.expressions.exists(hasSubquery)
+
q.expressions.exists(_.find(_.isInstanceOf[SubqueryExpression]).isDefined)
+ }
+
+ /**
+ * Removes the conjunctive predicates of Filter that can't be resolved
in this logical plan,
+ * returns the resolved new logical plan and removed predicates.
+ */
+ private def removeUnresolvedPredicates(q: LogicalPlan): (LogicalPlan,
Option[Expression]) = {
+ val unresolvedConditions = ArrayBuffer[Expression]()
+ var hasOuterJoin = false
+ val removed = q transform {
+ case j: Join if j.joinType != Inner =>
+ hasOuterJoin = true
+ j
+ case f @ Filter(cond, child) if child.resolved && !f.resolved =>
+ if (hasOuterJoin) {
+ // The predicates inside a outer/semi join can't be pulled
over join safely.
+ throw new AnalysisException("accessing columns of outer query
inside join is not" +
+ " supported")
+ } else {
+ val (resolved, unresolved) =
splitConjunctivePredicates(cond).partition(_.resolved)
+ unresolvedConditions ++= unresolved
+ if (resolved.nonEmpty) {
+ Filter(resolved.reduceLeft(And), child)
+ } else {
+ child
+ }
+ }
+ }
+ if (unresolvedConditions.nonEmpty) {
+ // try to resolve new logical plan and remove unresolved
predicated again
+ val (removedAgain, moreConditions) =
removeUnresolvedPredicates(execute(removed))
+ (removedAgain, (unresolvedConditions ++
moreConditions.toSeq).reduceLeftOption(And))
+ } else {
+ if (!q.resolved) {
+ throw new AnalysisException(s"subquery can't be resolved:
${q.treeString}")
+ }
+ (q, None)
+ }
+ }
+
+ /**
+ * Returns a resolved subquery and predicate that will be used to
rewrite the IN subquery as
+ * semi join (predicate will be used as join condition).
--- End diff --
Will do — I forgot to add it.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it enabled, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]