maropu commented on a change in pull request #32084:
URL: https://github.com/apache/spark/pull/32084#discussion_r609693645
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
##########
@@ -93,6 +106,15 @@ case class CoalesceShufflePartitions(session: SparkSession)
extends CustomShuffl
}
}
+ private def shouldApplyChildren(plan: SparkPlan): Boolean = {
+ plan.find(p => shouldApplyChildrenFunc(p)).isDefined
+ }
+
+ private def shouldApplyChildrenFunc(plan: SparkPlan): Boolean = plan match {
+ case _: UnionExec => true
Review comment:
Is there any other plan node that we can apply the same optimization to? If not,
could you inline it?
```
private def shouldApplyChildren(plan: SparkPlan): Boolean = {
    plan.find(_.isInstanceOf[UnionExec]).isDefined
}
```
Then, `shouldApplyChildren` -> `hasUnion`?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
##########
@@ -35,6 +35,19 @@ case class CoalesceShufflePartitions(session: SparkSession)
extends CustomShuffl
if (!conf.coalesceShufflePartitionsEnabled) {
return plan
}
+
+ if (shouldApplyChildren(plan)) {
+ plan.transformUp {
+ case p if shouldApplyChildrenFunc(p) &&
+ !p.children.exists(child => shouldApplyChildren(child)) =>
+ p.withNewChildren(p.children.map(child => applyInternal(child)))
+ }
+ } else {
+ applyInternal(plan)
+ }
+ }
+
+ private def applyInternal(plan: SparkPlan): SparkPlan = {
Review comment:
nit: `applyInternal` -> `coalescePartitions`?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
##########
@@ -35,6 +35,19 @@ case class CoalesceShufflePartitions(session: SparkSession)
extends CustomShuffl
if (!conf.coalesceShufflePartitionsEnabled) {
return plan
}
+
+ if (shouldApplyChildren(plan)) {
+ plan.transformUp {
+ case p if shouldApplyChildrenFunc(p) &&
+ !p.children.exists(child => shouldApplyChildren(child)) =>
+ p.withNewChildren(p.children.map(child => applyInternal(child)))
+ }
+ } else {
+ applyInternal(plan)
+ }
Review comment:
This section looks hard-to-read, so could we write it like this?
```
private def canCoalescePartitions(plan: SparkPlan): Boolean = {
plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec]) &&
!plan.find(_.isInstanceOf[CustomShuffleReaderExec]).isDefined
}
...
if (canCoalescePartitions(plan)) {
// simple case
return applyInternal(plan)
}
// Handle more cases to coalesce partitions?
...
```
##########
File path:
sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -1575,4 +1575,26 @@ class AdaptiveQueryExecSuite
checkNoCoalescePartitions(df.sort($"key"), ENSURE_REQUIREMENTS)
}
}
+
+ test("SPARK-34980: Support coalesce partition through union") {
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+ SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
+ SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "1048576",
+ SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
+ SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+ val df1 = spark.sparkContext.parallelize(
+ (1 to 10).map(i => TestData(i, i.toString)), 2).toDF()
+ val df2 = spark.sparkContext.parallelize(
+ (1 to 10).map(i => TestData(i, i.toString)), 4).toDF()
+
+ val df = df1.groupBy("key").count().unionAll(df2)
Review comment:
Could you add more tests for more cases, e.g., multiple unions?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]