dbaliafroozeh commented on a change in pull request #26705: [SPARK-30072][SQL] 
Create dedicated planner for subqueries
URL: https://github.com/apache/spark/pull/26705#discussion_r352369121
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
 ##########
 @@ -248,3 +227,67 @@ class QueryExecution(
     }
   }
 }
+
+object QueryExecution {
+  /**
+   * Construct a sequence of rules that are used to prepare a planned 
[[SparkPlan]] for execution.
+   * These rules will make sure subqueries are planned, make use the data 
partitioning and ordering
+   * are correct, insert whole stage code gen, and try to reduce the work done 
by reusing exchanges
+   * and subqueries.
+   */
+  private[execution] def preparations(sparkSession: SparkSession): 
Seq[Rule[SparkPlan]] =
+    Seq(
+      // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the 
following rules will be no-op
+      // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
+      InsertAdaptiveSparkPlan(sparkSession),
+      PlanDynamicPruningFilters(sparkSession),
+      PlanSubqueries(sparkSession),
+      EnsureRequirements(sparkSession.sessionState.conf),
+      ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
+        sparkSession.sessionState.columnarRules),
+      CollapseCodegenStages(sparkSession.sessionState.conf),
+      ReuseExchange(sparkSession.sessionState.conf),
+      ReuseSubquery(sparkSession.sessionState.conf)
+    )
+
+  /**
+   * Prepares a planned [[SparkPlan]] for execution by inserting shuffle 
operations and internal
+   * row format conversions as needed.
+   */
+  private[execution] def prepareForExecution(
+      preparations: Seq[Rule[SparkPlan]],
+      plan: SparkPlan): SparkPlan = {
+    preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
+  }
+
+  /**
+   * Transform a [[LogicalPlan]] into a [[SparkPlan]].
+   *
+   * Note that the returned physical plan still needs to be prepared for 
execution.
+   */
+  def createSparkPlan(
+      sparkSession: SparkSession,
+      planner: SparkPlanner,
+      plan: LogicalPlan): SparkPlan = {
+    SparkSession.setActiveSession(sparkSession)
+    // TODO: We use next(), i.e. take the first plan returned by the planner, 
here for now,
+    //       but we will implement to choose the best plan.
+    planner.plan(ReturnAnswer(plan)).next()
+  }
+
+  /**
+   * Plan a subquery. Prepare the [[SparkPlan]] for execution.
+   */
+  def planSubquery(spark: SparkSession, plan: SparkPlan): SparkPlan = {
+    prepareForExecution(preparations(spark), plan)
+  }
 
 Review comment:
   I'm renaming both these methods to `prepareExecutedPlan` as one of them 
prepares a `SparkPlan` for execution and the other one a `LogicalPlan`. I think 
having the same method name makes sense for these methods.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to