Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9055#discussion_r42581797
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
    @@ -270,6 +273,146 @@ class Analyzer(
       }
     
       /**
    +   * Rewrite the [[Exists]] [[In]] with left semi join or anti join.
    +   */
    +  object RewriteFilterSubQuery extends Rule[LogicalPlan] with 
PredicateHelper {
    +    def unapply(condition: Expression): Option[(Expression, 
Seq[Expression])] = {
    +      if (condition.resolved == false) {
    +        return None
    +      }
    +
    +      val conjuctions = splitConjunctivePredicates(condition).map(_ 
transformDown {
    +          // Remove the Cast expression for SubQueryExpression.
    +          case Cast(f: SubQueryExpression, BooleanType) => f
    +        }
    +      )
    +
    +      val (subqueries, others) = conjuctions.partition(c => 
c.isInstanceOf[SubQueryExpression])
    +      if (subqueries.isEmpty) {
    +        None
    +      } else if (subqueries.length > 1) {
    +        throw new AnalysisException(
    +          s"Only 1 SubQuery expression is supported, but we got 
$subqueries")
    +      } else {
    +        val subQueryExpr = subqueries(0).asInstanceOf[SubQueryExpression]
    +        // try to resolve the subquery
    +
    +        val subquery = Analyzer.this.execute(subQueryExpr.subquery) match {
    +          case Distinct(child) => child // Distinct is useless for semi 
join, ignore it.
    +          case other => other
    +        }
    +        Some((subQueryExpr.withNewSubQuery(subquery), others))
    +      }
    +    }
    +
    +    def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    +      case f if f.childrenResolved == false => f
    +
    +      case f @ Filter(RewriteFilterSubQuery(subquery, others), left) =>
    +        subquery match {
    +          case Exists(Project(_, Filter(condition, right)), positive) =>
    +            checkAnalysis(right)
    +            if (condition.resolved) {
    +              // Apparently, it should be not resolved here, since EXIST 
should be correlated.
    +              throw new AnalysisException(
    +                s"Exist clause should be correlated, but we got 
$condition")
    +            }
    +            Join(others.reduceOption(And).map(Filter(_, 
left)).getOrElse(left), right,
    +              if (positive) LeftSemi else LeftAnti,
    +              Some(ResolveReferences.tryResolveAttributes(condition, 
right)))
    +
    +          case Exists(right, positive) =>
    +            throw new AnalysisException(s"Exist clause should be 
correlated, but we got $right")
    +
    +          case InSubquery(key, Project(projectList, Filter(condition, 
right)), positive) =>
    +            checkAnalysis(right)
    +            if (projectList.length != 1) {
    +              throw new AnalysisException(
    +                s"Expect only 1 projection in In Subquery Expression, but 
we got $projectList")
    --- End diff --
    
    Looks like we need to say that the subquery should generate only a single 
column. Also, we need to mention the number of columns that the subquery 
actually generates. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to