mridulm commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r735245433



##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,37 @@ private[spark] object HighlyCompressedMapStatus {
     // we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
     val emptyBlocks = new RoaringBitmap()
     val totalNumBlocks = uncompressedSizes.length
-    val threshold = Option(SparkEnv.get)
-      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
-      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val accurateBlockSkewedFactor = Option(SparkEnv.get)
+      .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR))
+      .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.defaultValue.get)
+    val threshold =
+      if (accurateBlockSkewedFactor > 0) {
+        val sortedSizes = uncompressedSizes.sorted
+        val medianSize: Long = Utils.median(sortedSizes)
+        val maxAccurateSkewedBlockNumber =
+          Math.min(
+            Option(SparkEnv.get)
+              .map(_.conf.get(config.SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER))
+              .getOrElse(config.SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER.defaultValue.get),
+            totalNumBlocks
+          )
+        val skewSizeThreshold =
+          Math.max(
+            medianSize * accurateBlockSkewedFactor,
+            sortedSizes(totalNumBlocks - maxAccurateSkewedBlockNumber)
+          )
+        Math.min(
+          Option(SparkEnv.get)
+            .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
+            .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get),
+          skewSizeThreshold)
+      } else {
+        // Disable skew detection if accurateBlockSkewedFactor <= 0
+        Option(SparkEnv.get)
+          .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
+          .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)

Review comment:
       nit: Avoid this duplication by pulling this value out of the if/else?
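
       A sketch of what that refactor could look like, reusing the names from the diff above (`Utils.median` and the config constants come from this PR); untested, just to illustrate hoisting the shared lookup out of the branches:

```scala
// Sketch only: hoist the SHUFFLE_ACCURATE_BLOCK_THRESHOLD lookup out of the
// if/else so the Option(SparkEnv.get) dance is written once.
val accurateBlockThreshold = Option(SparkEnv.get)
  .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
  .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
val threshold =
  if (accurateBlockSkewedFactor > 0) {
    val sortedSizes = uncompressedSizes.sorted
    val medianSize: Long = Utils.median(sortedSizes)
    val maxAccurateSkewedBlockNumber = Math.min(
      Option(SparkEnv.get)
        .map(_.conf.get(config.SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER))
        .getOrElse(config.SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER.defaultValue.get),
      totalNumBlocks)
    val skewSizeThreshold = Math.max(
      medianSize * accurateBlockSkewedFactor,
      sortedSizes(totalNumBlocks - maxAccurateSkewedBlockNumber))
    // Taking the min means the skew-based cutoff can only lower the threshold,
    // i.e. record more blocks accurately, never fewer.
    Math.min(accurateBlockThreshold, skewSizeThreshold)
  } else {
    // Skew detection is disabled when accurateBlockSkewedFactor <= 0.
    accurateBlockThreshold
  }
```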

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1178,6 +1178,27 @@ package object config {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(100 * 1024 * 1024)
 
+  private[spark] val SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR =
+    ConfigBuilder("spark.shuffle.accurateBlockSkewedFactor")
+      .doc("A shuffle block is considered skewed and will be accurately recorded in " +
+        "HighlyCompressedMapStatus if its size is larger than this factor multiplied by " +
+        "the median shuffle block size or larger than SHUFFLE_ACCURATE_BLOCK_THRESHOLD. " +
+        "It is recommended to set this parameter to the same value as " +
+        "SKEW_JOIN_SKEWED_PARTITION_FACTOR. Set to -1 (the default) to disable this feature.")
+      .version("3.3.0")
+      .intConf

Review comment:
       Do we want to make this a `doubleConf`?
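
       If we do go with a double, the declaration might look roughly like this (a sketch; the default value and the shortened doc text are illustrative, not taken from the patch):

```scala
private[spark] val SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR =
  ConfigBuilder("spark.shuffle.accurateBlockSkewedFactor")
    .doc("A shuffle block is considered skewed and will be accurately recorded in " +
      "HighlyCompressedMapStatus if its size is larger than this factor multiplied by " +
      "the median shuffle block size or larger than SHUFFLE_ACCURATE_BLOCK_THRESHOLD. " +
      "Set to -1.0 to disable.")
    .version("3.3.0")
    // doubleConf allows fractional factors such as 1.5, which intConf cannot express.
    .doubleConf
    .createWithDefault(-1.0)
```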



