[GitHub] [spark] dilipbiswal commented on a change in pull request #26437: [SPARK-29800][SQL] Plan Exists 's subquery in PlanSubqueries
dilipbiswal commented on a change in pull request #26437: [SPARK-29800][SQL] Plan Exists 's subquery in PlanSubqueries URL: https://github.com/apache/spark/pull/26437#discussion_r344582313 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala ## @@ -106,12 +106,20 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // Filter the plan by applying left semi and left anti joins. withSubquery.foldLeft(newFilter) { -case (p, Exists(sub, conditions, _)) => - val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p) - buildJoin(outerPlan, sub, LeftSemi, joinCond) -case (p, Not(Exists(sub, conditions, _))) => - val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p) - buildJoin(outerPlan, sub, LeftAnti, joinCond) +case (p, exists @ Exists(sub, conditions, _)) => + if (SubqueryExpression.hasCorrelatedSubquery(exists)) { +val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p) +buildJoin(outerPlan, sub, LeftSemi, joinCond) + } else { +Filter(exists, newFilter) + } +case (p, Not(exists @ Exists(sub, conditions, _))) => + if (SubqueryExpression.hasCorrelatedSubquery(exists)) { +val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p) +buildJoin(outerPlan, sub, LeftAnti, joinCond) + } else { +Filter(Not(exists), newFilter) + } Review comment: @AngersZh I discussed this with Wenchen. Do you think we can safely inject a "LIMIT 1" into our subplan to expedite its execution ? Pl. lets us know what you think ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] dilipbiswal commented on a change in pull request #26437: [SPARK-29800][SQL] Plan Exists 's subquery in PlanSubqueries
dilipbiswal commented on a change in pull request #26437: [SPARK-29800][SQL] Plan Exists 's subquery in PlanSubqueries URL: https://github.com/apache/spark/pull/26437#discussion_r344582313 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala ## @@ -106,12 +106,20 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper { // Filter the plan by applying left semi and left anti joins. withSubquery.foldLeft(newFilter) { -case (p, Exists(sub, conditions, _)) => - val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p) - buildJoin(outerPlan, sub, LeftSemi, joinCond) -case (p, Not(Exists(sub, conditions, _))) => - val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p) - buildJoin(outerPlan, sub, LeftAnti, joinCond) +case (p, exists @ Exists(sub, conditions, _)) => + if (SubqueryExpression.hasCorrelatedSubquery(exists)) { +val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p) +buildJoin(outerPlan, sub, LeftSemi, joinCond) + } else { +Filter(exists, newFilter) + } +case (p, Not(exists @ Exists(sub, conditions, _))) => + if (SubqueryExpression.hasCorrelatedSubquery(exists)) { +val (joinCond, outerPlan) = rewriteExistentialExpr(conditions, p) +buildJoin(outerPlan, sub, LeftAnti, joinCond) + } else { +Filter(Not(exists), newFilter) + } Review comment: @AngersZh I discussed this with Wenchen briefly. Do you think we can safely inject a "LIMIT 1" into our subplan to expedite its execution ? Pl. lets us know what you think ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] dilipbiswal commented on a change in pull request #26437: [SPARK-29800][SQL] Plan Exists 's subquery in PlanSubqueries
dilipbiswal commented on a change in pull request #26437: [SPARK-29800][SQL] Plan Exists 's subquery in PlanSubqueries URL: https://github.com/apache/spark/pull/26437#discussion_r344433444 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ## @@ -171,6 +171,63 @@ case class InSubqueryExec( } } +/** + * The physical node of exists-subquery. This is for support use exists in join's on condition, + * since some join type we can't pushdown exists condition, we plan it here + */ +case class ExistsExec(child: Expression, + subQuery: String, + plan: BaseSubqueryExec, + exprId: ExprId, + private var resultBroadcast: Broadcast[Boolean] = null) + extends ExecSubqueryExpression { + + @transient private var result: Boolean = _ + + override def dataType: DataType = BooleanType + override def children: Seq[Expression] = child :: Nil + override def nullable: Boolean = child.nullable + override def toString: String = s"EXISTS ${plan.name}" + override def withNewPlan(plan: BaseSubqueryExec): ExistsExec = copy(plan = plan) + + override def semanticEquals(other: Expression): Boolean = other match { +case in: ExistsExec => child.semanticEquals(in.child) && plan.sameResult(in.plan) +case _ => false + } + + + def updateResult(): Unit = { +result = !plan.execute().isEmpty() Review comment: @AngersZh You r right. Sorry.. i had written it as IN initially and forgot to adjust to exists :-) Yeah, we need to change RewritePredicateSubquery which handles correlated subquery rewrites. The only thing i am not sure is about the outer joins. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] dilipbiswal commented on a change in pull request #26437: [SPARK-29800][SQL] Plan Exists 's subquery in PlanSubqueries
dilipbiswal commented on a change in pull request #26437: [SPARK-29800][SQL] Plan Exists 's subquery in PlanSubqueries URL: https://github.com/apache/spark/pull/26437#discussion_r344250658 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ## @@ -171,6 +171,63 @@ case class InSubqueryExec( } } +/** + * The physical node of exists-subquery. This is for support use exists in join's on condition, + * since some join type we can't pushdown exists condition, we plan it here + */ +case class ExistsExec(child: Expression, + subQuery: String, + plan: BaseSubqueryExec, + exprId: ExprId, + private var resultBroadcast: Broadcast[Boolean] = null) + extends ExecSubqueryExpression { + + @transient private var result: Boolean = _ + + override def dataType: DataType = BooleanType + override def children: Seq[Expression] = child :: Nil + override def nullable: Boolean = child.nullable + override def toString: String = s"EXISTS ${plan.name}" + override def withNewPlan(plan: BaseSubqueryExec): ExistsExec = copy(plan = plan) + + override def semanticEquals(other: Expression): Boolean = other match { +case in: ExistsExec => child.semanticEquals(in.child) && plan.sameResult(in.plan) +case _ => false + } + + + def updateResult(): Unit = { +result = !plan.execute().isEmpty() Review comment: @cloud-fan @AngersZh Thanks for pinging me. Just for me to understand, since we refer to another pr in this pr. So we are considering planning the Subqueries appearing inside ON clause as a Join, right ? Assuming above, so if the query was : ``` SELECT * FROM T1 JOIN T2 ON T1.C1 = T2.C1 AND T1.C1 EXISTS (SELECT 1 FROM T3 WHERE T1.C1 = T3.C1) ``` We are considering to plan it as : ``` (T1 LeftSemi T3 ON T1.C1 = T3.C1) Join T2 ON T1.C1 = T2.C2 ``` This Looks okay to me for inner joins. I am just not sure about outer joins.. What do you think Wenchen ? Now, coming to the non-correlated subqueries, if we keep it as a PlanExpression and execute it, one thing we have to see is "what is the join strategy thats being picked". Its always going to be broadcast nested loop as it won't be a "equi-join" ? right ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] dilipbiswal commented on a change in pull request #26437: [SPARK-29800][SQL] Plan Exists 's subquery in PlanSubqueries
dilipbiswal commented on a change in pull request #26437: [SPARK-29800][SQL] Plan Exists 's subquery in PlanSubqueries URL: https://github.com/apache/spark/pull/26437#discussion_r344250658 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala ## @@ -171,6 +171,63 @@ case class InSubqueryExec( } } +/** + * The physical node of exists-subquery. This is for support use exists in join's on condition, + * since some join type we can't pushdown exists condition, we plan it here + */ +case class ExistsExec(child: Expression, + subQuery: String, + plan: BaseSubqueryExec, + exprId: ExprId, + private var resultBroadcast: Broadcast[Boolean] = null) + extends ExecSubqueryExpression { + + @transient private var result: Boolean = _ + + override def dataType: DataType = BooleanType + override def children: Seq[Expression] = child :: Nil + override def nullable: Boolean = child.nullable + override def toString: String = s"EXISTS ${plan.name}" + override def withNewPlan(plan: BaseSubqueryExec): ExistsExec = copy(plan = plan) + + override def semanticEquals(other: Expression): Boolean = other match { +case in: ExistsExec => child.semanticEquals(in.child) && plan.sameResult(in.plan) +case _ => false + } + + + def updateResult(): Unit = { +result = !plan.execute().isEmpty() Review comment: @cloud-fan @AngersZh Thanks for pinging me. Just for me to understand, since we refer to another pr in this pr. So we are considering planning the Subqueries appearing inside ON clause as a Join, right ? Assuming above, so if the query was : ``` SELECT * FROM T1 JOIN T2 ON T1.C1 = T2.C1 AND T1.C1 EXISTS (SELECT 1 FROM T3 WHERE T1.C1 = T3.C1) ``` We are considering to plan it as : ``` (T1 LeftSemi T3 ON T1.C1 = T3.C1) Join T2 ON T1.C1 = T2.C2 ``` This Looks okay to me for inner joins. I am just not sure about outer joins.. What do you think Wenchen ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org