fangshil commented on a change in pull request #20303: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/20303#discussion_r264023809
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 ##########
 @@ -36,107 +35,12 @@ import org.apache.spark.sql.internal.SQLConf
  * the input partition ordering requirements are met.
  */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
-  private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions
-
-  private def targetPostShuffleInputSize: Long = conf.targetPostShuffleInputSize
-
-  private def adaptiveExecutionEnabled: Boolean = conf.adaptiveExecutionEnabled
-
-  private def minNumPostShufflePartitions: Option[Int] = {
-    val minNumPostShufflePartitions = conf.minNumPostShufflePartitions
-    if (minNumPostShufflePartitions > 0) Some(minNumPostShufflePartitions) else None
-  }
-
-  /**
-   * Adds [[ExchangeCoordinator]] to [[ShuffleExchangeExec]]s if adaptive query execution is enabled
-   * and partitioning schemes of these [[ShuffleExchangeExec]]s support [[ExchangeCoordinator]].
-   */
-  private def withExchangeCoordinator(
-      children: Seq[SparkPlan],
-      requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = {
-    val supportsCoordinator =
-      if (children.exists(_.isInstanceOf[ShuffleExchangeExec])) {
-        // Right now, ExchangeCoordinator only support HashPartitionings.
-        children.forall {
-          case e @ ShuffleExchangeExec(hash: HashPartitioning, _, _) => true
-          case child =>
-            child.outputPartitioning match {
-              case hash: HashPartitioning => true
-              case collection: PartitioningCollection =>
-                collection.partitionings.forall(_.isInstanceOf[HashPartitioning])
-              case _ => false
-            }
-        }
-      } else {
-        // In this case, although we do not have Exchange operators, we may still need to
-        // shuffle data when we have more than one children because data generated by
-        // these children may not be partitioned in the same way.
-        // Please see the comment in withCoordinator for more details.
-        val supportsDistribution = requiredChildDistributions.forall { dist =>
-          dist.isInstanceOf[ClusteredDistribution] || dist.isInstanceOf[HashClusteredDistribution]
-        }
-        children.length > 1 && supportsDistribution
-      }
-
-    val withCoordinator =
-      if (adaptiveExecutionEnabled && supportsCoordinator) {
-        val coordinator =
-          new ExchangeCoordinator(
-            targetPostShuffleInputSize,
-            minNumPostShufflePartitions)
-        children.zip(requiredChildDistributions).map {
-          case (e: ShuffleExchangeExec, _) =>
-            // This child is an Exchange, we need to add the coordinator.
-            e.copy(coordinator = Some(coordinator))
-          case (child, distribution) =>
-            // If this child is not an Exchange, we need to add an Exchange for now.
-            // Ideally, we can try to avoid this Exchange. However, when we reach here,
-            // there are at least two children operators (because if there is a single child
-            // and we can avoid Exchange, supportsCoordinator will be false and we
-            // will not reach here.). Although we can make two children have the same number of
-            // post-shuffle partitions. Their numbers of pre-shuffle partitions may be different.
-            // For example, let's say we have the following plan
-            //         Join
-            //         /  \
-            //       Agg  Exchange
-            //       /      \
-            //    Exchange  t2
-            //      /
-            //     t1
-            // In this case, because a post-shuffle partition can include multiple pre-shuffle
-            // partitions, a HashPartitioning will not be strictly partitioned by the hashcodes
-            // after shuffle. So, even we can use the child Exchange operator of the Join to
-            // have a number of post-shuffle partitions that matches the number of partitions of
-            // Agg, we cannot say these two children are partitioned in the same way.
-            // Here is another case
-            //         Join
-            //         /  \
-            //       Agg1  Agg2
-            //       /      \
-            //   Exchange1  Exchange2
-            //       /       \
-            //      t1       t2
-            // In this case, two Aggs shuffle data with the same column of the join condition.
-            // After we use ExchangeCoordinator, these two Aggs may not be partitioned in the same
-            // way. Let's say that Agg1 and Agg2 both have 5 pre-shuffle partitions and 2
-            // post-shuffle partitions. It is possible that Agg1 fetches those pre-shuffle
-            // partitions by using a partitionStartIndices [0, 3]. However, Agg2 may fetch its
-            // pre-shuffle partitions by using another partitionStartIndices [0, 4].
-            // So, Agg1 and Agg2 are actually not co-partitioned.
-            //
-            // It will be great to introduce a new Partitioning to represent the post-shuffle
-            // partitions when one post-shuffle partition includes multiple pre-shuffle partitions.
-            val targetPartitioning = distribution.createPartitioning(defaultNumPreShufflePartitions)
-            assert(targetPartitioning.isInstanceOf[HashPartitioning])
-            ShuffleExchangeExec(targetPartitioning, child, Some(coordinator))
-        }
-      } else {
-        // If we do not need ExchangeCoordinator, the original children are returned.
-        children
-      }
-
-    withCoordinator
-  }
+  private def defaultNumPreShufflePartitions: Int =
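The comments removed in this hunk argue that two children whose pre-shuffle partitions are coalesced independently are not co-partitioned. As a rough illustration only, here is a simplified sketch (not Spark's actual ExchangeCoordinator code) that greedily packs contiguous pre-shuffle partitions until a target input size is exceeded. The object `CoalesceSketch`, its helpers, and the byte sizes are hypothetical, chosen to reproduce the partitionStartIndices [0, 3] vs. [0, 4] example from the comment.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of coalescing pre-shuffle partitions.
object CoalesceSketch {
  // Greedily packs contiguous pre-shuffle partitions into post-shuffle partitions,
  // starting a new one when adding the next partition would exceed targetSize.
  // Returns the start index of each post-shuffle partition.
  def startIndices(sizes: Seq[Long], targetSize: Long): Seq[Int] = {
    val starts = ArrayBuffer(0)
    var current = 0L
    sizes.zipWithIndex.foreach { case (size, i) =>
      if (i > 0 && current + size > targetSize) {
        starts += i
        current = 0L
      }
      current += size
    }
    starts.toList
  }

  // Maps a pre-shuffle partition index to the post-shuffle partition containing it.
  def postPartitionOf(starts: Seq[Int], preIndex: Int): Int =
    starts.lastIndexWhere(_ <= preIndex)

  def main(args: Array[String]): Unit = {
    val target = 30L
    val agg1 = Seq(10L, 10L, 10L, 10L, 10L) // hypothetical bytes per pre-shuffle partition
    val agg2 = Seq(5L, 5L, 5L, 5L, 20L)
    val s1 = startIndices(agg1, target) // List(0, 3)
    val s2 = startIndices(agg2, target) // List(0, 4)
    // Rows hashed to pre-shuffle partition 3 land in different post-shuffle
    // partitions on the two sides, so the children are not co-partitioned.
    println(postPartitionOf(s1, 3)) // 1
    println(postPartitionOf(s2, 3)) // 0
    // Coalescing on the summed per-partition sizes gives both sides one shared layout.
    val joint = startIndices(agg1.zip(agg2).map { case (a, b) => a + b }, target)
    println(joint) // List(0, 2, 4)
  }
}
```

Deciding the start indices once from the combined sizes of all coordinated shuffles is, roughly, why a shared coordinator is needed before a join can rely on its children being partitioned the same way.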
 
 Review comment:
   Thanks @xuanyuanking for the input! Instead of setting
   maxNumPostShufflePartitions based on a magic number (1.5x or 500), I would
   propose adding a conf to replace maxNumPostShufflePartitions, expressed as a
   ratio of spark.sql.shuffle.partitions. The default value could be 1.0, so that
   AE's initial number of partitions stays consistent with the user-specified
   spark.sql.shuffle.partitions. With 1.5 or 2, one of my concerns is that it
   could increase the load on the shuffle service when we enable AE as the
   cluster default, since we have seen shuffle service scalability issues in our
   cluster when handling very large shuffle workloads.
   cc @carsonwang @cloud-fan
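To make the proposal concrete, here is a minimal sketch, assuming the new conf is a plain multiplier applied to spark.sql.shuffle.partitions. The object and function names are hypothetical and no real config key is implied. It also computes a simple upper bound on shuffle blocks (map tasks × reduce partitions, since each reducer fetches at most one block per map task), which scales linearly with the ratio; the 1000 map tasks are an assumed figure.

```scala
// Hypothetical sketch of a ratio-based initial partition count for AE.
object InitialPartitionSketch {
  // Initial (pre-coalescing) number of shuffle partitions, as a ratio of
  // spark.sql.shuffle.partitions. A ratio of 1.0 keeps the user's setting unchanged.
  def initialPartitions(shufflePartitions: Int, ratio: Double = 1.0): Int = {
    require(shufflePartitions > 0, "shufflePartitions must be positive")
    require(ratio > 0.0, "ratio must be positive")
    math.max(1, math.ceil(shufflePartitions * ratio).toInt)
  }

  // Upper bound on the shuffle blocks served for one shuffle:
  // each reduce partition fetches at most one block per map task.
  def maxShuffleBlocks(numMapTasks: Int, numReducePartitions: Int): Long =
    numMapTasks.toLong * numReducePartitions

  def main(args: Array[String]): Unit = {
    val shufflePartitions = 200 // default value of spark.sql.shuffle.partitions
    val numMapTasks = 1000      // assumed map-side parallelism
    for (ratio <- Seq(1.0, 1.5, 2.0)) {
      val n = initialPartitions(shufflePartitions, ratio)
      println(s"ratio=$ratio partitions=$n maxBlocks=${maxShuffleBlocks(numMapTasks, n)}")
    }
  }
}
```

Under these assumptions, moving from 1.0 to 1.5 raises the block bound from 200,000 to 300,000 per shuffle, which is the kind of extra shuffle service load the comment is concerned about.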

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 