exmy commented on a change in pull request #32733:
URL: https://github.com/apache/spark/pull/32733#discussion_r643771292
##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -258,18 +258,27 @@ private[spark] object HighlyCompressedMapStatus {
val threshold = Option(SparkEnv.get)
.map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
.getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+ val minThreshold = Option(SparkEnv.get)
+ .map(_.conf.get(config.SHUFFLE_ACCURATE_SKEWED_BLOCK_THRESHOLD))
+ .getOrElse(config.SHUFFLE_ACCURATE_SKEWED_BLOCK_THRESHOLD.defaultValue.get)
val hugeBlockSizes = mutable.Map.empty[Int, Byte]
+ val nonEmptyUncompressedSizes = uncompressedSizes.filter(_ > 0)
+ val overallNonEmptyAvgSize = if (nonEmptyUncompressedSizes.nonEmpty) {
+ nonEmptyUncompressedSizes.sum / nonEmptyUncompressedSizes.length
+ } else {
+ 0
+ }
while (i < totalNumBlocks) {
val size = uncompressedSizes(i)
if (size > 0) {
numNonEmptyBlocks += 1
// Huge blocks are not included in the calculation for average size, thus size for smaller
// blocks is more accurate.
- if (size < threshold) {
+ if ((size >= 5 * overallNonEmptyAvgSize && size >= minThreshold) || size >= threshold) {
Review comment:
We first compute the average size of the non-empty blocks in
`uncompressedSizes`; a block that is at least N times this average is marked
as a huge block. To avoid wrongly marking a block that exceeds N times the
average but is still small in absolute terms, a new config,
`spark.shuffle.accurateSkewedBlockThreshold`, is introduced: only a block that
is both at least N times the average and at least
`accurateSkewedBlockThreshold` is marked as a huge block. Blocks that already
reach the existing `threshold` are still marked as huge, as before.
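A minimal standalone sketch of the rule described above, assuming block sizes are plain `Long` byte counts. The object `HugeBlockSketch`, its methods, and the `skewFactor` parameter are hypothetical names for illustration, not Spark's API; the condition itself mirrors the one in the diff.

```scala
// Hypothetical standalone sketch of the proposed huge-block rule; not Spark's actual code.
object HugeBlockSketch {

  /** Average size over non-empty blocks only (0 if every block is empty). */
  def nonEmptyAvg(sizes: Array[Long]): Long = {
    val nonEmpty = sizes.filter(_ > 0)
    if (nonEmpty.nonEmpty) nonEmpty.sum / nonEmpty.length else 0L
  }

  /**
   * Indices of blocks that would be recorded accurately as "huge".
   * A non-empty block qualifies if it reaches the absolute `threshold`, or if it is
   * both at least `skewFactor` times the non-empty average and at least `minThreshold`.
   */
  def hugeBlockIndices(
      sizes: Array[Long],
      threshold: Long,
      minThreshold: Long,
      skewFactor: Int = 5): Seq[Int] = {
    val avg = nonEmptyAvg(sizes)
    sizes.indices.filter { i =>
      val size = sizes(i)
      size > 0 &&
        ((size >= skewFactor * avg && size >= minThreshold) || size >= threshold)
    }
  }
}
```

For example, with twenty 100-byte blocks, one empty block and one 5000-byte block, the non-empty average is 333 bytes (integer division), so the 5000-byte block at index 21 is flagged when `minThreshold` is 1000 but not when it is 10000, unless it also reaches `threshold`. Note that, as in the diff, the average is taken over all non-empty blocks, so the skewed blocks themselves pull the average up.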
`accurateSkewedBlockThreshold` defaults to 350K because we assume 3k
partitions: only when a reduce task fetches more than 3000 * 350K ≈ 1G of data
does this situation need to be considered.
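The estimate can be checked in both directions. This sketch assumes "350K" means 350 KiB and "1G" means 1 GiB (binary units), which the comment does not state; `SkewThresholdEstimate` and its helpers are hypothetical names used only for the arithmetic.

```scala
// Rough check of the 3000 * 350K ≈ 1G estimate, assuming binary units (KiB/GiB).
object SkewThresholdEstimate {
  val KiB: Long = 1024L
  val GiB: Long = 1024L * KiB * KiB

  /** Bytes a reduce task fetches if each of `numBlocks` blocks is `blockBytes`. */
  def totalFetched(numBlocks: Long, blockBytes: Long): Long = numBlocks * blockBytes

  /** Per-block size (integer division) that makes `numBlocks` blocks sum to `totalBytes`. */
  def perBlockFor(totalBytes: Long, numBlocks: Long): Long = totalBytes / numBlocks
}
```

Under these assumptions, 3000 blocks of 350 KiB total 1,075,200,000 bytes (just over 1 GiB), and splitting 1 GiB across 3000 blocks gives about 349 KiB per block, so 350K is a rounded version of that figure.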
I'm not sure whether N = 5 and `accurateSkewedBlockThreshold` = 350K are
appropriate here, and would really appreciate your opinion.
--
This is an automated message from the Apache Git Service.