Github user yhuai commented on a diff in the pull request:
https://github.com/apache/spark/pull/9055#discussion_r42581799
--- Diff:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
---
@@ -270,6 +273,146 @@ class Analyzer(
}
/**
+ * Rewrite the [[Exists]] [[In]] with left semi join or anti join.
+ */
+ object RewriteFilterSubQuery extends Rule[LogicalPlan] with
PredicateHelper {
+ def unapply(condition: Expression): Option[(Expression,
Seq[Expression])] = {
+ if (condition.resolved == false) {
+ return None
+ }
+
+ val conjuctions = splitConjunctivePredicates(condition).map(_
transformDown {
+ // Remove the Cast expression for SubQueryExpression.
+ case Cast(f: SubQueryExpression, BooleanType) => f
+ }
+ )
+
+ val (subqueries, others) = conjuctions.partition(c =>
c.isInstanceOf[SubQueryExpression])
+ if (subqueries.isEmpty) {
+ None
+ } else if (subqueries.length > 1) {
+ throw new AnalysisException(
+ s"Only 1 SubQuery expression is supported, but we got
$subqueries")
+ } else {
+ val subQueryExpr = subqueries(0).asInstanceOf[SubQueryExpression]
+ // try to resolve the subquery
+
+ val subquery = Analyzer.this.execute(subQueryExpr.subquery) match {
+ case Distinct(child) => child // Distinct is useless for semi
join, ignore it.
+ case other => other
+ }
+ Some((subQueryExpr.withNewSubQuery(subquery), others))
+ }
+ }
+
+ def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+ case f if f.childrenResolved == false => f
+
+ case f @ Filter(RewriteFilterSubQuery(subquery, others), left) =>
+ subquery match {
+ case Exists(Project(_, Filter(condition, right)), positive) =>
+ checkAnalysis(right)
+ if (condition.resolved) {
+ // Apparently, it should be not resolved here, since EXIST
should be correlated.
+ throw new AnalysisException(
+ s"Exist clause should be correlated, but we got
$condition")
+ }
+ Join(others.reduceOption(And).map(Filter(_,
left)).getOrElse(left), right,
+ if (positive) LeftSemi else LeftAnti,
+ Some(ResolveReferences.tryResolveAttributes(condition,
right)))
+
+ case Exists(right, positive) =>
+ throw new AnalysisException(s"Exist clause should be
correlated, but we got $right")
+
+ case InSubquery(key, Project(projectList, Filter(condition,
right)), positive) =>
+ checkAnalysis(right)
+ if (projectList.length != 1) {
+ throw new AnalysisException(
+ s"Expect only 1 projection in In Subquery Expression, but
we got $projectList")
+ } else {
+ val rightKey =
ResolveReferences.tryResolveAttributes(projectList(0), right)
--- End diff --
Why do we need to manually resolve attributes at here?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]