fangshil commented on a change in pull request #20303: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/20303#discussion_r263260234
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 ##########
 @@ -36,107 +35,12 @@ import org.apache.spark.sql.internal.SQLConf
  * the input partition ordering requirements are met.
  */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
-  private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions
-
-  private def targetPostShuffleInputSize: Long = conf.targetPostShuffleInputSize
-
-  private def adaptiveExecutionEnabled: Boolean = conf.adaptiveExecutionEnabled
-
-  private def minNumPostShufflePartitions: Option[Int] = {
-    val minNumPostShufflePartitions = conf.minNumPostShufflePartitions
-    if (minNumPostShufflePartitions > 0) Some(minNumPostShufflePartitions) else None
-  }
-
-  /**
-   * Adds [[ExchangeCoordinator]] to [[ShuffleExchangeExec]]s if adaptive query execution is enabled
-   * and partitioning schemes of these [[ShuffleExchangeExec]]s support [[ExchangeCoordinator]].
-   */
-  private def withExchangeCoordinator(
-      children: Seq[SparkPlan],
-      requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = {
-    val supportsCoordinator =
-      if (children.exists(_.isInstanceOf[ShuffleExchangeExec])) {
-        // Right now, ExchangeCoordinator only support HashPartitionings.
-        children.forall {
-          case e @ ShuffleExchangeExec(hash: HashPartitioning, _, _) => true
-          case child =>
-            child.outputPartitioning match {
-              case hash: HashPartitioning => true
-              case collection: PartitioningCollection =>
-                collection.partitionings.forall(_.isInstanceOf[HashPartitioning])
-              case _ => false
-            }
-        }
-      } else {
-        // In this case, although we do not have Exchange operators, we may still need to
-        // shuffle data when we have more than one children because data generated by
-        // these children may not be partitioned in the same way.
-        // Please see the comment in withCoordinator for more details.
-        val supportsDistribution = requiredChildDistributions.forall { dist =>
-          dist.isInstanceOf[ClusteredDistribution] || dist.isInstanceOf[HashClusteredDistribution]
-        }
-        children.length > 1 && supportsDistribution
-      }
-
-    val withCoordinator =
-      if (adaptiveExecutionEnabled && supportsCoordinator) {
-        val coordinator =
-          new ExchangeCoordinator(
-            targetPostShuffleInputSize,
-            minNumPostShufflePartitions)
-        children.zip(requiredChildDistributions).map {
-          case (e: ShuffleExchangeExec, _) =>
-            // This child is an Exchange, we need to add the coordinator.
-            e.copy(coordinator = Some(coordinator))
-          case (child, distribution) =>
-            // If this child is not an Exchange, we need to add an Exchange for now.
-            // Ideally, we can try to avoid this Exchange. However, when we reach here,
-            // there are at least two children operators (because if there is a single child
-            // and we can avoid Exchange, supportsCoordinator will be false and we
-            // will not reach here.). Although we can make two children have the same number of
-            // post-shuffle partitions. Their numbers of pre-shuffle partitions may be different.
-            // For example, let's say we have the following plan
-            //         Join
-            //         /  \
-            //       Agg  Exchange
-            //       /      \
-            //    Exchange  t2
-            //      /
-            //     t1
-            // In this case, because a post-shuffle partition can include multiple pre-shuffle
-            // partitions, a HashPartitioning will not be strictly partitioned by the hashcodes
-            // after shuffle. So, even we can use the child Exchange operator of the Join to
-            // have a number of post-shuffle partitions that matches the number of partitions of
-            // Agg, we cannot say these two children are partitioned in the same way.
-            // Here is another case
-            //         Join
-            //         /  \
-            //       Agg1  Agg2
-            //       /      \
-            //   Exchange1  Exchange2
-            //       /       \
-            //      t1       t2
-            // In this case, two Aggs shuffle data with the same column of the join condition.
-            // After we use ExchangeCoordinator, these two Aggs may not be partitioned in the same
-            // way. Let's say that Agg1 and Agg2 both have 5 pre-shuffle partitions and 2
-            // post-shuffle partitions. It is possible that Agg1 fetches those pre-shuffle
-            // partitions by using a partitionStartIndices [0, 3]. However, Agg2 may fetch its
-            // pre-shuffle partitions by using another partitionStartIndices [0, 4].
-            // So, Agg1 and Agg2 are actually not co-partitioned.
-            //
-            // It will be great to introduce a new Partitioning to represent the post-shuffle
-            // partitions when one post-shuffle partition includes multiple pre-shuffle partitions.
-            val targetPartitioning = distribution.createPartitioning(defaultNumPreShufflePartitions)
-            assert(targetPartitioning.isInstanceOf[HashPartitioning])
-            ShuffleExchangeExec(targetPartitioning, child, Some(coordinator))
-        }
-      } else {
-        // If we do not need ExchangeCoordinator, the original children are returned.
-        children
-      }
-
-    withCoordinator
-  }
+  private def defaultNumPreShufflePartitions: Int =
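
 For context on the partitionStartIndices discussion in the removed comments above, here is a
 minimal, self-contained sketch of how pre-shuffle partitions can be coalesced into post-shuffle
 partitions against a target input size. This is illustrative only, not the actual
 ExchangeCoordinator implementation; the object name, method, and the sizes used are made up.

 // Illustrative sketch: pick partitionStartIndices so each post-shuffle
 // partition stays under a target input size. Not the real ExchangeCoordinator.
 object CoalesceSketch {
   def partitionStartIndices(preShuffleSizes: Array[Long], targetSize: Long): Array[Int] = {
     val starts = scala.collection.mutable.ArrayBuffer(0)
     var currentSize = 0L
     preShuffleSizes.zipWithIndex.foreach { case (size, i) =>
       // Start a new post-shuffle partition when adding this pre-shuffle
       // partition would exceed the target size (and it is not the first one).
       if (i > 0 && currentSize + size > targetSize) {
         starts += i
         currentSize = size
       } else {
         currentSize += size
       }
     }
     starts.toArray
   }

   def main(args: Array[String]): Unit = {
     // Two inputs with 5 pre-shuffle partitions each, as in the comment above:
     // different size distributions yield different start indices, e.g.
     // [0, 3] vs. [0, 4], so the two sides are no longer co-partitioned.
     println(partitionStartIndices(Array(10L, 10L, 10L, 40L, 10L), targetSize = 50L).mkString("[", ", ", "]"))
     println(partitionStartIndices(Array(10L, 10L, 10L, 10L, 40L), targetSize = 50L).mkString("[", ", ", "]"))
   }
 }

 With the two size distributions above, the sketch prints [0, 3] and [0, 4], matching the example
 in the removed comment where the two children end up not co-partitioned.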
 
 Review comment:
   @carsonwang, I am afraid we are making a risky assumption that users need to be familiar with
   the AE configs. That is less risky while AE is an on-demand feature. However, the current
   version of this PR sets spark.sql.adaptive.enabled = true, which means we plan to enable AE by
   default. If we then roll out the next version of Spark on our cluster, we will break a lot of
   production jobs that set a custom spark.sql.shuffle.partitions. I proposed a small change to
   adjust maxNumPostShufflePartitions based on spark.sql.shuffle.partitions, which I think is
   safer if the plan is to make AE the cluster default in the future.
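
   To make the proposal concrete, a minimal sketch of the kind of adjustment described above.
   The object, method, and parameter names are hypothetical and only illustrate falling back to
   the user's spark.sql.shuffle.partitions when no AE-specific maximum is configured; this is not
   the PR's actual code.

   // Illustrative sketch only: derive the AE maximum for post-shuffle partitions
   // from spark.sql.shuffle.partitions so that enabling AE by default does not
   // silently ignore existing per-job tuning.
   object AdaptiveMaxPartitionsSketch {
     def maxNumPostShufflePartitions(
         userShufflePartitions: Int,       // value of spark.sql.shuffle.partitions
         aeConfiguredMax: Option[Int]): Int = {
       // Fall back to the user's shuffle partition count when no AE-specific
       // maximum has been explicitly configured.
       aeConfiguredMax.getOrElse(userShufflePartitions)
     }

     def main(args: Array[String]): Unit = {
       // A job tuned with spark.sql.shuffle.partitions = 2000 keeps that upper
       // bound unless an AE-specific maximum is set explicitly.
       println(maxNumPostShufflePartitions(2000, None))      // 2000
       println(maxNumPostShufflePartitions(2000, Some(500))) // 500
     }
   }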

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
