viirya commented on a change in pull request #33541:
URL: https://github.com/apache/spark/pull/33541#discussion_r677944894
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
##########
@@ -27,12 +28,20 @@ import org.apache.spark.sql.internal.SQLConf
  * A rule to coalesce the shuffle partitions based on the map output statistics, which can
  * avoid many small reduce tasks that hurt performance.
  */
-case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleReadRule {
+// TODO: this rule should extends `AQEShuffleReadRule`. We can't do this now because the coalesced
Review comment:
How about creating a JIRA for this and referencing it like `TODO (SPARK-xxxxx)`?
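For illustration only, a sketch of how the TODO could reference a tracking ticket once one is filed (the ticket number below is a placeholder, not a real JIRA, and the trailing text is elided as in the diff):

```scala
// TODO (SPARK-XXXXX): this rule should extend `AQEShuffleReadRule`. We can't do this now
// because the coalesced ...
```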
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -113,29 +130,17 @@ case class AdaptiveSparkPlanExec(
     CollapseCodegenStages()
   )
-  // The partitioning of the query output depends on the shuffle(s) in the final stage. If the
-  // original plan contains a repartition operator, we need to preserve the specified partitioning,
-  // whether or not the repartition-introduced shuffle is optimized out because of an underlying
-  // shuffle of the same partitioning. Thus, we need to exclude some `AQEShuffleReadRule`s
-  // from the final stage, depending on the presence and properties of repartition operators.
-  private def finalStageOptimizerRules: Seq[Rule[SparkPlan]] = {
-    val origins = inputPlan.collect {
-      case s: ShuffleExchangeLike => s.shuffleOrigin
-    }
-    val allRules = queryStageOptimizerRules ++ postStageCreationRules
-    allRules.filter {
-      case c: AQEShuffleReadRule =>
-        origins.forall(c.supportedShuffleOrigins.contains)
-      case _ => true
-    }
-  }
-
-  private def optimizeQueryStage(plan: SparkPlan, rules: Seq[Rule[SparkPlan]]): SparkPlan = {
-    val optimized = rules.foldLeft(plan) { case (latestPlan, rule) =>
+  private def optimizeQueryStage(plan: SparkPlan, isFinalStage: Boolean): SparkPlan = {
+    val optimized = queryStageOptimizerRules.foldLeft(plan) { case (latestPlan, rule) =>
       val applied = rule.apply(latestPlan)
       val result = rule match {
-        case c: AQEShuffleReadRule if c.mayAddExtraShuffles =>
-          if (ValidateRequirements.validate(applied)) {
+        case _: AQEShuffleReadRule if !applied.fastEquals(latestPlan) =>
+          val distribution = if (isFinalStage) {
+            requiredDistribution.getOrElse(UnspecifiedDistribution)
+          } else {
+            UnspecifiedDistribution
+          }
+          if (ValidateRequirements.validate(applied, distribution)) {
             applied
           } else {
             logDebug(s"Rule ${rule.ruleName} is not applied due to additional shuffles " +
Review comment:
Is this log message still accurate? It seems the check now only cares about whether the required distribution is met.
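For reference, a minimal self-contained sketch of the decision flow the new code encodes; the types below (`Plan`, `HashDistribution`, `ValidateRequirementsModel`) are illustrative stand-ins for Spark internals, not the actual implementation:

```scala
// Simplified model: a read rule's result is kept only if the required
// distribution still holds, which is why wording about "additional shuffles"
// in the debug log may be stale.
sealed trait Distribution
case object UnspecifiedDistribution extends Distribution
final case class HashDistribution(keys: Seq[String]) extends Distribution

final case class Plan(outputDistribution: Distribution)

object ValidateRequirementsModel {
  // Stand-in for the two-argument validate(plan, distribution) call in the diff:
  // nothing to check when the required distribution is unspecified.
  def validate(plan: Plan, required: Distribution): Boolean = required match {
    case UnspecifiedDistribution => true
    case d                       => plan.outputDistribution == d
  }
}

object ValidateDemo extends App {
  val required: Distribution = HashDistribution(Seq("a", "b"))
  // Suppose a read rule rewrote the plan and dropped the hash partitioning.
  val afterRule = Plan(UnspecifiedDistribution)
  println(ValidateRequirementsModel.validate(afterRule, required))                // false: rejected
  println(ValidateRequirementsModel.validate(afterRule, UnspecifiedDistribution)) // true: nothing to enforce
}
```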
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -113,29 +130,17 @@ case class AdaptiveSparkPlanExec(
     CollapseCodegenStages()
   )
-  // The partitioning of the query output depends on the shuffle(s) in the final stage. If the
-  // original plan contains a repartition operator, we need to preserve the specified partitioning,
-  // whether or not the repartition-introduced shuffle is optimized out because of an underlying
-  // shuffle of the same partitioning. Thus, we need to exclude some `AQEShuffleReadRule`s
-  // from the final stage, depending on the presence and properties of repartition operators.
-  private def finalStageOptimizerRules: Seq[Rule[SparkPlan]] = {
-    val origins = inputPlan.collect {
-      case s: ShuffleExchangeLike => s.shuffleOrigin
-    }
-    val allRules = queryStageOptimizerRules ++ postStageCreationRules
-    allRules.filter {
-      case c: AQEShuffleReadRule =>
-        origins.forall(c.supportedShuffleOrigins.contains)
-      case _ => true
-    }
-  }
-
-  private def optimizeQueryStage(plan: SparkPlan, rules: Seq[Rule[SparkPlan]]): SparkPlan = {
-    val optimized = rules.foldLeft(plan) { case (latestPlan, rule) =>
+  private def optimizeQueryStage(plan: SparkPlan, isFinalStage: Boolean): SparkPlan = {
+    val optimized = queryStageOptimizerRules.foldLeft(plan) { case (latestPlan, rule) =>
       val applied = rule.apply(latestPlan)
       val result = rule match {
-        case c: AQEShuffleReadRule if c.mayAddExtraShuffles =>
-          if (ValidateRequirements.validate(applied)) {
+        case _: AQEShuffleReadRule if !applied.fastEquals(latestPlan) =>
+          val distribution = if (isFinalStage) {
+            requiredDistribution.getOrElse(UnspecifiedDistribution)
Review comment:
If `requiredDistribution` is `None` (a case like `df.repartition(a, b).select(c)`), does the validation still work? If the user-specified distribution is removed, it seems the validation cannot detect it?
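To make the scenario concrete, a minimal runnable sketch of the shape mentioned above; the column names a/b/c, the sample data, and the local master setting are made-up assumptions for illustration:

```scala
import org.apache.spark.sql.SparkSession

// Illustrates df.repartition(a, b).select(c): the user asks for partitioning
// by (a, b), then projects only c. The question is whether AQE's validation
// can still notice the user-specified partitioning being dropped when the
// final stage's requiredDistribution ends up as None.
object RepartitionThenSelect {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("repartition-then-select")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "x", 10L), (2, "y", 20L)).toDF("a", "b", "c")

    val result = df.repartition($"a", $"b").select($"c")
    result.explain(true)  // inspect the final plan and its output partitioning

    spark.stop()
  }
}
```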