GitHub user hvanhovell commented on a diff in the pull request:
https://github.com/apache/spark/pull/14548#discussion_r74225490
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ---
@@ -68,14 +103,90 @@ case class ScalarSubquery(
}
/**
+ * A subquery that will check the value of `child` whether is in the
result of a query or not.
+ */
+case class InSubquery(
+ child: Expression,
+ executedPlan: SubqueryExec,
+ exprId: ExprId,
+ private var result: Array[Any] = null,
+ private var updated: Boolean = false) extends ExecSubqueryExpression {
+
+ override def dataType: DataType = BooleanType
+ override def children: Seq[Expression] = child :: Nil
+ override def nullable: Boolean = child.nullable
+ override def toString: String = s"$child IN ${executedPlan.name}"
+
+ def withExecutedPlan(plan: SubqueryExec): ExecSubqueryExpression =
copy(executedPlan = plan)
+
+ override def semanticEquals(other: Expression): Boolean = other match {
+ case in: InSubquery => child.semanticEquals(in.child) &&
+ executedPlan.sameResult(in.executedPlan)
+ case _ => false
+ }
+
+ def updateResult(rows: Array[InternalRow]): Unit = {
+ result = rows.map(_.get(0, child.dataType)).asInstanceOf[Array[Any]]
+ updated = true
+ }
+
+ override def eval(input: InternalRow): Any = {
+ require(updated, s"$this has not finished")
+ val v = child.eval(input)
+ if (v == null) {
+ null
+ } else {
+ result.contains(v)
+ }
+ }
+
+ override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+ require(updated, s"$this has not finished")
+ InSet(child, result.toSet).doGenCode(ctx, ev)
+ }
+}
+
+/**
* Plans scalar subqueries from that are present in the given
[[SparkPlan]].
*/
case class PlanSubqueries(sparkSession: SparkSession) extends
Rule[SparkPlan] {
def apply(plan: SparkPlan): SparkPlan = {
plan.transformAllExpressions {
case subquery: expressions.ScalarSubquery =>
val executedPlan = new QueryExecution(sparkSession,
subquery.plan).executedPlan
- ScalarSubquery(executedPlan, subquery.exprId)
+ ScalarSubquery(
+ SubqueryExec(s"subquery${subquery.exprId.id}", executedPlan),
+ subquery.exprId)
+ case expressions.PredicateSubquery(plan, Seq(e: Expression), _,
exprId) =>
--- End diff --
Under what circumstance is this triggered? A predicate subquery in Project?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]