cloud-fan commented on a change in pull request #20303: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/20303#discussion_r252544613
 
 

 ##########
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageCreator.scala
 ##########
 @@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{CollapseCodegenStages, SparkPlan}
+import 
org.apache.spark.sql.execution.adaptive.rule.{AssertChildStagesMaterialized, 
ReduceNumShufflePartitions}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{EventLoop, ThreadUtils}
+
+/**
+ * This class dynamically creates [[QueryStage]] bottom-up, optimizes the query 
plan of query stages
+ * and materializes them. It creates as many query stages as possible at the 
same time, and
+ * materializes a query stage when all its child stages are materialized.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node, and
+ * all the child query stages of this exchange node are materialized, we try 
to create a new query
+ * stage for this exchange node.
+ *
+ * To create a new query stage, we first optimize the sub-tree of the 
exchange. After optimization,
+ * we check the output partitioning of the optimized sub-tree, and see if the 
exchange node is still
+ * necessary.
+ *
+ * If the exchange node becomes unnecessary, remove it and give up this query 
stage creation, and
+ * continue to traverse the query plan tree until we hit the next exchange 
node.
+ *
+ * If the exchange node is still needed, create the query stage and optimize 
its sub-tree again.
+ * It's necessary to have both the pre-creation optimization and post-creation 
optimization, because
+ * these 2 optimizations have different assumptions. For pre-creation 
optimization, the shuffle node
+ * may be removed later on and the current sub-tree may be only a part of a 
query stage, so we don't
+ * have the big picture of the query stage yet. For post-creation 
optimization, the query stage is
+ * created and we have the big picture of the query stage.
+ *
+ * After the query stage is optimized, we materialize it asynchronously, and 
continue to traverse
+ * the query plan tree to create more query stages.
+ *
+ * When a query stage completes materialization, we trigger the process of 
query stage creation and
+ * traverse the query plan tree again.
+ */
+class QueryStageCreator(
+    initialPlan: SparkPlan,
+    session: SparkSession,
+    callback: QueryStageTriggerCallback)
+  extends EventLoop[QueryStageCreatorEvent]("QueryStageCreator") {
+
+  private def conf = session.sessionState.conf
+
+  private val readyStages = mutable.HashSet.empty[Int]
+
+  private var currentStageId = 0
+
+  private val stageCache = mutable.HashMap.empty[StructType, 
mutable.Buffer[(Exchange, QueryStage)]]
+
+  // The optimizer rules that will be applied to a sub-tree of the query plan 
before the stage is
+  // created. Note that we may end up not creating the query stage, so the 
rules here should not
+  // assume the given sub-plan-tree is the entire query plan of the query 
stage. For example, if a
+  // rule wants to collect all the child query stages, it should not be put 
here.
+  private val preStageCreationOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
+    AssertChildStagesMaterialized
+  )
+
+  // The optimizer rules that will be applied to a sub-tree of the query plan 
after the stage is
+  // created. Note that once the stage is created, we will not remove it 
anymore. If a rule changes
+  // the output partitioning of the sub-plan-tree, which may help to remove 
the exchange node, it's
+  // better to put it in `preStageCreationOptimizerRules`, so that we may 
create fewer query stages.
+  private val postStageCreationOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
+    ReduceNumShufflePartitions(conf),
+    CollapseCodegenStages(conf))
+
+  private var currentPlan = initialPlan
+
+  private implicit def executionContext: ExecutionContextExecutorService = {
+    QueryStageCreator.executionContext
+  }
+
+  override protected def onReceive(event: QueryStageCreatorEvent): Unit = 
event match {
+    case StartCreation =>
+      // set active session for the event loop thread.
 
 Review comment:
   @carsonwang maybe we should also set spark local properties here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to