mridulm commented on a change in pull request #32733:
URL: https://github.com/apache/spark/pull/32733#discussion_r643627409



##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -258,18 +258,27 @@ private[spark] object HighlyCompressedMapStatus {
     val threshold = Option(SparkEnv.get)
       .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
       .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+    val minThreshold = Option(SparkEnv.get)
+      .map(_.conf.get(config.SHUFFLE_ACCURATE_SKEWED_BLOCK_THRESHOLD))
+      
.getOrElse(config.SHUFFLE_ACCURATE_SKEWED_BLOCK_THRESHOLD.defaultValue.get)
     val hugeBlockSizes = mutable.Map.empty[Int, Byte]
+    val nonEmptyUncompressedSizes = uncompressedSizes.filter(_ > 0)
+    val overallNonEmptyAvgSize = if (nonEmptyUncompressedSizes.nonEmpty) {
+      nonEmptyUncompressedSizes.sum / nonEmptyUncompressedSizes.length
+    } else {
+      0
+    }
     while (i < totalNumBlocks) {
       val size = uncompressedSizes(i)
       if (size > 0) {
         numNonEmptyBlocks += 1
         // Huge blocks are not included in the calculation for average size, 
thus size for smaller
         // blocks is more accurate.
-        if (size < threshold) {
+        if ((size >= 5 * overallNonEmptyAvgSize && size >= minThreshold) || 
size >= threshold) {

Review comment:
       Echoing @dongjoon-hyun's comment above: what is the background of this change?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to