ulysses-you commented on a change in pull request #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r692656713
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -100,24 +100,34 @@ case class AdaptiveSparkPlanExec(
// A list of physical plan rules to be applied before creation of query stages. The physical
// plan should reach a final status of query stages (i.e., no more addition or removal of
// Exchange nodes) after running these rules.
- @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
- RemoveRedundantProjects,
- // For cases like `df.repartition(a, b).select(c)`, there is no distribution requirement for
- // the final plan, but we do need to respect the user-specified repartition. Here we ask
- // `EnsureRequirements` to not optimize out the user-specified repartition-by-col to work
- // around this case.
- EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined),
- RemoveRedundantSorts,
- DisableUnnecessaryBucketedScan
- ) ++ context.session.sessionState.queryStagePrepRules
+ private def queryStagePreparationRules(
+ optimizeSkewedJoin: Boolean = false): Seq[Rule[SparkPlan]] = {
+ val optimizeSkewedJoinRules = if (optimizeSkewedJoin) {
+ Seq(OptimizeSkewedJoin,
+ // Add the EnsureRequirements rule here since OptimizeSkewedJoin will change
+ // output partitioning, make sure we have right distribution.
+ EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined))
+ } else {
+ Nil
+ }
+
+ Seq(
+ RemoveRedundantProjects,
+ // For cases like `df.repartition(a, b).select(c)`, there is no distribution requirement for
+ // the final plan, but we do need to respect the user-specified repartition. Here we ask
+ // `EnsureRequirements` to not optimize out the user-specified repartition-by-col to work
+ // around this case.
+ EnsureRequirements(optimizeOutRepartition = requiredDistribution.isDefined),
+ RemoveRedundantSorts,
+ DisableUnnecessaryBucketedScan
+ ) ++ optimizeSkewedJoinRules ++ context.session.sessionState.queryStagePrepRules
Review comment:
We need two `EnsureRequirements` passes for `OptimizeSkewedJoin`.
The first `EnsureRequirements` adds the exchanges that the join requires, so
that the physical plan can be executed. The second `EnsureRequirements` makes
sure the plan still has the right distribution after `OptimizeSkewedJoin` is
applied, since `OptimizeSkewedJoin` can change the output partitioning.
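
To illustrate the ordering argument, here is a rough toy model, not Spark
code. `ToyPlan`, `ToyRule`, `prepRules`, and `applyAll` are hypothetical names
made up for this sketch, and the two rules are simplified stand-ins that only
track whether exchanges exist and whether the required distribution holds.
It assumes the skew rule only fires on a plan whose requirements are already
met, and that it always invalidates the distribution when it fires.

```scala
// Toy model only: none of these types are Spark classes.
object PrepRulesSketch {
  // A plan reduced to three facts relevant to the ordering argument.
  final case class ToyPlan(exchanges: Int, skewHandled: Boolean, distributionSatisfied: Boolean)

  type ToyRule = ToyPlan => ToyPlan

  // Stand-in for EnsureRequirements: adds an exchange whenever the
  // required distribution is not currently satisfied.
  val ensureRequirements: ToyRule = plan =>
    if (plan.distributionSatisfied) plan
    else plan.copy(exchanges = plan.exchanges + 1, distributionSatisfied = true)

  // Stand-in for OptimizeSkewedJoin: assumed to need a plan whose
  // requirements are met, and assumed to break the distribution when it fires.
  val optimizeSkewedJoin: ToyRule = plan =>
    if (plan.distributionSatisfied && !plan.skewHandled)
      plan.copy(skewHandled = true, distributionSatisfied = false)
    else plan

  // Mirrors the shape of the rule list in the diff: the skew rule and the
  // second EnsureRequirements are appended only when the flag is set.
  def prepRules(optimizeSkewedJoinEnabled: Boolean): Seq[ToyRule] = {
    val skewRules =
      if (optimizeSkewedJoinEnabled) Seq(optimizeSkewedJoin, ensureRequirements) else Nil
    Seq(ensureRequirements) ++ skewRules
  }

  // Applies the rules in order, left to right.
  def applyAll(rules: Seq[ToyRule], plan: ToyPlan): ToyPlan =
    rules.foldLeft(plan)((current, rule) => rule(current))
}
```

In this model, dropping the trailing `ensureRequirements` leaves
`distributionSatisfied` false after the skew rule runs, and dropping the
leading one means the skew rule never fires at all.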
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]