fangshil commented on a change in pull request #20303: [SPARK-23128][SQL] A new approach to do adaptive execution in Spark SQL
URL: https://github.com/apache/spark/pull/20303#discussion_r263260234
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -36,107 +35,12 @@ import org.apache.spark.sql.internal.SQLConf
  * the input partition ordering requirements are met.
  */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
-  private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions
-
-  private def targetPostShuffleInputSize: Long = conf.targetPostShuffleInputSize
-
-  private def adaptiveExecutionEnabled: Boolean = conf.adaptiveExecutionEnabled
-
-  private def minNumPostShufflePartitions: Option[Int] = {
-    val minNumPostShufflePartitions = conf.minNumPostShufflePartitions
-    if (minNumPostShufflePartitions > 0) Some(minNumPostShufflePartitions) else None
-  }
-
-  /**
-   * Adds [[ExchangeCoordinator]] to [[ShuffleExchangeExec]]s if adaptive query execution is enabled
-   * and partitioning schemes of these [[ShuffleExchangeExec]]s support [[ExchangeCoordinator]].
-   */
-  private def withExchangeCoordinator(
-      children: Seq[SparkPlan],
-      requiredChildDistributions: Seq[Distribution]): Seq[SparkPlan] = {
-    val supportsCoordinator =
-      if (children.exists(_.isInstanceOf[ShuffleExchangeExec])) {
-        // Right now, ExchangeCoordinator only support HashPartitionings.
-        children.forall {
-          case e @ ShuffleExchangeExec(hash: HashPartitioning, _, _) => true
-          case child =>
-            child.outputPartitioning match {
-              case hash: HashPartitioning => true
-              case collection: PartitioningCollection =>
-                collection.partitionings.forall(_.isInstanceOf[HashPartitioning])
-              case _ => false
-            }
-        }
-      } else {
-        // In this case, although we do not have Exchange operators, we may still need to
-        // shuffle data when we have more than one children because data generated by
-        // these children may not be partitioned in the same way.
-        // Please see the comment in withCoordinator for more details.
-        val supportsDistribution = requiredChildDistributions.forall { dist =>
-          dist.isInstanceOf[ClusteredDistribution] || dist.isInstanceOf[HashClusteredDistribution]
-        }
-        children.length > 1 && supportsDistribution
-      }
-
-    val withCoordinator =
-      if (adaptiveExecutionEnabled && supportsCoordinator) {
-        val coordinator =
-          new ExchangeCoordinator(
-            targetPostShuffleInputSize,
-            minNumPostShufflePartitions)
-        children.zip(requiredChildDistributions).map {
-          case (e: ShuffleExchangeExec, _) =>
-            // This child is an Exchange, we need to add the coordinator.
-            e.copy(coordinator = Some(coordinator))
-          case (child, distribution) =>
-            // If this child is not an Exchange, we need to add an Exchange for now.
-            // Ideally, we can try to avoid this Exchange. However, when we reach here,
-            // there are at least two children operators (because if there is a single child
-            // and we can avoid Exchange, supportsCoordinator will be false and we
-            // will not reach here.). Although we can make two children have the same number of
-            // post-shuffle partitions. Their numbers of pre-shuffle partitions may be different.
-            // For example, let's say we have the following plan
-            //           Join
-            //          /    \
-            //        Agg    Exchange
-            //        /         \
-            //   Exchange        t2
-            //      /
-            //     t1
-            // In this case, because a post-shuffle partition can include multiple pre-shuffle
-            // partitions, a HashPartitioning will not be strictly partitioned by the hashcodes
-            // after shuffle. So, even we can use the child Exchange operator of the Join to
-            // have a number of post-shuffle partitions that matches the number of partitions of
-            // Agg, we cannot say these two children are partitioned in the same way.
-            // Here is another case
-            //           Join
-            //          /    \
-            //       Agg1    Agg2
-            //        /        \
-            //   Exchange1    Exchange2
-            //      /            \
-            //     t1            t2
-            // In this case, two Aggs shuffle data with the same column of the join condition.
-            // After we use ExchangeCoordinator, these two Aggs may not be partitioned in the same
-            // way. Let's say that Agg1 and Agg2 both have 5 pre-shuffle partitions and 2
-            // post-shuffle partitions. It is possible that Agg1 fetches those pre-shuffle
-            // partitions by using a partitionStartIndices [0, 3]. However, Agg2 may fetch its
-            // pre-shuffle partitions by using another partitionStartIndices [0, 4].
-            // So, Agg1 and Agg2 are actually not co-partitioned.
-            //
-            // It will be great to introduce a new Partitioning to represent the post-shuffle
-            // partitions when one post-shuffle partition includes multiple pre-shuffle partitions.
-            val targetPartitioning = distribution.createPartitioning(defaultNumPreShufflePartitions)
-            assert(targetPartitioning.isInstanceOf[HashPartitioning])
-            ShuffleExchangeExec(targetPartitioning, child, Some(coordinator))
-        }
-      } else {
-        // If we do not need ExchangeCoordinator, the original children are returned.
-        children
-      }
-
-    withCoordinator
-  }
+  private def defaultNumPreShufflePartitions: Int =
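The second case in the removed comment (Agg1 reading 5 pre-shuffle partitions through partitionStartIndices [0, 3], Agg2 through [0, 4]) can be checked with a small standalone model. This is a sketch, not Spark code: `CoPartitioningSketch` and its helpers are hypothetical names, and the model only tracks which post-shuffle partition reads each pre-shuffle partition (hash bucket).

```scala
// Hypothetical standalone model (not Spark code) of coalescing pre-shuffle
// partitions into post-shuffle partitions via start indices.
object CoPartitioningSketch {

  /** Post-shuffle partition that reads `preShufflePartition`:
    * the position of the last start index that is <= it. */
  def postShufflePartition(startIndices: Seq[Int], preShufflePartition: Int): Int =
    startIndices.lastIndexWhere(_ <= preShufflePartition)

  /** The range of pre-shuffle partitions read by each post-shuffle partition. */
  def coalescedGroups(startIndices: Seq[Int], numPreShufflePartitions: Int): Seq[Range] = {
    val ends = startIndices.drop(1) :+ numPreShufflePartitions
    startIndices.zip(ends).map { case (start, end) => start until end }
  }

  /** Two sides are co-partitioned only if every pre-shuffle partition lands
    * in the same post-shuffle partition on both sides. */
  def coPartitioned(left: Seq[Int], right: Seq[Int], numPreShufflePartitions: Int): Boolean =
    (0 until numPreShufflePartitions).forall { p =>
      postShufflePartition(left, p) == postShufflePartition(right, p)
    }

  def main(args: Array[String]): Unit = {
    val agg1 = Seq(0, 3)
    val agg2 = Seq(0, 4)
    // prints "{0, 1, 2} {3, 4}"
    println(coalescedGroups(agg1, 5).map(_.mkString("{", ", ", "}")).mkString(" "))
    // prints "{0, 1, 2, 3} {4}"
    println(coalescedGroups(agg2, 5).map(_.mkString("{", ", ", "}")).mkString(" "))
    // prints "Agg1 and Agg2 co-partitioned: false"
    println(s"Agg1 and Agg2 co-partitioned: ${coPartitioned(agg1, agg2, 5)}")
  }
}
```

Pre-shuffle partition 3 lands in post-shuffle partition 1 on Agg1's side but in partition 0 on Agg2's side, so matching keys would end up in different join partitions. This is why the removed code wraps every non-Exchange child in a ShuffleExchangeExec that shares the coordinator instead of assuming the children are already co-partitioned.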
Review comment:
@carsonwang, I am afraid we are making a risky assumption that users are familiar
with the AE configs. That is less risky when AE is an opt-in feature. However, the
current version of this PR sets spark.sql.adaptive.enabled = true, which means we
plan to enable AE by default. If we then roll out the next version of Spark on our
cluster, we are going to break a lot of prod jobs that set a custom
spark.sql.shuffle.partitions. I proposed a small change to adjust
maxNumPostShufflePartitions based on spark.sql.shuffle.partitions, which I think is
safer if the plan is to make AE the cluster default in the future.
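A minimal sketch of the adjustment proposed above, under loudly stated assumptions: the AE key name `spark.sql.adaptive.maxNumPostShufflePartitions` and the precedence rule (an explicitly set maximum wins, otherwise `spark.sql.shuffle.partitions` is reused) are illustrative only, not the change actually proposed in the PR. Configuration is modeled as a plain `Map[String, String]` instead of Spark's SQLConf so the example runs without Spark; `MaxPostShufflePartitionsSketch` is a hypothetical name.

```scala
// Hypothetical sketch: derive the AE maximum from spark.sql.shuffle.partitions.
object MaxPostShufflePartitionsSketch {
  val ShufflePartitionsKey = "spark.sql.shuffle.partitions"
  // Assumed key name for the AE maximum, used for illustration only.
  val MaxPostShufflePartitionsKey = "spark.sql.adaptive.maxNumPostShufflePartitions"
  // Spark's default value of spark.sql.shuffle.partitions.
  val DefaultShufflePartitions = 200

  /** Hypothetical precedence: an explicitly set AE maximum wins; otherwise fall back
    * to spark.sql.shuffle.partitions (user-set, else its default), so a job tuned
    * through that setting keeps it as the upper bound once AE is on. */
  def resolveMaxNumPostShufflePartitions(conf: Map[String, String]): Int =
    conf.get(MaxPostShufflePartitionsKey)
      .orElse(conf.get(ShufflePartitionsKey))
      .map(_.trim.toInt)
      .getOrElse(DefaultShufflePartitions)

  def main(args: Array[String]): Unit = {
    val untunedJob = Map.empty[String, String]
    val tunedJob = Map(ShufflePartitionsKey -> "2000")
    val explicitAeJob = Map(ShufflePartitionsKey -> "2000", MaxPostShufflePartitionsKey -> "4000")
    println(resolveMaxNumPostShufflePartitions(untunedJob))    // prints 200
    println(resolveMaxNumPostShufflePartitions(tunedJob))      // prints 2000
    println(resolveMaxNumPostShufflePartitions(explicitAeJob)) // prints 4000
  }
}
```

With this rule, a prod job that only ever tuned spark.sql.shuffle.partitions keeps that value as its ceiling when AE is switched on by default, while users who do know the AE configs can still override it explicitly.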
----------------------------------------------------------------
This is an automated message from the Apache Git Service.