Github user dilipbiswal commented on a diff in the pull request:
https://github.com/apache/spark/pull/16954#discussion_r101597685
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
---
@@ -83,29 +95,150 @@ object RewritePredicateSubquery extends
Rule[LogicalPlan] with PredicateHelper {
}
/**
- * Given a predicate expression and an input plan, it rewrites
- * any embedded existential sub-query into an existential join.
- * It returns the rewritten expression together with the updated plan.
- * Currently, it does not support null-aware joins. Embedded NOT IN
predicates
- * are blocked in the Analyzer.
+ * Given a predicate expression and an input plan, it rewrites any
embedded existential sub-query
+ * into an existential join. It returns the rewritten expression
together with the updated plan.
+ * Currently, it does not support NOT IN nested inside a NOT expression.
This case is blocked in
+ * the Analyzer.
*/
private def rewriteExistentialExpr(
exprs: Seq[Expression],
plan: LogicalPlan): (Option[Expression], LogicalPlan) = {
var newPlan = plan
val newExprs = exprs.map { e =>
e transformUp {
- case PredicateSubquery(sub, conditions, nullAware, _) =>
- // TODO: support null-aware join
+ case Exists(sub, conditions, exprId) =>
val exists = AttributeReference("exists", BooleanType, nullable
= false)()
- newPlan = Join(newPlan, sub, ExistenceJoin(exists),
conditions.reduceLeftOption(And))
+ newPlan = Join(newPlan, sub,
+ ExistenceJoin(exists), conditions.reduceLeftOption(And))
exists
- }
+ case In(e, Seq(l@ ListQuery(sub, conditions, exprId))) =>
+ val exists = AttributeReference("exists", BooleanType, nullable
= false)()
+ val inConditions =
getValueExpression(e).zip(sub.output).map(EqualTo.tupled)
+ newPlan = Join(newPlan, sub,
+ ExistenceJoin(exists), (inConditions ++
conditions).reduceLeftOption(And))
+ exists
+ }
}
(newExprs.reduceOption(And), newPlan)
}
}
+ /**
+ * Pull out all (outer) correlated predicates from a given subquery. This
method removes the
+ * correlated predicates from subquery [[Filter]]s and adds the
references of these predicates
+ * to all intermediate [[Project]] and [[Aggregate]] clauses (if they are
missing) in order to
+ * be able to evaluate the predicates at the top level.
+ *
+ * TODO: Look to merge this rule with RewritePredicateSubquery.
+ */
+object PullupCorrelatedPredicates extends Rule[LogicalPlan] with
PredicateHelper {
--- End diff --
This function is mostly copied from the Analyzer. The only difference is that,
since validation is already done by the Analyzer, we only process Filter,
Project and Aggregate here.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]