Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16954#discussion_r105718880
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala
 ---
    @@ -40,19 +43,189 @@ abstract class PlanExpression[T <: QueryPlan[_]] 
extends Expression {
     /**
      * A base interface for expressions that contain a [[LogicalPlan]].
      */
    -abstract class SubqueryExpression extends PlanExpression[LogicalPlan] {
    +abstract class SubqueryExpression(
    +    plan: LogicalPlan,
    +    children: Seq[Expression],
    +    exprId: ExprId) extends PlanExpression[LogicalPlan] {
    +
    +  override lazy val resolved: Boolean = childrenResolved && plan.resolved
    +  override lazy val references: AttributeSet =
    +    if (plan.resolved) super.references -- plan.outputSet else 
super.references
       override def withNewPlan(plan: LogicalPlan): SubqueryExpression
    +  override def semanticEquals(o: Expression): Boolean = o match {
    +    case p: SubqueryExpression =>
    +      this.getClass.getName.equals(p.getClass.getName) && 
plan.sameResult(p.plan) &&
    +        children.length == p.children.length &&
    +        children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
    +    case _ => false
    +  }
     }
     
     object SubqueryExpression {
    +  /**
    +   * Returns true when an expression contains an IN or EXISTS subquery and 
false otherwise.
    +   */
    +  def hasInOrExistsSubquery(e: Expression): Boolean = {
    +    e.find {
    +      case _: ListQuery | _: Exists => true
    +      case _ => false
    +    }.isDefined
    +  }
    +
    +  /**
    +   * Returns true when an expression contains a subquery that has outer 
reference(s). The outer
    +   * reference attributes are kept as children of subquery expression by
    +   * [[org.apache.spark.sql.catalyst.analysis.Analyzer.ResolveSubquery]]
    +   */
       def hasCorrelatedSubquery(e: Expression): Boolean = {
         e.find {
    -      case e: SubqueryExpression if e.children.nonEmpty => true
    +      case s: SubqueryExpression => s.children.nonEmpty
           case _ => false
         }.isDefined
       }
     }
     
    +object SubExprUtils extends PredicateHelper {
    +  /**
    +   * Returns true when an expression contains correlated predicates i.e 
outer references and
    +   * returns false otherwise.
    +   */
    +  def containsOuter(e: Expression): Boolean = {
    +    e.find(_.isInstanceOf[OuterReference]).isDefined
    +  }
    +
    +  /**
    +   * Returns whether there are any null-aware predicate subqueries inside 
Not. If not, we could
    +   * turn the null-aware predicate into not-null-aware predicate.
    +   */
    +  def hasNullAwarePredicateWithinNot(condition: Expression): Boolean = {
    +    splitConjunctivePredicates(condition).exists {
    +      case _: Exists | Not(_: Exists) | In(_, Seq(_: ListQuery)) | 
Not(In(_, Seq(_: ListQuery))) =>
    +        false
    +      case e => e.find { x =>
    +        x.isInstanceOf[Not] && e.find {
    +          case In(_, Seq(_: ListQuery)) => true
    +          case _ => false
    +        }.isDefined
    +      }.isDefined
    +    }
    +
    +  }
    +
    +  /**
    +   * Returns an expression after removing the OuterReference shell.
    +   */
    +  def stripOuterReference(e: Expression): Expression = e.transform { case 
OuterReference(r) => r }
    +
    +  /**
    +   * Returns the list of expressions after removing the OuterReference 
shell from each of
    +   * the expression.
    +   */
    +  def stripOuterReferences(e: Seq[Expression]): Seq[Expression] = 
e.map(stripOuterReference)
    +
    +  /**
    +   * Returns the logical plan after removing the OuterReference shell from 
all the expressions
    +   * of the input logical plan.
    +   */
    +  def stripOuterReferences(p: LogicalPlan): LogicalPlan = {
    +    p.transformAllExpressions {
    +      case OuterReference(a) => a
    +    }
    +  }
    +
    +  /**
    +   * Given a logical plan, returns TRUE if it has an outer reference and 
false otherwise.
    +   */
    +  def hasOuterReferences(plan: LogicalPlan): Boolean = {
    +    plan.find {
    +      case f: Filter => containsOuter(f.condition)
    +      case other => false
    +    }.isDefined
    +  }
    +
    +  /**
    +   * Given a list of expressions, returns the expressions which have outer 
references. Aggregate
    +   * expressions are treated in a special way. If the children of 
aggregate expression contains an
    +   * outer reference, then the entire aggregate expression is marked as an 
outer reference.
    +   * Example (SQL):
    +   * {{{
    +   *   SELECT a FROM l GROUP by 1 HAVING EXISTS (SELECT 1 FROM r WHERE d < 
min(b))
    +   * }}}
    +   * In the above case, we want to mark the entire min(b) as an outer 
reference
    +   * OuterReference(min(b)) instead of min(OuterReference(b)).
    +   * TODO: Currently we don't allow deep correlation. Also, we don't allow 
mixing of
    +   * outer references and local references under an aggregate expression.
    +   * For example (SQL):
    +   * {{{
    +   *   SELECT .. FROM p1
    +   *   WHERE EXISTS (SELECT ...
    +   *                 FROM p2
    +   *                 WHERE EXISTS (SELECT ...
    +   *                               FROM sq
    +   *                               WHERE min(p1.a + p2.b) = sq.c))
    +   *
    +   *   SELECT .. FROM p1
    +   *   WHERE EXISTS (SELECT ...
    +   *                 FROM p2
    +   *                 WHERE EXISTS (SELECT ...
    +   *                               FROM sq
    +   *                               WHERE min(p1.a) + max(p2.b) = sq.c))
    +   *
    +   *   SELECT .. FROM p1
    +   *   WHERE EXISTS (SELECT ...
    +   *                 FROM p2
    +   *                 WHERE EXISTS (SELECT ...
    +   *                               FROM sq
    +   *                               WHERE min(p1.a + sq.c) > 1))
    +   * }}}
    +   * The code below needs to change when we support the above cases.
    +   */
    +  def getOuterReferences(conditions: Seq[Expression]): Seq[Expression] = {
    +    val outerExpressions = ArrayBuffer.empty[Expression]
    +    conditions foreach { expr =>
    +      expr transformDown {
    +        case a: AggregateExpression if containsOuter(a) =>
    --- End diff --
    
    Shouldn't we also check whether the aggregate contains inner (local) 
references? You could use `collectLeaves` here to make sure every leaf is an 
outer reference: 
`a.collectLeaves.forall(_.isInstanceOf[OuterReference])`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to