[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-06-14 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r293907198
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 ##
 @@ -317,76 +317,92 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] 
extends Product {
*/
   def mapChildren(f: BaseType => BaseType): BaseType = {
 if (children.nonEmpty) {
-  var changed = false
-  def mapChild(child: Any): Any = child match {
-case arg: TreeNode[_] if containsChild(arg) =>
-  val newChild = f(arg.asInstanceOf[BaseType])
-  if (!(newChild fastEquals arg)) {
-changed = true
-newChild
-  } else {
-arg
-  }
-case tuple@(arg1: TreeNode[_], arg2: TreeNode[_]) =>
-  val newChild1 = if (containsChild(arg1)) {
-f(arg1.asInstanceOf[BaseType])
-  } else {
-arg1.asInstanceOf[BaseType]
-  }
+  mapProductElements(f, applyToAll = false)
+} else {
+  this
+}
+  }
 
-  val newChild2 = if (containsChild(arg2)) {
-f(arg2.asInstanceOf[BaseType])
-  } else {
-arg2.asInstanceOf[BaseType]
-  }
+  /**
+   * Returns a copy of this node where `f` has been applied to all applicable 
`TreeNode` elements
+   * in the productIterator.
+   * @param f the transform function to be applied on applicable `TreeNode` 
elements.
+   * @param applyToAll If true, the transform function will be applied to all 
`TreeNode` elements
+   *   even for non-child elements; otherwise, the function 
will only be applied
+   *   on children nodes. Also, when this is true, a copy of 
this node will be
+   *   returned even if no elements have been changed.
+   */
+  private def mapProductElements(
+  f: BaseType => BaseType,
+  applyToAll: Boolean): BaseType = {
+var changed = false
 
-  if (!(newChild1 fastEquals arg1) || !(newChild2 fastEquals arg2)) {
-changed = true
-(newChild1, newChild2)
-  } else {
-tuple
-  }
-case other => other
-  }
+def mapChild(child: Any): Any = child match {
+  case arg: TreeNode[_] if applyToAll || containsChild(arg) =>
 
 Review comment:
   I double checked and there's no "fake LeafNode" in the logical plan space. 
So I removed the "applyToAll" from the condition for transforming the elements 
and renamed it to "forceCopy". I've also changed the method name back to 
"mapChildren" since it's only for children nodes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-06-14 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r293872971
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
 ##
 @@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.concurrent.Future
+
+import org.apache.spark.{FutureAction, MapOutputStatistics}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+
+
+/**
+ * A query stage is an independent subgraph of the query plan. Query stage 
materializes its output
+ * before proceeding with further operators of the query plan. The data 
statistics of the
+ * materialized output can be used to optimize subsequent query stages.
+ *
+ * There are 2 kinds of query stages:
+ *   1. Shuffle query stage. This stage materializes its output to shuffle 
files, and Spark launches
+ *  another job to execute the further operators.
+ *   2. Broadcast query stage. This stage materializes its output to an array 
in driver JVM. Spark
+ *  broadcasts the array before executing the further operators.
+ */
+abstract class QueryStageExec extends LeafExecNode {
+
+  /**
+   * An id of this query stage which is unique in the entire query plan.
+   */
+  def id: Int
+
+  /**
+   * The sub-tree of the query plan that belongs to this query stage.
+   */
+  def plan: SparkPlan
+
+  /**
+   * Materialize this query stage, to prepare for the execution, like 
submitting map stages,
+   * broadcasting data, etc. The caller side can use the returned [[Future]] 
to wait until this
+   * stage is ready.
+   */
+  def doMaterialize(): Future[Any]
+
+  /**
+   * Cancel the stage materialization if in progress; otherwise do nothing.
+   */
+  def cancel(): Unit
+
+  /**
+   * Materialize this query stage, to prepare for the execution, like 
submitting map stages,
+   * broadcasting data, etc. The caller side can use the returned [[Future]] 
to wait until this
+   * stage is ready.
+   */
+  final def materialize(): Future[Any] = executeQuery {
+doMaterialize()
+  }
+
+  /**
+   * Compute the statistics of the query stage if executed, otherwise None.
+   */
+  def computeStats(): Option[Statistics] = resultOption.map { _ =>
+// Metrics `dataSize` are available in both `ShuffleExchangeExec` and 
`BroadcastExchangeExec`.
+Statistics(sizeInBytes = plan.metrics("dataSize").value)
+  }
+
+  @transient
+  @volatile
+  private[adaptive] var resultOption: Option[Any] = None
+
+  override def output: Seq[Attribute] = plan.output
+  override def outputPartitioning: Partitioning = plan.outputPartitioning
+  override def outputOrdering: Seq[SortOrder] = plan.outputOrdering
+  override def executeCollect(): Array[InternalRow] = plan.executeCollect()
+  override def executeTake(n: Int): Array[InternalRow] = plan.executeTake(n)
+  override def executeToIterator(): Iterator[InternalRow] = 
plan.executeToIterator()
+
+  override def doPrepare(): Unit = plan.prepare()
+  override def doExecute(): RDD[InternalRow] = plan.execute()
+  override def doExecuteBroadcast[T](): Broadcast[T] = plan.executeBroadcast()
+  override def doCanonicalize(): SparkPlan = plan.canonicalized
+
+  protected override def stringArgs: Iterator[Any] = Iterator.single(id)
+
+  override def generateTreeString(
+  depth: Int,
+  lastChildren: Seq[Boolean],
+  append: String => Unit,
+  verbose: Boolean,
+  prefix: String = "",
+  addSuffix: Boolean = false,
+  maxFields: Int): Unit = {
+super.generateTreeString(depth, lastChildren, append, verbose, prefix, 
addSuffix, maxFields)
+plan.generateTreeString(
+  depth + 1, lastChildren :+ true, append, verbose, "", false, maxFields)
+  }
+}
+
+/**
+ * A 

[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-06-14 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r293867735
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which 
executes the query plan
+ * and re-optimize the plan during execution based on runtime data statistics.
+ */
+case class InsertAdaptiveSparkPlan(session: SparkSession) extends 
Rule[SparkPlan] {
+
+  private val conf = session.sessionState.conf
+
+  // Exchange-reuse is shared across the entire query, including sub-queries.
+  private val stageCache = new TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]]()
+
+  override def apply(plan: SparkPlan): SparkPlan = plan match {
+case _: ExecutedCommandExec => plan
+case _ if conf.runtimeReoptimizationEnabled
+  && supportAdaptive(plan) =>
+  try {
+// Plan sub-queries recursively and pass in the shared stage cache for 
exchange reuse. Fall
+// back to non-adaptive mode if adaptive execution is supported in any 
of the sub-queries.
+val subqueryMap = planSubqueries(plan)
+// Run preparation rules.
+val preparations = 
AdaptiveSparkPlanExec.createQueryStagePreparationRules(
+  session.sessionState.conf, subqueryMap)
+val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, 
preparations)
 
 Review comment:
   In one of the commits I had tried refactoring this into 
`AdaptiveSparkPlanExec`, but later found out that this would cause a problem in 
serializing/deserializing `initialPlan` in `AdaptiveSparkPlanExec`, for the 
`initialPlan` before applying the sub-query planning rule contains instances of 
`expression.ScalarSubquery`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-06-14 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r293867735
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which 
executes the query plan
+ * and re-optimize the plan during execution based on runtime data statistics.
+ */
+case class InsertAdaptiveSparkPlan(session: SparkSession) extends 
Rule[SparkPlan] {
+
+  private val conf = session.sessionState.conf
+
+  // Exchange-reuse is shared across the entire query, including sub-queries.
+  private val stageCache = new TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]]()
+
+  override def apply(plan: SparkPlan): SparkPlan = plan match {
+case _: ExecutedCommandExec => plan
+case _ if conf.runtimeReoptimizationEnabled
+  && supportAdaptive(plan) =>
+  try {
+// Plan sub-queries recursively and pass in the shared stage cache for 
exchange reuse. Fall
+// back to non-adaptive mode if adaptive execution is supported in any 
of the sub-queries.
+val subqueryMap = planSubqueries(plan)
+// Run preparation rules.
+val preparations = 
AdaptiveSparkPlanExec.createQueryStagePreparationRules(
+  session.sessionState.conf, subqueryMap)
+val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, 
preparations)
 
 Review comment:
   In one of the commits I had tried refactoring this into 
`AdaptiveSparkPlanExec`, but later found out that this would cause a problem in 
serializing `initialPlan` in `AdaptiveSparkPlanExec`, for the `initialPlan` 
before applying the sub-query planning rule contains instances of 
`expression.ScalarSubquery`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-06-14 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r293859013
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 ##
 @@ -317,76 +317,92 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] 
extends Product {
*/
   def mapChildren(f: BaseType => BaseType): BaseType = {
 if (children.nonEmpty) {
-  var changed = false
-  def mapChild(child: Any): Any = child match {
-case arg: TreeNode[_] if containsChild(arg) =>
-  val newChild = f(arg.asInstanceOf[BaseType])
-  if (!(newChild fastEquals arg)) {
-changed = true
-newChild
-  } else {
-arg
-  }
-case tuple@(arg1: TreeNode[_], arg2: TreeNode[_]) =>
-  val newChild1 = if (containsChild(arg1)) {
-f(arg1.asInstanceOf[BaseType])
-  } else {
-arg1.asInstanceOf[BaseType]
-  }
+  mapProductElements(f, applyToAll = false)
+} else {
+  this
+}
+  }
 
-  val newChild2 = if (containsChild(arg2)) {
-f(arg2.asInstanceOf[BaseType])
-  } else {
-arg2.asInstanceOf[BaseType]
-  }
+  /**
+   * Returns a copy of this node where `f` has been applied to all applicable 
`TreeNode` elements
+   * in the productIterator.
+   * @param f the transform function to be applied on applicable `TreeNode` 
elements.
+   * @param applyToAll If true, the transform function will be applied to all 
`TreeNode` elements
+   *   even for non-child elements; otherwise, the function 
will only be applied
+   *   on children nodes. Also, when this is true, a copy of 
this node will be
+   *   returned even if no elements have been changed.
+   */
+  private def mapProductElements(
+  f: BaseType => BaseType,
+  applyToAll: Boolean): BaseType = {
+var changed = false
 
-  if (!(newChild1 fastEquals arg1) || !(newChild2 fastEquals arg2)) {
-changed = true
-(newChild1, newChild2)
-  } else {
-tuple
-  }
-case other => other
-  }
+def mapChild(child: Any): Any = child match {
+  case arg: TreeNode[_] if applyToAll || containsChild(arg) =>
 
 Review comment:
   I'm not sure about the expressions, I think it totally depends on the usage, 
and right now we don't need to copy expressions for AQE I believe.
   On the other hand, though, there's "fake leaf nodes" that derive from 
`LeafNode` but do have children nodes not declared as `children`, e.g., 
`ReusedExchange`. Again, right now for AQE usage, we only care about the 
logical plans, so we are probably OK? (can't think of any logical plan fake 
leaf nodes so far).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-06-12 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r292994737
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.ExecutionContext
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+@transient session: SparkSession,
+@transient subqueryMap: Map[Long, ExecSubqueryExpression],
+@transient stageCache: TrieMap[SparkPlan, QueryStageExec])
+  extends LeafExecNode {
+
+  @transient private val executionId = Option(
+
session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).map(_.toLong)
+
+  @transient private val lock = new Object()
+
+  // The logical plan optimizer for re-optimizing the current logical plan.
+  @transient private val optimizer = new RuleExecutor[LogicalPlan] {
+// TODO add more optimization rules
+override protected def batches: Seq[Batch] = Seq()
+  }
+
+  // A list of physical plan rules to be applied before creation of query 
stages. The physical
+  // plan should reach a final status of query stages (i.e., no more addition 
or removal of
+  // Exchange nodes) after running these rules.
+  @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = 
Seq(
+PlanAdaptiveSubqueries(subqueryMap),
+EnsureRequirements(conf)
+  )
+
+  // A list of physical optimizer rules to be applied to a new stage before 
its execution. These
+  // optimizations should be stage-independent.
+  @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
+CollapseCodegenStages(conf)
+  )
+
+  private var currentStageId = 0
+
+  @volatile private var currentPhysicalPlan = initialPlan
+
+  @volatile private var isFinalPlan = false
+
+  @volatile private var fallback = false
+
+  /**
+   * Return type for `createQueryStages`
+   * @param newPlan the new plan with created query stages.
+   * @param allChildStagesMaterialized whether all child stages have been 
materialized.
+   * @param newStages the newly created query stages, including new reused 

[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-06-12 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r292967369
 
 

 ##
 File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 ##
 @@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.{ReusedSubqueryExec, SparkPlan}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
SortMergeJoinExec}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class AdaptiveQueryExecSuite extends QueryTest with SharedSQLContext {
+
+  setupTestData()
+
+  private def runAdaptiveAndVerifyResult(query: String): (SparkPlan, 
SparkPlan) = {
+val dfAdaptive = sql(query)
+val result = dfAdaptive.collect()
+withSQLConf(SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "false") {
+  val df = sql(query)
+  QueryTest.sameRows(result.toSeq, df.collect().toSeq)
+}
+val plan = dfAdaptive.queryExecution.executedPlan
+assert(plan.isInstanceOf[AdaptiveSparkPlanExec])
+val adaptivePlan = plan.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
+(dfAdaptive.queryExecution.sparkPlan, adaptivePlan)
+  }
+
+  private def findTopLevelBroadcastHashJoin(plan: SparkPlan): 
Seq[BroadcastHashJoinExec] = {
+plan.collect {
+  case j: BroadcastHashJoinExec => Seq(j)
+  case s: QueryStageExec => findTopLevelBroadcastHashJoin(s.plan)
+}.flatten
+  }
+
+  private def findTopLevelSortMergeJoin(plan: SparkPlan): 
Seq[SortMergeJoinExec] = {
+plan.collect {
+  case j: SortMergeJoinExec => Seq(j)
+  case s: QueryStageExec => findTopLevelSortMergeJoin(s.plan)
+}.flatten
+  }
+
+  private def findReusedExchange(plan: SparkPlan): Seq[ReusedQueryStageExec] = 
{
+plan.collect {
+  case e: ReusedQueryStageExec => Seq(e)
+  case a: AdaptiveSparkPlanExec => findReusedExchange(a.executedPlan)
+  case s: QueryStageExec => findReusedExchange(s.plan)
+  case p: SparkPlan => p.subqueries.flatMap(findReusedExchange)
+}.flatten
+  }
+
+  private def findReusedSubquery(plan: SparkPlan): Seq[ReusedSubqueryExec] = {
+plan.collect {
+  case e: ReusedSubqueryExec => Seq(e)
+  case s: QueryStageExec => findReusedSubquery(s.plan)
+  case p: SparkPlan => p.subqueries.flatMap(findReusedSubquery)
+}.flatten
+  }
+
+  test("Change merge join to broadcast join") {
+withSQLConf(
+SQLConf.RUNTIME_REOPTIMIZATION_ENABLED.key -> "true",
+SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
+  val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+"SELECT * FROM testData join testData2 ON key = a where value = '1'")
 
 Review comment:
   The explain before execution and after execution is different. And the 
"before" version has not applied the WholeStageCodeGen rule yet.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-06-02 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r289678641
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStageStrategy.scala
 ##
 @@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
BroadcastNestedLoopJoinExec, BuildLeft, BuildRight}
+
+/**
+ * Strategy for plans containing [[LogicalQueryStage]] nodes:
+ * 1. Transforms [[LogicalQueryStage]] to its corresponding physical plan that 
is either being
+ *executed or has already completed execution.
+ * 2. Transforms [[Join]] which has one child relation already planned and 
executed as a
 
 Review comment:
   Added.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-06-02 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r289677903
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.ExecutionContext
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[SparkPlan, QueryStageExec])
+  extends LeafExecNode {
+
+  @transient private val executionId = Option(
+
session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).map(_.toLong)
+
+  @transient private val lock = new Object()
+
+  // A list of physical plan rules to be applied before creation of query 
stages. The physical
+  // plan should reach a final status of query stages (i.e., no more addition 
or removal of
+  // Exchange nodes) after running these rules.
+  @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = 
Seq(
+PlanAdaptiveSubqueries(subqueryMap),
+EnsureRequirements(conf)
+  )
+
+  // A list of physical optimizer rules to be applied to a new stage before 
its execution. These
+  // optimizations should be stage-independent.
+  @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
+CollapseCodegenStages(conf)
+  )
+
+  private var currentStageId = 0
+
+  @volatile private var currentPhysicalPlan =
+applyPhysicalRules(initialPlan, queryStagePreparationRules)
+
+  // The logical plan optimizer for re-optimizing the current logical plan.
+  private object Optimizer extends RuleExecutor[LogicalPlan] {
+// TODO add more optimization rules
+override protected def batches: Seq[Batch] = Seq()
+  }
+
+  /**
+   * Return type for `createQueryStages`
+   * @param newPlan the new plan with created query stages.
+   * @param allChildStagesMaterialized whether all child stages have been 
materialized.
+   * @param newStages the newly created query stages, including new reused 
query stages.
+   */
+  private case class CreateStageResult(
+newPlan: SparkPlan,
+allChildStagesMaterialized: Boolean,
+newStages: 

[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-06-02 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r289677903
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.ExecutionContext
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[SparkPlan, QueryStageExec])
+  extends LeafExecNode {
+
+  @transient private val executionId = Option(
+
session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)).map(_.toLong)
+
+  @transient private val lock = new Object()
+
+  // A list of physical plan rules to be applied before creation of query 
stages. The physical
+  // plan should reach a final status of query stages (i.e., no more addition 
or removal of
+  // Exchange nodes) after running these rules.
+  @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = 
Seq(
+PlanAdaptiveSubqueries(subqueryMap),
+EnsureRequirements(conf)
+  )
+
+  // A list of physical optimizer rules to be applied to a new stage before 
its execution. These
+  // optimizations should be stage-independent.
+  @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
+CollapseCodegenStages(conf)
+  )
+
+  private var currentStageId = 0
+
+  @volatile private var currentPhysicalPlan =
+applyPhysicalRules(initialPlan, queryStagePreparationRules)
+
+  // The logical plan optimizer for re-optimizing the current logical plan.
+  private object Optimizer extends RuleExecutor[LogicalPlan] {
+// TODO add more optimization rules
+override protected def batches: Seq[Batch] = Seq()
+  }
+
+  /**
+   * Return type for `createQueryStages`
+   * @param newPlan the new plan with created query stages.
+   * @param allChildStagesMaterialized whether all child stages have been 
materialized.
+   * @param newStages the newly created query stages, including new reused 
query stages.
+   */
+  private case class CreateStageResult(
+newPlan: SparkPlan,
+allChildStagesMaterialized: Boolean,
+newStages: 

[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-31 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r289570771
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.ExecutionContext
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[SparkPlan, QueryStageExec])
+  extends LeafExecNode {
+
+  def executedPlan: SparkPlan = currentPhysicalPlan
+
+  override def conf: SQLConf = session.sessionState.conf
+
+  override def output: Seq[Attribute] = initialPlan.output
+
+  override def doCanonicalize(): SparkPlan = initialPlan.canonicalized
+
+  override def doExecute(): RDD[InternalRow] = lock.synchronized {
 
 Review comment:
   Insightful. Thank you, @hvanhovell !


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-31 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r289494728
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.ExecutionContext
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[SparkPlan, QueryStageExec])
+  extends LeafExecNode {
+
+  def executedPlan: SparkPlan = currentPhysicalPlan
+
+  override def conf: SQLConf = session.sessionState.conf
+
+  override def output: Seq[Attribute] = initialPlan.output
+
+  override def doCanonicalize(): SparkPlan = initialPlan.canonicalized
+
+  override def doExecute(): RDD[InternalRow] = lock.synchronized {
+var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
+var result = createQueryStages(currentPhysicalPlan)
+val events = new LinkedBlockingQueue[StageMaterializationEvent]()
+val errors = new mutable.ArrayBuffer[SparkException]()
+while (!result.allChildStagesMaterialized) {
+  currentPhysicalPlan = result.newPlan
+  currentLogicalPlan = updateLogicalPlan(currentLogicalPlan, 
result.newStages)
+  currentPhysicalPlan.setTagValue(SparkPlan.LOGICAL_PLAN_TAG, 
currentLogicalPlan)
+  onUpdatePlan()
+
+  // Start materialization of all new stages.
+  result.newStages.map(_._2).foreach { stage =>
+stage.materialize().onComplete { res =>
+  if (res.isSuccess) {
+events.offer(StageSuccess(stage, res.get))
+  } else {
+events.offer(StageFailure(stage, res.failed.get))
+  }
+}(AdaptiveSparkPlanExec.executionContext)
+  }
+
+  // Wait on the next completed stage, which indicates new stats are 
available and probably
+  // new stages can be created. There might be other stages that finish at 
around the same
+  // time, so we process those stages too in order to reduce re-planning.
+  val nextMsg = events.take()
+  val rem = new util.ArrayList[StageMaterializationEvent]()
+  events.drainTo(rem)
+  

[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-31 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r289484612
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStageStrategy.scala
 ##
 @@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
BroadcastNestedLoopJoinExec, BuildLeft, BuildRight}
+
+/**
+ * Strategy for plans containing [[LogicalQueryStage]] nodes:
+ * 1. Transforms [[LogicalQueryStage]] to its corresponding physical plan that 
is either being
+ *executed or has already completed execution.
+ * 2. Transforms [[Join]] which has one child relation already planned and 
executed as a
 
 Review comment:
   This is a safer. For hint, in theory, is not a guaranteed promise.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-31 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r289484612
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LogicalQueryStageStrategy.scala
 ##
 @@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
BroadcastNestedLoopJoinExec, BuildLeft, BuildRight}
+
+/**
+ * Strategy for plans containing [[LogicalQueryStage]] nodes:
+ * 1. Transforms [[LogicalQueryStage]] to its corresponding physical plan that 
is either being
+ *executed or has already completed execution.
+ * 2. Transforms [[Join]] which has one child relation already planned and 
executed as a
 
 Review comment:
   This is a safer guarantee. For hint, in theory, is not a guaranteed promise.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-31 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r289455441
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.ExecutionContext
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[SparkPlan, QueryStageExec])
+  extends LeafExecNode {
+
+  def executedPlan: SparkPlan = currentPhysicalPlan
+
+  override def conf: SQLConf = session.sessionState.conf
+
+  override def output: Seq[Attribute] = initialPlan.output
+
+  override def doCanonicalize(): SparkPlan = initialPlan.canonicalized
+
+  override def doExecute(): RDD[InternalRow] = lock.synchronized {
+var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
+var result = createQueryStages(currentPhysicalPlan)
+val events = new LinkedBlockingQueue[StageMaterializationEvent]()
+val errors = new mutable.ArrayBuffer[SparkException]()
+while (!result.allChildStagesMaterialized) {
+  currentPhysicalPlan = result.newPlan
+  currentLogicalPlan = updateLogicalPlan(currentLogicalPlan, 
result.newStages)
+  currentPhysicalPlan.setTagValue(SparkPlan.LOGICAL_PLAN_TAG, 
currentLogicalPlan)
+  onUpdatePlan()
+
+  // Start materialization of all new stages.
+  result.newStages.map(_._2).foreach { stage =>
+stage.materialize().onComplete { res =>
+  if (res.isSuccess) {
+events.offer(StageSuccess(stage, res.get))
+  } else {
+events.offer(StageFailure(stage, res.failed.get))
+  }
+}(AdaptiveSparkPlanExec.executionContext)
+  }
+
+  // Wait on the next completed stage, which indicates new stats are 
available and probably
+  // new stages can be created. There might be other stages that finish at 
around the same
+  // time, so we process those stages too in order to reduce re-planning.
+  val nextMsg = events.take()
+  val rem = new util.ArrayList[StageMaterializationEvent]()
+  events.drainTo(rem)
+  

[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-30 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r289164923
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
 ##
 @@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.concurrent.Future
+
+import org.apache.spark.{FutureAction, MapOutputStatistics}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+
+
+/**
+ * A query stage is an independent subgraph of the query plan. Query stage 
materializes its output
+ * before proceeding with further operators of the query plan. The data 
statistics of the
+ * materialized output can be used to optimize subsequent query stages.
+ *
+ * There are 2 kinds of query stages:
+ *   1. Shuffle query stage. This stage materializes its output to shuffle 
files, and Spark launches
+ *  another job to execute the further operators.
+ *   2. Broadcast query stage. This stage materializes its output to an array 
in driver JVM. Spark
+ *  broadcasts the array before executing the further operators.
+ */
+abstract class QueryStageExec extends LeafExecNode {
+
+  /**
+   * An id of this query stage which is unique in the entire query plan.
+   */
+  def id: Int
+
+  /**
+   * The sub-tree of the query plan that belongs to this query stage.
+   */
+  def plan: SparkPlan
 
 Review comment:
   We can't, for there is `ReuseQueryStageExec` :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-30 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r289145896
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ##
 @@ -58,11 +58,21 @@ case class BroadcastExchangeExec(
 BroadcastExchangeExec(mode.canonicalized, child.canonicalized)
   }
 
+  @transient
+  private lazy val promise = Promise[broadcast.Broadcast[Any]]()
+
+  /**
+   * For registering callbacks on `relationFuture`.
+   * Note that calling this field will not start the execution of broadcast 
job.
+   */
+  @transient
+  lazy val completionFuture: scala.concurrent.Future[broadcast.Broadcast[Any]] 
= promise.future
+
   @transient
   private val timeout: Long = SQLConf.get.broadcastTimeout
 
   @transient
-  private lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
+  private[sql] lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
 
 Review comment:
   Yeah, just to include more info in our offline discussion here:
   
   What is needed by AQE is a "cancellable" Future with callbacks.
   
   Java Future is cancellable, but has no callbacks.
   Scala Future has callbacks, but is not cancellable.
   The Spark impl of a cancellable Scala Future (with callbacks), namely 
`FutureAction`, is more specific for Spark jobs than for general purpose, thus 
not so friendly to `BroadcastExchangeExec` here.
   
   Looks like an option can be to implement a general purpose `FutureAction` to 
replace the Java Future in `BroadcastExchangeExec`. Let's just mark this a 
follow-up though.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-30 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r289145896
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ##
 @@ -58,11 +58,21 @@ case class BroadcastExchangeExec(
 BroadcastExchangeExec(mode.canonicalized, child.canonicalized)
   }
 
+  @transient
+  private lazy val promise = Promise[broadcast.Broadcast[Any]]()
+
+  /**
+   * For registering callbacks on `relationFuture`.
+   * Note that calling this field will not start the execution of broadcast 
job.
+   */
+  @transient
+  lazy val completionFuture: scala.concurrent.Future[broadcast.Broadcast[Any]] 
= promise.future
+
   @transient
   private val timeout: Long = SQLConf.get.broadcastTimeout
 
   @transient
-  private lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
+  private[sql] lazy val relationFuture: Future[broadcast.Broadcast[Any]] = {
 
 Review comment:
   Yeah, just to include more info in our offline discussion here:
   
   What is needed by AQE is a "cancellable" Future with callbacks.
   
   Java Future is cancellable, but has no callbacks.
   Scala Future has callbacks, but is not cancellable.
   The Spark impl of a cancellable Scala Future (with callbacks), namely 
`FutureAction`, is more specific for Spark jobs and for general purpose, thus 
not so friendly to `BroadcastExchangeExec` here.
   
   Looks like an option can be to implement a general purpose `FutureAction` to 
replace the Java Future in `BroadcastExchangeExec`. Let's just mark this a 
follow-up though.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-30 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r289141774
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.ExecutionContext
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[SparkPlan, QueryStageExec])
+  extends LeafExecNode {
+
+  def executedPlan: SparkPlan = currentPhysicalPlan
+
+  override def conf: SQLConf = session.sessionState.conf
+
+  override def output: Seq[Attribute] = initialPlan.output
+
+  override def doCanonicalize(): SparkPlan = initialPlan.canonicalized
+
+  override def doExecute(): RDD[InternalRow] = lock.synchronized {
+var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
+var result = createQueryStages(currentPhysicalPlan)
+val events = new LinkedBlockingQueue[StageMaterializationEvent]()
+val errors = new mutable.ArrayBuffer[SparkException]()
+while (!result.allChildStagesMaterialized) {
+  currentPhysicalPlan = result.newPlan
+  currentLogicalPlan = updateLogicalPlan(currentLogicalPlan, 
result.newStages)
+  currentPhysicalPlan.setTagValue(SparkPlan.LOGICAL_PLAN_TAG, 
currentLogicalPlan)
+  onUpdatePlan()
+
+  // Start materialization of all new stages.
+  result.newStages.map(_._2).foreach { stage =>
+stage.materialize().onComplete { res =>
+  if (res.isSuccess) {
+events.offer(StageSuccess(stage, res.get))
+  } else {
+events.offer(StageFailure(stage, res.failed.get))
+  }
+}(AdaptiveSparkPlanExec.executionContext)
+  }
+
+  // Wait on the next completed stage, which indicates new stats are 
available and probably
+  // new stages can be created. There might be other stages that finish at 
around the same
+  // time, so we process those stages too in order to reduce re-planning.
+  val nextMsg = events.take()
+  val rem = new util.ArrayList[StageMaterializationEvent]()
+  events.drainTo(rem)
+  

[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-30 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r289137381
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.ExecutionContext
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[SparkPlan, QueryStageExec])
+  extends LeafExecNode {
+
+  def executedPlan: SparkPlan = currentPhysicalPlan
+
+  override def conf: SQLConf = session.sessionState.conf
+
+  override def output: Seq[Attribute] = initialPlan.output
+
+  override def doCanonicalize(): SparkPlan = initialPlan.canonicalized
+
+  override def doExecute(): RDD[InternalRow] = lock.synchronized {
+var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
+var result = createQueryStages(currentPhysicalPlan)
+val events = new LinkedBlockingQueue[StageMaterializationEvent]()
+val errors = new mutable.ArrayBuffer[SparkException]()
+while (!result.allChildStagesMaterialized) {
+  currentPhysicalPlan = result.newPlan
+  currentLogicalPlan = updateLogicalPlan(currentLogicalPlan, 
result.newStages)
+  currentPhysicalPlan.setTagValue(SparkPlan.LOGICAL_PLAN_TAG, 
currentLogicalPlan)
+  onUpdatePlan()
+
+  // Start materialization of all new stages.
+  result.newStages.map(_._2).foreach { stage =>
+stage.materialize().onComplete { res =>
+  if (res.isSuccess) {
+events.offer(StageSuccess(stage, res.get))
+  } else {
+events.offer(StageFailure(stage, res.failed.get))
+  }
+}(AdaptiveSparkPlanExec.executionContext)
+  }
+
+  // Wait on the next completed stage, which indicates new stats are 
available and probably
+  // new stages can be created. There might be other stages that finish at 
around the same
+  // time, so we process those stages too in order to reduce re-planning.
+  val nextMsg = events.take()
+  val rem = new util.ArrayList[StageMaterializationEvent]()
+  events.drainTo(rem)
+  

[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-30 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r289133742
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.ExecutionContext
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[SparkPlan, QueryStageExec])
+  extends LeafExecNode {
+
+  def executedPlan: SparkPlan = currentPhysicalPlan
+
+  override def conf: SQLConf = session.sessionState.conf
+
+  override def output: Seq[Attribute] = initialPlan.output
+
+  override def doCanonicalize(): SparkPlan = initialPlan.canonicalized
+
+  override def doExecute(): RDD[InternalRow] = lock.synchronized {
+var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
+var result = createQueryStages(currentPhysicalPlan)
+val events = new LinkedBlockingQueue[StageMaterializationEvent]()
+val errors = new mutable.ArrayBuffer[SparkException]()
+while (!result.allChildStagesMaterialized) {
+  currentPhysicalPlan = result.newPlan
+  currentLogicalPlan = updateLogicalPlan(currentLogicalPlan, 
result.newStages)
+  currentPhysicalPlan.setTagValue(SparkPlan.LOGICAL_PLAN_TAG, 
currentLogicalPlan)
+  onUpdatePlan()
+
+  // Start materialization of all new stages.
+  result.newStages.map(_._2).foreach { stage =>
+stage.materialize().onComplete { res =>
+  if (res.isSuccess) {
+events.offer(StageSuccess(stage, res.get))
+  } else {
+events.offer(StageFailure(stage, res.failed.get))
+  }
+}(AdaptiveSparkPlanExec.executionContext)
+  }
+
+  // Wait on the next completed stage, which indicates new stats are 
available and probably
+  // new stages can be created. There might be other stages that finish at 
around the same
+  // time, so we process those stages too in order to reduce re-planning.
+  val nextMsg = events.take()
+  val rem = new util.ArrayList[StageMaterializationEvent]()
+  events.drainTo(rem)
+  

[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-30 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r289130671
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 ##
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+
+import org.apache.spark.sql.{execution, SparkSession}
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which 
executes the query plan
+ * and re-optimize the plan during execution based on runtime data statistics.
+ *
+ * Note that this rule is stateful and thus should not be reused across query 
executions.
+ */
+case class InsertAdaptiveSparkPlan(session: SparkSession) extends 
Rule[SparkPlan] {
+
+  private val conf = session.sessionState.conf
+
+  // Exchange-reuse is shared across the entire query, including sub-queries.
+  private val stageCache = new TrieMap[SparkPlan, QueryStageExec]()
 
 Review comment:
   Problem is we want to make all `AdaptiveSparkPlanExec`s of the main query 
and the subqueries share the same `stageCache`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-29 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288635525
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]])
+  extends LeafExecNode {
+
+  def executedPlan: SparkPlan = currentPhysicalPlan
+
+  override def output: Seq[Attribute] = initialPlan.output
+
+  override def doCanonicalize(): SparkPlan = initialPlan.canonicalized
+
+  override def doExecute(): RDD[InternalRow] = {
+var result = createQueryStages(currentPhysicalPlan)
+currentPhysicalPlan.synchronized {
 
 Review comment:
   My bad. Was in a hurry and didn't even think.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-29 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288635263
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]])
+  extends LeafExecNode {
+
+  def executedPlan: SparkPlan = currentPhysicalPlan
+
+  override def output: Seq[Attribute] = initialPlan.output
+
+  override def doCanonicalize(): SparkPlan = initialPlan.canonicalized
+
+  override def doExecute(): RDD[InternalRow] = {
+var result = createQueryStages(currentPhysicalPlan)
+currentPhysicalPlan.synchronized {
+  val events = new LinkedBlockingQueue[StageMaterializationEvent]()
+  val errors = new mutable.ArrayBuffer[SparkException]()
 
 Review comment:
   I'd argue it'll have to be instantiated with every loop, yet used exactly 
once (when there's sth. wrong).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-29 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288633927
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]])
+  extends LeafExecNode {
+
+  def executedPlan: SparkPlan = currentPhysicalPlan
+
+  override def output: Seq[Attribute] = initialPlan.output
+
+  override def doCanonicalize(): SparkPlan = initialPlan.canonicalized
+
+  override def doExecute(): RDD[InternalRow] = {
+var result = createQueryStages(currentPhysicalPlan)
+currentPhysicalPlan.synchronized {
+  val events = new LinkedBlockingQueue[StageMaterializationEvent]()
+  val errors = new mutable.ArrayBuffer[SparkException]()
+  while (!result.allChildStagesMaterialized) {
+currentPhysicalPlan = result.newPlan
+updateLogicalPlan(result.newStages)
+onUpdatePlan()
+
+// Start materialization of all new stages.
+result.newStages.map(_._2).foreach { stage =>
+  stage.materialize().onComplete { res =>
+if (res.isSuccess) {
+  events.offer(StageSuccess(stage, res.get))
+} else {
+  events.offer(StageFailure(stage, res.failed.get))
+}
+  }
+}
+
+// Wait on the next completed stage, which indicates new stats are 
available and probably
+// new stages can be created. There might be other stages that finish 
at around the same
+// time, so we process those stages too in order to reduce re-planning.
+val nextMsg = events.take()
 
 Review comment:
   That'll work too. But `nextMsg` is used in the current impl, otherwise the 
whole thing wouldn't work:
   

[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288319594
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which 
executes the query plan
+ * and re-optimize the plan during execution based on runtime data statistics.
+ */
+case class InsertAdaptiveSparkPlan(session: SparkSession) extends 
Rule[SparkPlan] {
 
 Review comment:
   > As fair as I understand this code subqueries for a given stage are now 
executed before the stage. This used to be that all the subqueries for a query 
were executed before the main query.
   
   Agreed. But this has nothing to do with what this rule does. It's just we 
don't call `execute` higher in the tree before stages below get to finish. So 
we may need to refactor the original `SparkPlan.executeQuery` logic to make 
this "wait-for-all-subqueries-to-finish" thing more explicit.
   
   > We may need to consider moving this into the AdaptiveSparkPlanExec to put 
most state in one place and make this stateless again. You could turn this into 
a mix-in if this adds too much LOC.
   
   Agreed. Not the prettiest solution to put a stateful object into a rule.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288319594
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which 
executes the query plan
+ * and re-optimize the plan during execution based on runtime data statistics.
+ */
+case class InsertAdaptiveSparkPlan(session: SparkSession) extends 
Rule[SparkPlan] {
 
 Review comment:
   > As fair as I understand this code subqueries for a given stage are now 
executed before the stage. This used to be that all the subqueries for a query 
were executed before the main query.
   
   Agreed. But this has nothing to do with what this rule does. It's just we 
don't call `execute` higher in the tree before stages below get to finish. So 
we may need to refactor the original `SparkPlan.executeQuery` logic to make 
this "wait-for-all-subqueries-to-finish" thing more explicit.
   
   > We may need to consider moving this into the AdaptiveSparkPlanExec to put 
most state in one place and make this stateless again. You could turn this into 
a mix-in if this adds too much LOC.
   
   Agreed. Not the pretties solution to put a stateful object into a rule.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288319594
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which 
executes the query plan
+ * and re-optimize the plan during execution based on runtime data statistics.
+ */
+case class InsertAdaptiveSparkPlan(session: SparkSession) extends 
Rule[SparkPlan] {
 
 Review comment:
   > As fair as I understand this code subqueries for a given stage are now 
executed before the stage. This used to be that all the subqueries for a query 
were executed before the main query.
   
   Agreed. But this has nothing to do with what this rule does. It's just we 
don't call `execute` higher in the tree before stages below get to finish. So 
we may need to refactor the original `SparkPlan.executeQuery` logic to make 
this "wait-for-all-subqueries-to-finish" thing more explicit.
   
   > We may need to consider moving this into the AdaptiveSparkPlanExec to put 
most state in one place and make this stateless again. You could turn this into 
a mix-in if this adds too much LOC.
   Agreed. Not the pretties solution to put a stateful object into a rule.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288309210
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which 
executes the query plan
+ * and re-optimize the plan during execution based on runtime data statistics.
+ */
+case class InsertAdaptiveSparkPlan(session: SparkSession) extends 
Rule[SparkPlan] {
+
+  private val conf = session.sessionState.conf
+
+  // Exchange-reuse is shared across the entire query, including sub-queries.
+  private val stageCache = new TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]]()
+
+  override def apply(plan: SparkPlan): SparkPlan = plan match {
+case _: ExecutedCommandExec => plan
+case _ if conf.runtimeReoptimizationEnabled
+  && supportAdaptive(plan) =>
+  try {
+// Plan sub-queries recursively and pass in the shared stage cache for 
exchange reuse. Fall
+// back to non-adaptive mode if adaptive execution is supported in any 
of the sub-queries.
+val subqueryMap = planSubqueries(plan)
+// Run preparation rules.
+val preparations = 
AdaptiveSparkPlanExec.createQueryStagePreparationRules(
+  session.sessionState.conf, subqueryMap)
+val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, 
preparations)
 
 Review comment:
   > This already seems to be done in AdaptiveSparkPlanExec when we submit the 
stage.
   
   No.
   
   The physical transformations we used in `QueryExecution.preparations` have 
now been split into two groups in adaptive execution here (also noted in the 
code comment):
   1. Rules that add or remove exchanges.
   2. Rules that are independent within each exchange, or say, stage.
   
   `InsertAdaptiveSparkPlan` is now the first in `QueryExecution.preparations`, 
which means neither of these two groups has been applied yet. It is this way so 
that we do not need to manipulate (modify) the rule application order in 
`QueryExecution.preparations` for AQE.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288309210
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 ##
 @@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which 
executes the query plan
+ * and re-optimize the plan during execution based on runtime data statistics.
+ */
+case class InsertAdaptiveSparkPlan(session: SparkSession) extends 
Rule[SparkPlan] {
+
+  private val conf = session.sessionState.conf
+
+  // Exchange-reuse is shared across the entire query, including sub-queries.
+  private val stageCache = new TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]]()
+
+  override def apply(plan: SparkPlan): SparkPlan = plan match {
+case _: ExecutedCommandExec => plan
+case _ if conf.runtimeReoptimizationEnabled
+  && supportAdaptive(plan) =>
+  try {
+// Plan sub-queries recursively and pass in the shared stage cache for 
exchange reuse. Fall
+// back to non-adaptive mode if adaptive execution is supported in any 
of the sub-queries.
+val subqueryMap = planSubqueries(plan)
+// Run preparation rules.
+val preparations = 
AdaptiveSparkPlanExec.createQueryStagePreparationRules(
+  session.sessionState.conf, subqueryMap)
+val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, 
preparations)
 
 Review comment:
   > This already seems to be done in AdaptiveSparkPlanExec when we submit the 
stage.
   No.
   
   The physical transformations we used in `QueryExecution.preparations` have 
now been split into two groups in adaptive execution here (also noted in the 
code comment):
   1. Rules that add or remove exchanges.
   2. Rules that are independent within each exchange, or say, stage.
   
   `InsertAdaptiveSparkPlan` is now the first in `QueryExecution.preparations`, 
which means neither of these two groups has been applied yet. It is this way so 
that we do not need to manipulate (modify) the rule application order in 
`QueryExecution.preparations` for AQE.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288253415
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]])
+  extends LeafExecNode {
+
+  def executedPlan: SparkPlan = currentPhysicalPlan
+
+  override def output: Seq[Attribute] = initialPlan.output
+
+  override def doCanonicalize(): SparkPlan = initialPlan.canonicalized
+
+  override def doExecute(): RDD[InternalRow] = {
+val events = new LinkedBlockingQueue[StageMaterializationEvent]()
+var result = createQueryStages(currentPhysicalPlan)
+while(!result.allChildStagesMaterialized) {
 
 Review comment:
   This is just part of the return result from create stages, so should not be 
expensive at all?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288252688
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]])
+  extends LeafExecNode {
+
+  def executedPlan: SparkPlan = currentPhysicalPlan
+
+  override def output: Seq[Attribute] = initialPlan.output
+
+  override def doCanonicalize(): SparkPlan = initialPlan.canonicalized
+
+  override def doExecute(): RDD[InternalRow] = {
+val events = new LinkedBlockingQueue[StageMaterializationEvent]()
+var result = createQueryStages(currentPhysicalPlan)
+while(!result.allChildStagesMaterialized) {
+  currentPhysicalPlan = result.newPlan
+  updateLogicalPlan(result.newStages)
+  onUpdatePlan()
+  result.newStages.map(_._2).foreach { stage =>
+stage.materialize().onComplete { res =>
+  if (res.isSuccess) {
+stage.resultOption = Some(res.get)
+events.offer(StageSuccess(stage))
+  } else {
+events.offer(StageFailure(stage, res.failed.get))
+  }
+}
+  }
+  // Wait on the next completed stage, which indicates new stats are 
available and probably
+  // new stages can be created. There might be other stages that finish at 
around the same
+  // time, so we process those stages too in order to reduce re-planning.
+  val nextMsg = events.take()
+  val rem = mutable.ArrayBuffer.empty[StageMaterializationEvent]
+  (Seq(nextMsg) ++ rem).foreach{ e => e match
 
 Review comment:
   Yeah. IntelliJ also pointed out :P


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, 

[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288251721
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]])
+  extends LeafExecNode {
+
+  def executedPlan: SparkPlan = currentPhysicalPlan
+
+  override def output: Seq[Attribute] = initialPlan.output
+
+  override def doCanonicalize(): SparkPlan = initialPlan.canonicalized
+
+  override def doExecute(): RDD[InternalRow] = {
+val events = new LinkedBlockingQueue[StageMaterializationEvent]()
+var result = createQueryStages(currentPhysicalPlan)
+while(!result.allChildStagesMaterialized) {
+  currentPhysicalPlan = result.newPlan
+  updateLogicalPlan(result.newStages)
+  onUpdatePlan()
+  result.newStages.map(_._2).foreach { stage =>
+stage.materialize().onComplete { res =>
+  if (res.isSuccess) {
+stage.resultOption = Some(res.get)
+events.offer(StageSuccess(stage))
+  } else {
+events.offer(StageFailure(stage, res.failed.get))
+  }
+}
+  }
+  // Wait on the next completed stage, which indicates new stats are 
available and probably
+  // new stages can be created. There might be other stages that finish at 
around the same
+  // time, so we process those stages too in order to reduce re-planning.
+  val nextMsg = events.take()
+  val rem = mutable.ArrayBuffer.empty[StageMaterializationEvent]
 
 Review comment:
   Yes :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git 

[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288249963
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]])
+  extends LeafExecNode {
+
+  def executedPlan: SparkPlan = currentPhysicalPlan
+
+  override def output: Seq[Attribute] = initialPlan.output
+
+  override def doCanonicalize(): SparkPlan = initialPlan.canonicalized
+
+  override def doExecute(): RDD[InternalRow] = {
+val events = new LinkedBlockingQueue[StageMaterializationEvent]()
+var result = createQueryStages(currentPhysicalPlan)
+while(!result.allChildStagesMaterialized) {
+  currentPhysicalPlan = result.newPlan
+  updateLogicalPlan(result.newStages)
+  onUpdatePlan()
+  result.newStages.map(_._2).foreach { stage =>
+stage.materialize().onComplete { res =>
+  if (res.isSuccess) {
+stage.resultOption = Some(res.get)
+events.offer(StageSuccess(stage))
+  } else {
+events.offer(StageFailure(stage, res.failed.get))
+  }
+}
+  }
+  // Wait on the next completed stage, which indicates new stats are 
available and probably
+  // new stages can be created. There might be other stages that finish at 
around the same
+  // time, so we process those stages too in order to reduce re-planning.
+  val nextMsg = events.take()
+  val rem = mutable.ArrayBuffer.empty[StageMaterializationEvent]
+  (Seq(nextMsg) ++ rem).foreach{ e => e match
+{
+  case StageSuccess(stage) =>
+completedStages += stage.id
+  case StageFailure(stage, ex) =>
+throw new SparkException(
 
 Review comment:
   Yes, it should be a TODO here. For that we'll need a cancellable Future.


[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288249607
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]])
+  extends LeafExecNode {
+
+  def executedPlan: SparkPlan = currentPhysicalPlan
+
+  override def output: Seq[Attribute] = initialPlan.output
+
+  override def doCanonicalize(): SparkPlan = initialPlan.canonicalized
+
+  override def doExecute(): RDD[InternalRow] = {
+val events = new LinkedBlockingQueue[StageMaterializationEvent]()
+var result = createQueryStages(currentPhysicalPlan)
+while(!result.allChildStagesMaterialized) {
+  currentPhysicalPlan = result.newPlan
+  updateLogicalPlan(result.newStages)
+  onUpdatePlan()
+  result.newStages.map(_._2).foreach { stage =>
+stage.materialize().onComplete { res =>
+  if (res.isSuccess) {
+stage.resultOption = Some(res.get)
 
 Review comment:
   Good point. That way the stats and the stage completion status are always 
consistent for replanning.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288213424
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which 
executes the query plan
+ * and re-optimize the plan during execution based on runtime data statistics.
+ */
+case class InsertAdaptiveSparkPlan(session: SparkSession) extends 
Rule[SparkPlan] {
+
+  private val conf = session.sessionState.conf
+
+  // Exchange-reuse is shared across the entire query, including sub-queries.
+  private val stageCache = new TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]]()
+
+  override def apply(plan: SparkPlan): SparkPlan = plan match {
+case _: ExecutedCommandExec => plan
+case _ if conf.runtimeReoptimizationEnabled
+  && supportAdaptive(plan) =>
+  try {
+// Plan sub-queries recursively and pass in the shared stage cache for 
exchange reuse. Fall
+// back to non-adaptive mode if adaptive execution is supported in any 
of the sub-queries.
+val subqueryMap = planSubqueries(plan)
+// Run preparation rules.
+val preparations = 
AdaptiveSparkPlanExec.createQueryStagePreparationRules(
+  session.sessionState.conf, subqueryMap)
+val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, 
preparations)
+logDebug(s"Adaptive execution enabled for plan: $plan")
+AdaptiveSparkPlanExec(newPlan, session.cloneSession(), subqueryMap, 
stageCache)
+  } catch {
+case _: SubqueryAdaptiveNotSupportedException =>
+  plan
+  }
+case _ => plan
+  }
+
+  private def supportAdaptive(plan: SparkPlan): Boolean = {
+// TODO migrate dynamic-data-pruning onto adaptive execution.
+sanityCheck(plan) &&
+  !plan.logicalLink.exists(_.isStreaming) &&
+  plan.children.forall(supportAdaptive)
+  }
+
+  private def sanityCheck(plan: SparkPlan): Boolean = plan match {
+case _: SortExec | _: ShuffleExchangeExec => true
+case _ => plan.logicalLink.isDefined
+  }
+
+  private def planSubqueries(plan: SparkPlan): Map[Long, 
ExecSubqueryExpression] = {
+val subqueryMapBuilder = mutable.HashMap.empty[Long, 
ExecSubqueryExpression]
+plan.foreach(_.expressions.foreach(_.foreach {
+  case expressions.ScalarSubquery(p, _, exprId)
+  if !subqueryMapBuilder.contains(exprId.id) =>
+val executedPlan = getExecutedPlan(p)
+val scalarSubquery = ScalarSubquery(
+  SubqueryExec(s"subquery${exprId.id}", executedPlan), exprId)
+subqueryMapBuilder.put(exprId.id, scalarSubquery)
+  case _ =>
+}))
+
+// Reuse subqueries
+if (session.sessionState.conf.subqueryReuseEnabled) {
+  // Build a hash map using schema of subqueries to avoid O(N*N) 
sameResult calls.
+  val reuseMap = mutable.HashMap[StructType, 
mutable.ArrayBuffer[BaseSubqueryExec]]()
+  subqueryMapBuilder.keySet.foreach { exprId =>
+val sub = subqueryMapBuilder.get(exprId).get
+val sameSchema =
+  reuseMap.getOrElseUpdate(sub.plan.schema, mutable.ArrayBuffer.empty)
+val sameResult = sameSchema.find(_.sameResult(sub.plan))
+if (sameResult.isDefined) {
+  val newExpr = sub.withNewPlan(ReusedSubqueryExec(sameResult.get))
+  subqueryMapBuilder.update(exprId, 

[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288207216
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which 
executes the query plan
+ * and re-optimize the plan during execution based on runtime data statistics.
+ */
+case class InsertAdaptiveSparkPlan(session: SparkSession) extends 
Rule[SparkPlan] {
+
+  private val conf = session.sessionState.conf
+
+  // Exchange-reuse is shared across the entire query, including sub-queries.
+  private val stageCache = new TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]]()
+
+  override def apply(plan: SparkPlan): SparkPlan = plan match {
+case _: ExecutedCommandExec => plan
+case _ if conf.runtimeReoptimizationEnabled
+  && supportAdaptive(plan) =>
+  try {
+// Plan sub-queries recursively and pass in the shared stage cache for 
exchange reuse. Fall
+// back to non-adaptive mode if adaptive execution is supported in any 
of the sub-queries.
+val subqueryMap = planSubqueries(plan)
+// Run preparation rules.
+val preparations = 
AdaptiveSparkPlanExec.createQueryStagePreparationRules(
+  session.sessionState.conf, subqueryMap)
+val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, 
preparations)
+logDebug(s"Adaptive execution enabled for plan: $plan")
+AdaptiveSparkPlanExec(newPlan, session.cloneSession(), subqueryMap, 
stageCache)
+  } catch {
+case _: SubqueryAdaptiveNotSupportedException =>
+  plan
+  }
+case _ => plan
+  }
+
+  private def supportAdaptive(plan: SparkPlan): Boolean = {
+// TODO migrate dynamic-data-pruning onto adaptive execution.
+sanityCheck(plan) &&
+  !plan.logicalLink.exists(_.isStreaming) &&
+  plan.children.forall(supportAdaptive)
+  }
+
+  private def sanityCheck(plan: SparkPlan): Boolean = plan match {
+case _: SortExec | _: ShuffleExchangeExec => true
 
 Review comment:
   Good catch! My initial work had had it differently and didn't come back to 
it afterwards.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288207216
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which 
executes the query plan
+ * and re-optimize the plan during execution based on runtime data statistics.
+ */
+case class InsertAdaptiveSparkPlan(session: SparkSession) extends 
Rule[SparkPlan] {
+
+  private val conf = session.sessionState.conf
+
+  // Exchange-reuse is shared across the entire query, including sub-queries.
+  private val stageCache = new TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]]()
+
+  override def apply(plan: SparkPlan): SparkPlan = plan match {
+case _: ExecutedCommandExec => plan
+case _ if conf.runtimeReoptimizationEnabled
+  && supportAdaptive(plan) =>
+  try {
+// Plan sub-queries recursively and pass in the shared stage cache for 
exchange reuse. Fall
+// back to non-adaptive mode if adaptive execution is supported in any 
of the sub-queries.
+val subqueryMap = planSubqueries(plan)
+// Run preparation rules.
+val preparations = 
AdaptiveSparkPlanExec.createQueryStagePreparationRules(
+  session.sessionState.conf, subqueryMap)
+val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, 
preparations)
+logDebug(s"Adaptive execution enabled for plan: $plan")
+AdaptiveSparkPlanExec(newPlan, session.cloneSession(), subqueryMap, 
stageCache)
+  } catch {
+case _: SubqueryAdaptiveNotSupportedException =>
+  plan
+  }
+case _ => plan
+  }
+
+  private def supportAdaptive(plan: SparkPlan): Boolean = {
+// TODO migrate dynamic-data-pruning onto adaptive execution.
+sanityCheck(plan) &&
+  !plan.logicalLink.exists(_.isStreaming) &&
+  plan.children.forall(supportAdaptive)
+  }
+
+  private def sanityCheck(plan: SparkPlan): Boolean = plan match {
+case _: SortExec | _: ShuffleExchangeExec => true
 
 Review comment:
   Good catch!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288206940
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which 
executes the query plan
+ * and re-optimize the plan during execution based on runtime data statistics.
+ */
+case class InsertAdaptiveSparkPlan(session: SparkSession) extends 
Rule[SparkPlan] {
+
+  private val conf = session.sessionState.conf
+
+  // Exchange-reuse is shared across the entire query, including sub-queries.
 
 Review comment:
   Good point. This rule should not be "statically created" and reused. I'll 
add this note to the javadoc of this class.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288205727
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which 
executes the query plan
+ * and re-optimize the plan during execution based on runtime data statistics.
+ */
+case class InsertAdaptiveSparkPlan(session: SparkSession) extends 
Rule[SparkPlan] {
+
+  private val conf = session.sessionState.conf
+
+  // Exchange-reuse is shared across the entire query, including sub-queries.
+  private val stageCache = new TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]]()
 
 Review comment:
   We need a concurrent map that supports `getOrElseUpdate`.
   
   I thought we should change this "schema-to-a-list-of-plans" map to a 
"canonicalized-plan-to-plan" map so we can use a simple concurrent map and do 
not need to put a lock on the list. If we'll fix it, we'll fix it with the 
compile-time reuse maps together.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288205727
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which 
executes the query plan
+ * and re-optimize the plan during execution based on runtime data statistics.
+ */
+case class InsertAdaptiveSparkPlan(session: SparkSession) extends 
Rule[SparkPlan] {
+
+  private val conf = session.sessionState.conf
+
+  // Exchange-reuse is shared across the entire query, including sub-queries.
+  private val stageCache = new TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]]()
 
 Review comment:
   We need a concurrent map that supports `getOrElseUpdate`.
   
   I thought we should change this "schema-to-a-list-of-plans" map to a 
"canonicalized-plan-to-plan" map so we can use a simple concurrent map and do 
not need to put a lock on the list.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288204247
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 ##
 @@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * This rule wraps the query plan with an [[AdaptiveSparkPlanExec]], which 
executes the query plan
+ * and re-optimize the plan during execution based on runtime data statistics.
+ */
+case class InsertAdaptiveSparkPlan(session: SparkSession) extends 
Rule[SparkPlan] {
+
+  private val conf = session.sessionState.conf
+
+  // Exchange-reuse is shared across the entire query, including sub-queries.
+  private val stageCache = new TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]]()
+
+  override def apply(plan: SparkPlan): SparkPlan = plan match {
+case _: ExecutedCommandExec => plan
+case _ if conf.runtimeReoptimizationEnabled
+  && supportAdaptive(plan) =>
+  try {
+// Plan sub-queries recursively and pass in the shared stage cache for 
exchange reuse. Fall
+// back to non-adaptive mode if adaptive execution is supported in any 
of the sub-queries.
+val subqueryMap = planSubqueries(plan)
+// Run preparation rules.
+val preparations = 
AdaptiveSparkPlanExec.createQueryStagePreparationRules(
+  session.sessionState.conf, subqueryMap)
+val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, 
preparations)
+logDebug(s"Adaptive execution enabled for plan: $plan")
+AdaptiveSparkPlanExec(newPlan, session.cloneSession(), subqueryMap, 
stageCache)
+  } catch {
+case _: SubqueryAdaptiveNotSupportedException =>
+  plan
+  }
+case _ => plan
+  }
+
+  private def supportAdaptive(plan: SparkPlan): Boolean = {
+// TODO migrate dynamic-data-pruning onto adaptive execution.
+sanityCheck(plan) &&
+  !plan.logicalLink.exists(_.isStreaming) &&
+  plan.children.forall(supportAdaptive)
+  }
+
+  private def sanityCheck(plan: SparkPlan): Boolean = plan match {
+case _: SortExec | _: ShuffleExchangeExec => true
+case _ => plan.logicalLink.isDefined
+  }
+
+  private def planSubqueries(plan: SparkPlan): Map[Long, 
ExecSubqueryExpression] = {
+val subqueryMapBuilder = mutable.HashMap.empty[Long, 
ExecSubqueryExpression]
+plan.foreach(_.expressions.foreach(_.foreach {
+  case expressions.ScalarSubquery(p, _, exprId)
+  if !subqueryMapBuilder.contains(exprId.id) =>
+val executedPlan = getExecutedPlan(p)
+val scalarSubquery = ScalarSubquery(
+  SubqueryExec(s"subquery${exprId.id}", executedPlan), exprId)
+subqueryMapBuilder.put(exprId.id, scalarSubquery)
+  case _ =>
+}))
+
+// Reuse subqueries
+if (session.sessionState.conf.subqueryReuseEnabled) {
+  // Build a hash map using schema of subqueries to avoid O(N*N) 
sameResult calls.
+  val reuseMap = mutable.HashMap[StructType, 
mutable.ArrayBuffer[BaseSubqueryExec]]()
+  subqueryMapBuilder.keySet.foreach { exprId =>
+val sub = subqueryMapBuilder.get(exprId).get
+val sameSchema =
+  reuseMap.getOrElseUpdate(sub.plan.schema, mutable.ArrayBuffer.empty)
+val sameResult = sameSchema.find(_.sameResult(sub.plan))
+if (sameResult.isDefined) {
+  val newExpr = sub.withNewPlan(ReusedSubqueryExec(sameResult.get))
+  subqueryMapBuilder.update(exprId, 

[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288202927
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]])
+  extends LeafExecNode {
+
+  def executedPlan: SparkPlan = currentPhysicalPlan
+
+  override def output: Seq[Attribute] = initialPlan.output
+
+  override def doCanonicalize(): SparkPlan = initialPlan.canonicalized
+
+  override def doExecute(): RDD[InternalRow] = {
+val events = new LinkedBlockingQueue[StageMaterializationEvent]()
+var result = createQueryStages(currentPhysicalPlan)
+while(!result.allChildStagesMaterialized) {
+  currentPhysicalPlan = result.newPlan
+  updateLogicalPlan(result.newStages)
+  onUpdatePlan()
+  result.newStages.map(_._2).foreach { stage =>
+stage.materialize().onComplete { res =>
+  if (res.isSuccess) {
+stage.resultOption = Some(res.get)
+events.offer(StageSuccess(stage))
+  } else {
+events.offer(StageFailure(stage, res.failed.get))
+  }
+}
+  }
+  // Wait on the next completed stage, which indicates new stats are 
available and probably
+  // new stages can be created. There might be other stages that finish at 
around the same
+  // time, so we process those stages too in order to reduce re-planning.
+  val nextMsg = events.take()
+  val rem = mutable.ArrayBuffer.empty[StageMaterializationEvent]
+  (Seq(nextMsg) ++ rem.toSeq).foreach{ e => e match
+{
+  case StageSuccess(stage) =>
+completedStages += stage.id
+  case StageFailure(stage, ex) =>
+throw new SparkException(
+  s"""
+ |Fail to materialize query stage ${stage.id}:
+ |${stage.plan.treeString}
+   

[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288201522
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
 ##
 @@ -79,6 +81,34 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
 super.makeCopy(newArgs)
   }
 
+  /**
+   * @return The logical plan this plan is linked to.
+   */
+  def logicalLink: Option[LogicalPlan] =
+getTagValue(SparkPlan.LOGICAL_PLAN_TAG)
+  .orElse(getTagValue(SparkPlan.LOGICAL_PLAN_INHERITED_TAG))
+
+  /**
+   * Set logical plan link recursively if unset.
+   */
+  def setLogicalLink(logicalPlan: LogicalPlan): Unit = {
+setLogicalLink(logicalPlan, false)
+  }
+
+  private def setLogicalLink(logicalPlan: LogicalPlan, inherited: Boolean = 
false): Unit = {
+if (logicalLink.isDefined) {
+  return
+}
+
+val tag = if (inherited) {
+  SparkPlan.LOGICAL_PLAN_INHERITED_TAG
+} else {
+  SparkPlan.LOGICAL_PLAN_TAG
+}
+setTagValue(tag, logicalPlan)
+children.foreach(_.setLogicalLink(logicalPlan, true))
 
 Review comment:
   Yes. That's true. And you could always "force set" this logical link if need 
be.
   It's not necessary to draw a line between "logical plan" and "inherited 
logical plan" for the use of adaptive execution, so this is simply to make sure 
any future use of it can tell a top node from the rest.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288200042
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]])
+  extends LeafExecNode {
+
+  def executedPlan: SparkPlan = currentPhysicalPlan
+
+  override def output: Seq[Attribute] = initialPlan.output
+
+  override def doCanonicalize(): SparkPlan = initialPlan.canonicalized
+
+  override def doExecute(): RDD[InternalRow] = {
+val events = new LinkedBlockingQueue[StageMaterializationEvent]()
+var result = createQueryStages(currentPhysicalPlan)
+while(!result.allChildStagesMaterialized) {
+  currentPhysicalPlan = result.newPlan
+  updateLogicalPlan(result.newStages)
+  onUpdatePlan()
+  result.newStages.map(_._2).foreach { stage =>
+stage.materialize().onComplete { res =>
+  if (res.isSuccess) {
+stage.resultOption = Some(res.get)
+events.offer(StageSuccess(stage))
+  } else {
+events.offer(StageFailure(stage, res.failed.get))
+  }
+}
+  }
+  // Wait on the next completed stage, which indicates new stats are 
available and probably
+  // new stages can be created. There might be other stages that finish at 
around the same
+  // time, so we process those stages too in order to reduce re-planning.
+  val nextMsg = events.take()
+  val rem = mutable.ArrayBuffer.empty[StageMaterializationEvent]
+  (Seq(nextMsg) ++ rem.toSeq).foreach{ e => e match
+{
+  case StageSuccess(stage) =>
+completedStages += stage.id
+  case StageFailure(stage, ex) =>
+throw new SparkException(
+  s"""
+ |Fail to materialize query stage ${stage.id}:
+ |${stage.plan.treeString}
+   

[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288196564
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
 ##
 @@ -292,6 +292,12 @@ object SQLConf {
   .bytesConf(ByteUnit.BYTE)
   .createWithDefault(64 * 1024 * 1024)
 
+  val RUNTIME_REOPTIMIZATION_ENABLED =
+buildConf("spark.sql.runtime.reoptimization.enabled")
+  .doc("When true, enable runtime query re-optimization.")
+  .booleanConf
+  .createWithDefault(false)
+
   val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
 .doc("When true, enable adaptive query execution.")
 .booleanConf
 
 Review comment:
   Let's leave it now and see if we should use the existing config 
`spark.sql.adaptive.enabled` instead.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288196564
 
 

 ##
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
 ##
 @@ -292,6 +292,12 @@ object SQLConf {
   .bytesConf(ByteUnit.BYTE)
   .createWithDefault(64 * 1024 * 1024)
 
+  val RUNTIME_REOPTIMIZATION_ENABLED =
+buildConf("spark.sql.runtime.reoptimization.enabled")
+  .doc("When true, enable runtime query re-optimization.")
+  .booleanConf
+  .createWithDefault(false)
+
   val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
 .doc("When true, enable adaptive query execution.")
 .booleanConf
 
 Review comment:
   Let's leave it now and see if we use the existing config 
`spark.sql.adaptive.enabled` instead.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288196010
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]])
+  extends LeafExecNode {
+
+  def executedPlan: SparkPlan = currentPhysicalPlan
+
+  override def output: Seq[Attribute] = initialPlan.output
+
+  override def doCanonicalize(): SparkPlan = initialPlan.canonicalized
+
+  override def doExecute(): RDD[InternalRow] = {
+val events = new LinkedBlockingQueue[StageMaterializationEvent]()
+var result = createQueryStages(currentPhysicalPlan)
+while(!result.allChildStagesMaterialized) {
+  currentPhysicalPlan = result.newPlan
+  updateLogicalPlan(result.newStages)
+  onUpdatePlan()
+  result.newStages.map(_._2).foreach { stage =>
+stage.materialize().onComplete { res =>
+  if (res.isSuccess) {
+stage.resultOption = Some(res.get)
+events.offer(StageSuccess(stage))
+  } else {
+events.offer(StageFailure(stage, res.failed.get))
+  }
+}
+  }
+  // Wait on the next completed stage, which indicates new stats are 
available and probably
+  // new stages can be created. There might be other stages that finish at 
around the same
+  // time, so we process those stages too in order to reduce re-planning.
+  val nextMsg = events.take()
+  val rem = mutable.ArrayBuffer.empty[StageMaterializationEvent]
+  (Seq(nextMsg) ++ rem.toSeq).foreach{ e => e match
 
 Review comment:
   Good catch! I missed the line which had been supposed to "drain" the event 
queue into `rem`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go 

[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288193020
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
+ * materializes its output at the end. When one stage completes, the data 
statistics of the
+ * materialized output will be used to optimize the remainder of the query.
+ *
+ * To create query stages, we traverse the query tree bottom up. When we hit 
an exchange node,
+ * and if all the child query stages of this exchange node are materialized, 
we create a new
+ * query stage for this exchange node. The new stage is then materialized 
asynchronously once it
+ * is created.
+ *
+ * When one query stage finishes materialization, the rest query is 
re-optimized and planned based
+ * on the latest statistics provided by all materialized stages. Then we 
traverse the query plan
+ * again and create more stages if possible. After all stages have been 
materialized, we execute
+ * the rest of the plan.
+ */
+case class AdaptiveSparkPlanExec(
+initialPlan: SparkPlan,
+session: SparkSession,
+subqueryMap: Map[Long, ExecSubqueryExpression],
+stageCache: TrieMap[StructType, mutable.Buffer[(Exchange, 
QueryStageExec)]])
+  extends LeafExecNode {
+
+  def executedPlan: SparkPlan = currentPhysicalPlan
+
+  override def output: Seq[Attribute] = initialPlan.output
+
+  override def doCanonicalize(): SparkPlan = initialPlan.canonicalized
 
 Review comment:
   +1 to @hvanhovell's comment. Moreover, one important use of the 
"canonicalized" plan is for adaptive sub-query re-use, in which we for sure 
want to compare the initial plans and initial plans only.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL

2019-05-28 Thread GitBox
maryannxue commented on a change in pull request #24706: [SPARK-23128][SQL] A 
new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/24706#discussion_r288191330
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.concurrent.TrieMap
+import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}
+
+import org.apache.spark.SparkException
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange._
+import 
org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * A root node to execute the query plan adaptively. It splits the query plan 
into independent
+ * stages and executes them in order according to their dependencies. The 
query stage
 
 Review comment:
   Are you saying the wording is no good? Any suggestions?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org