This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a539d5  [SPARK-36430][SQL] Adaptively calculate the target size when coalescing shuffle partitions in AQE
9a539d5 is described below

commit 9a539d5846814f5fd5317b9d0b7eb1a41299f092
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Mon Aug 9 17:25:55 2021 +0800

    [SPARK-36430][SQL] Adaptively calculate the target size when coalescing shuffle partitions in AQE
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a performance regression introduced in https://github.com/apache/spark/pull/33172
    
    Before #33172, the target size was calculated adaptively, based on the default parallelism of the Spark cluster. Sometimes it was very small, and #33172 introduced a minimum partition size to fix the resulting perf issues. Sometimes the calculated size was reasonable, such as dozens of MBs.
    
    After #33172, we no longer calculate the target size adaptively; by default, partitions are always coalesced to about 1MB each. This can cause a perf regression when the adaptively calculated size is reasonable.
    
    This PR brings back the code that adaptively calculates the target size based on the default parallelism of the Spark cluster.
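
    Below is a minimal sketch of the restored calculation (illustrative only; `adaptiveTargetSize`, `totalShuffleBytes` and `defaultParallelism` are stand-in names for values the rule actually reads from the map output statistics and the SparkContext, see the diff below):

    ```scala
    // Hypothetical helper mirroring the restored target-size logic.
    def adaptiveTargetSize(
        totalShuffleBytes: Long,      // sum of all map output sizes of the shuffle
        defaultParallelism: Int,      // session.sparkContext.defaultParallelism
        advisoryTargetSize: Long,     // ADVISORY_PARTITION_SIZE_IN_BYTES, default 64MB
        minPartitionSize: Long): Long = { // COALESCE_PARTITIONS_MIN_PARTITION_SIZE, default 1MB
      // Spread the total shuffle size evenly over the default parallelism.
      val maxTargetSize = math.ceil(totalShuffleBytes / defaultParallelism.toDouble).toLong
      // Cap at the advisory size, but never go below the min partition size.
      maxTargetSize.min(advisoryTargetSize).max(minPartitionSize)
    }

    // For example, a 10GB shuffle on a cluster with default parallelism 200:
    // adaptiveTargetSize(10L << 30, 200, 64L << 20, 1L << 20)  // ~51MB, not 1MB
    ```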
    
    ### Why are the changes needed?
    
    Fix a perf regression.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #33655 from cloud-fan/minor.
    
    Lead-authored-by: Wenchen Fan <wenc...@databricks.com>
    Co-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 21 +++++-----
 .../adaptive/CoalesceShufflePartitions.scala       | 46 +++++++++++++---------
 .../execution/adaptive/ShufflePartitionsUtil.scala | 10 ++---
 .../adaptive/AdaptiveQueryExecSuite.scala          |  3 +-
 .../apache/spark/sql/internal/SQLConfSuite.scala   |  6 ++-
 5 files changed, 46 insertions(+), 40 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f85f745..a930f63 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -480,6 +480,7 @@ object SQLConf {
       .doc("(Deprecated since Spark 3.0)")
       .version("1.6.0")
       .bytesConf(ByteUnit.BYTE)
+      .checkValue(_ > 0, "advisoryPartitionSizeInBytes must be positive")
       .createWithDefaultString("64MB")
 
   val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
@@ -526,28 +527,26 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  private val MIN_PARTITION_SIZE_KEY = "spark.sql.adaptive.coalescePartitions.minPartitionSize"
-
   val COALESCE_PARTITIONS_PARALLELISM_FIRST =
     buildConf("spark.sql.adaptive.coalescePartitions.parallelismFirst")
-      .doc("When true, Spark ignores the target size specified by " +
+      .doc("When true, Spark does not respect the target size specified by " +
         s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' (default 64MB) when 
coalescing contiguous " +
-        "shuffle partitions, and only respect the minimum partition size 
specified by " +
-        s"'$MIN_PARTITION_SIZE_KEY' (default 1MB), to maximize the 
parallelism. " +
-        "This is to avoid performance regression when enabling adaptive query 
execution. " +
-        "It's recommended to set this config to false and respect the target 
size specified by " +
-        s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'.")
+        "shuffle partitions, but adaptively calculate the target size 
according to the default " +
+        "parallelism of the Spark cluster. The calculated size is usually 
smaller than the " +
+        "configured target size. This is to maximize the parallelism and avoid 
performance " +
+        "regression when enabling adaptive query execution. It's recommended 
to set this config " +
+        "to false and respect the configured target size.")
       .version("3.2.0")
       .booleanConf
       .createWithDefault(true)
 
   val COALESCE_PARTITIONS_MIN_PARTITION_SIZE =
     buildConf("spark.sql.adaptive.coalescePartitions.minPartitionSize")
-      .doc("The minimum size of shuffle partitions after coalescing. Its value 
can be at most " +
-        s"20% of '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'. This is useful 
when the target size " +
-        "is ignored during partition coalescing, which is the default case.")
+      .doc("The minimum size of shuffle partitions after coalescing. This is 
useful when the " +
+        "adaptively calculated target size is too small during partition 
coalescing.")
       .version("3.2.0")
       .bytesConf(ByteUnit.BYTE)
+      .checkValue(_ > 0, "minPartitionSize must be positive")
       .createWithDefaultString("1MB")
 
   val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 75c53b4..5c14caa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan}
 import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, REBALANCE_PARTITIONS_BY_COL, REBALANCE_PARTITIONS_BY_NONE, REPARTITION_BY_COL, ShuffleExchangeLike, ShuffleOrigin}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
 
 /**
  * A rule to coalesce the shuffle partitions based on the map output statistics, which can
@@ -59,33 +60,40 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
     if (!shuffleStageInfos.forall(s => isSupported(s.shuffleStage.shuffle))) {
       plan
     } else {
-      // Ideally, this rule should simply coalesce partition w.r.t. the target size specified by
+      // Ideally, this rule should simply coalesce partitions w.r.t. the target size specified by
       // ADVISORY_PARTITION_SIZE_IN_BYTES (default 64MB). To avoid perf regression in AQE, this
-      // rule by default ignores the target size (set it to 0), and only respect the minimum
-      // partition size specified by COALESCE_PARTITIONS_MIN_PARTITION_SIZE (default 1MB).
+      // rule by default tries to maximize the parallelism and sets the target size to
+      // `total shuffle size / Spark default parallelism`. In case the `Spark default parallelism`
+      // is too big, this rule also respects the minimum partition size specified by
+      // COALESCE_PARTITIONS_MIN_PARTITION_SIZE (default 1MB).
       // For history reason, this rule also need to support the config
-      // COALESCE_PARTITIONS_MIN_PARTITION_NUM: if it's set, we will respect both the target
-      // size and minimum partition number, no matter COALESCE_PARTITIONS_PARALLELISM_FIRST is true
-      // or false.
-      // TODO: remove the `minNumPartitions` parameter from
-      //       `ShufflePartitionsUtil.coalescePartitions` after we remove the config
-      //       COALESCE_PARTITIONS_MIN_PARTITION_NUM
-      val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
-      val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
-      // `minPartitionSize` can be at most 20% of `advisorySize`.
-      val minPartitionSize = math.min(
-        advisorySize / 5, conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE))
-      val parallelismFirst = conf.getConf(SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST)
-      val advisoryTargetSize = if (minPartitionNum.isEmpty && parallelismFirst) {
-        0
+      // COALESCE_PARTITIONS_MIN_PARTITION_NUM. We should remove this config in the future.
+      val minNumPartitions = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM).getOrElse {
+        if (conf.getConf(SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST)) {
+          // We fall back to Spark default parallelism if the minimum number of coalesced partitions
+          // is not set, so as to avoid perf regressions compared to no coalescing.
+          session.sparkContext.defaultParallelism
+        } else {
+          // If we don't need to maximize the parallelism, we set `minPartitionNum` to 1, so that
+          // the specified advisory partition size will be respected.
+          1
+        }
+      }
+      val advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+      val minPartitionSize = if (Utils.isTesting) {
+        // In the tests, we usually set the target size to a very small value that is even smaller
+        // than the default value of the min partition size. Here we also adjust the min partition
+        // size to be not larger than 20% of the target size, so that the tests don't need to set
+        // both configs all the time to check the coalescing behavior.
+        conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE).min(advisoryTargetSize / 5)
       } else {
-        advisorySize
+        conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE)
       }
       val newPartitionSpecs = ShufflePartitionsUtil.coalescePartitions(
         shuffleStageInfos.map(_.shuffleStage.mapStats),
         shuffleStageInfos.map(_.partitionSpecs),
         advisoryTargetSize = advisoryTargetSize,
-        minNumPartitions = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM).getOrElse(1),
+        minNumPartitions = minNumPartitions,
         minPartitionSize = minPartitionSize)
 
       if (newPartitionSpecs.nonEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index 64f89b9..3609548 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -56,13 +56,9 @@ object ShufflePartitionsUtil extends Logging {
     // If `minNumPartitions` is very large, it is possible that we need to use a value less than
     // `advisoryTargetSize` as the target size of a coalesced task.
     val totalPostShuffleInputSize = mapOutputStatistics.flatMap(_.map(_.bytesByPartitionId.sum)).sum
-    // The max at here is to make sure that when we have an empty table, we only have a single
-    // coalesced partition.
-    // There is no particular reason that we pick 16. We just need a number to prevent
-    // `maxTargetSize` from being set to 0.
-    val maxTargetSize = math.max(
-      math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
-    val targetSize = math.min(maxTargetSize, advisoryTargetSize)
+    val maxTargetSize = math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong
+    // It's meaningless to make target size smaller than minPartitionSize.
+    val targetSize = maxTargetSize.min(advisoryTargetSize).max(minPartitionSize)
 
     val shuffleIds = mapOutputStatistics.flatMap(_.map(_.shuffleId)).mkString(", ")
     logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index d38a641..4471fda 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1691,7 +1691,8 @@ class AdaptiveQueryExecSuite
 
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
-      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2258",
+      // Pick a small value so that no coalescing can happen.
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
       SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
       SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
       val df = spark.sparkContext.parallelize(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 6f2452a..5e661a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -259,8 +259,10 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
     spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1g")
     assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1073741824)
 
-    spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "-1")
-    assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === -1)
+    // Test negative value
+    intercept[IllegalArgumentException] {
+      spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "-1")
+    }
 
     // Test overflow exception
     intercept[IllegalArgumentException] {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
