Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14548#discussion_r74338681
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ---
    @@ -68,14 +103,90 @@ case class ScalarSubquery(
     }
     
     /**
    + * A subquery that will check the value of `child` whether is in the 
result of a query or not.
    + */
    +case class InSubquery(
    +    child: Expression,
    +    executedPlan: SubqueryExec,
    +    exprId: ExprId,
    +    private var result: Array[Any] = null,
    +    private var updated: Boolean = false) extends ExecSubqueryExpression {
    +
    +  override def dataType: DataType = BooleanType
    +  override def children: Seq[Expression] = child :: Nil
    +  override def nullable: Boolean = child.nullable
    +  override def toString: String = s"$child IN ${executedPlan.name}"
    +
    +  def withExecutedPlan(plan: SubqueryExec): ExecSubqueryExpression = 
copy(executedPlan = plan)
    +
    +  override def semanticEquals(other: Expression): Boolean = other match {
    +    case in: InSubquery => child.semanticEquals(in.child) &&
    +      executedPlan.sameResult(in.executedPlan)
    +    case _ => false
    +  }
    +
    +  def updateResult(rows: Array[InternalRow]): Unit = {
    +    result = rows.map(_.get(0, child.dataType)).asInstanceOf[Array[Any]]
    +    updated = true
    +  }
    +
    +  override def eval(input: InternalRow): Any = {
    +    require(updated, s"$this has not finished")
    +    val v = child.eval(input)
    +    if (v == null) {
    +      null
    +    } else {
    +      result.contains(v)
    +    }
    +  }
    +
    +  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    +    require(updated, s"$this has not finished")
    +    InSet(child, result.toSet).doGenCode(ctx, ev)
    +  }
    +}
    +
    +/**
      * Plans scalar subqueries from that are present in the given 
[[SparkPlan]].
      */
     case class PlanSubqueries(sparkSession: SparkSession) extends 
Rule[SparkPlan] {
       def apply(plan: SparkPlan): SparkPlan = {
         plan.transformAllExpressions {
           case subquery: expressions.ScalarSubquery =>
             val executedPlan = new QueryExecution(sparkSession, 
subquery.plan).executedPlan
    -        ScalarSubquery(executedPlan, subquery.exprId)
    +        ScalarSubquery(
    +          SubqueryExec(s"subquery${subquery.exprId.id}", executedPlan),
    +          subquery.exprId)
    +      case expressions.PredicateSubquery(plan, Seq(e: Expression), _, 
exprId) =>
    +        val executedPlan = new QueryExecution(sparkSession, 
plan).executedPlan
    +        InSubquery(e, SubqueryExec(s"subquery${exprId.id}", executedPlan), 
exprId)
    +    }
    +  }
    +}
    +
    +
    +/**
    + * Find out duplicated exchanges in the spark plan, then use the same 
exchange for all the
    + * references.
    + */
    +case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
    +
    +  def apply(plan: SparkPlan): SparkPlan = {
    +    if (!conf.exchangeReuseEnabled) {
    --- End diff --
    
    OK, let's leave it then.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to