attilapiros commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r739396666
##########
File path: core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
##########
@@ -255,9 +255,35 @@ private[spark] object HighlyCompressedMapStatus {
// we expect that there will be far fewer of them, so we will perform fewer bitmap insertions.
val emptyBlocks = new RoaringBitmap()
val totalNumBlocks = uncompressedSizes.length
- val threshold = Option(SparkEnv.get)
- .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
- .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+ val accurateBlockSkewedFactor = Option(SparkEnv.get)
+ .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR))
+ .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.defaultValue.get)
+ val shuffleAccurateBlockThreshold =
+ Option(SparkEnv.get)
+ .map(_.conf.get(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD))
+ .getOrElse(config.SHUFFLE_ACCURATE_BLOCK_THRESHOLD.defaultValue.get)
+ val threshold =
+ if (accurateBlockSkewedFactor > 0) {
+ val sortedSizes = uncompressedSizes.sorted
+ val medianSize: Long = Utils.median(sortedSizes)
Review comment:
Nit: the `: Long` annotation is not needed, and since `medianSize` is only used once I would remove the val altogether and inline the expression directly in the calculation.
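For illustration, a minimal sketch of the inlined form, reusing the names from the quoted hunk; the `math.max` combination with `shuffleAccurateBlockThreshold` is only an assumption, since the rest of the threshold calculation is not quoted here:
```scala
val threshold =
  if (accurateBlockSkewedFactor > 0) {
    val sortedSizes = uncompressedSizes.sorted
    // Inline the median instead of binding it to `medianSize: Long`.
    // (Combining it with the configured threshold via max is assumed, not shown in the hunk.)
    math.max(
      (Utils.median(sortedSizes) * accurateBlockSkewedFactor).toLong,
      shuffleAccurateBlockThreshold)
  } else {
    shuffleAccurateBlockThreshold
  }
```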
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1178,6 +1178,27 @@ package object config {
.bytesConf(ByteUnit.BYTE)
.createWithDefault(100 * 1024 * 1024)
+ private[spark] val SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR =
+ ConfigBuilder("spark.shuffle.accurateBlockSkewedFactor")
+ .doc("A shuffle block is considered as skewed and will be accurately
recorded in " +
+ "HighlyCompressedMapStatus if its size is larger than this factor
multiplying " +
+ "the median shuffle block size or SHUFFLE_ACCURATE_BLOCK_THRESHOLD. It
is " +
+ "recommended to set this parameter to be the same as
SKEW_JOIN_SKEWED_PARTITION_FACTOR." +
+ "Set to -1.0 to disable this feature by default.")
+ .version("3.3.0")
+ .doubleConf
+ .createWithDefault(-1.0)
+
+ private[spark] val SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER =
+ ConfigBuilder("spark.shuffle.maxAccurateSkewedBlockNumber")
+ .doc("Max skewed shuffle blocks allowed to be accurately recorded in " +
+ "HighlyCompressedMapStatus if its size is larger than this factor
multiplying " +
Review comment:
this factor => SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR
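Something along these lines, purely as an illustration (the quoted hunk cuts off mid doc-string, so the tail of the sentence is assumed here):
```scala
.doc("Max number of skewed shuffle blocks allowed to be accurately recorded in " +
  "HighlyCompressedMapStatus if their sizes are larger than " +
  "SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR multiplying the median shuffle block size.")
```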
##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,61 @@ class MapStatusSuite extends SparkFunSuite {
assert(count === 3000)
}
}
+
+ def compressAndDecompressSize(size: Long): Long = {
+ MapStatus.decompressSize(MapStatus.compressSize(size))
+ }
+
+ test("SPARK-36967: HighlyCompressedMapStatus should record accurately the
size " +
+ "of skewed shuffle blocks") {
+ val conf = new
SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+ val env = mock(classOf[SparkEnv])
+ doReturn(conf).when(env).conf
+ SparkEnv.set(env)
+
+ val sizes = Array.tabulate[Long](3000)(i => (if (i < 2990) i else i + 350 * 1024).toLong)
+ val avg = sizes.filter(_ < 3000).sum / sizes.count(i => i > 0 && i < 3000)
+ val loc = BlockManagerId("a", "b", 10)
+ val mapTaskAttemptId = 5
+ val status = MapStatus(loc, sizes, mapTaskAttemptId)
+ val status1 = compressAndDecompressMapStatus(status)
+ assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+ assert(status1.location == loc)
+ assert(status1.mapId == mapTaskAttemptId)
+ assert(status1.getSizeForBlock(0) == 0)
+ for (i <- 1 until 3000) {
+ val estimate = status1.getSizeForBlock(i)
+ if (i < 2990) {
+ assert(estimate === avg)
+ } else {
+ assert(estimate === compressAndDecompressSize(sizes(i)))
+ }
Review comment:
Nit: I think it is easier to follow this way:
```scala
val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
val sizes = smallBlockSizes ++: skewBlocksSizes
val avg = smallBlockSizes.sum / smallBlockSizes.length
val loc = BlockManagerId("a", "b", 10)
val mapTaskAttemptId = 5
val status = MapStatus(loc, sizes, mapTaskAttemptId)
val status1 = compressAndDecompressMapStatus(status)
assert(status1.isInstanceOf[HighlyCompressedMapStatus])
assert(status1.location == loc)
assert(status1.mapId == mapTaskAttemptId)
assert(status1.getSizeForBlock(0) == 0)
for (i <- 1 until smallBlockSizes.length) {
  assert(status1.getSizeForBlock(i) === avg)
}
for (i <- 0 until skewBlocksSizes.length) {
  assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
    compressAndDecompressSize(skewBlocksSizes(i)))
}
```
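Splitting the input this way also lets the expected average be computed directly from `smallBlockSizes` and the accurate sizes be read off `skewBlocksSizes` by index, instead of deriving both from hard-coded index boundaries.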
##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,61 @@ class MapStatusSuite extends SparkFunSuite {
assert(count === 3000)
}
}
+
+ def compressAndDecompressSize(size: Long): Long = {
+ MapStatus.decompressSize(MapStatus.compressSize(size))
+ }
+
+ test("SPARK-36967: HighlyCompressedMapStatus should record accurately the
size " +
+ "of skewed shuffle blocks") {
+ val conf = new
SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+ val env = mock(classOf[SparkEnv])
+ doReturn(conf).when(env).conf
+ SparkEnv.set(env)
+
+ val sizes = Array.tabulate[Long](3000)(i => (if (i < 2990) i else i + 350 * 1024).toLong)
+ val avg = sizes.filter(_ < 3000).sum / sizes.count(i => i > 0 && i < 3000)
+ val loc = BlockManagerId("a", "b", 10)
+ val mapTaskAttemptId = 5
+ val status = MapStatus(loc, sizes, mapTaskAttemptId)
+ val status1 = compressAndDecompressMapStatus(status)
+ assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+ assert(status1.location == loc)
+ assert(status1.mapId == mapTaskAttemptId)
+ assert(status1.getSizeForBlock(0) == 0)
+ for (i <- 1 until 3000) {
+ val estimate = status1.getSizeForBlock(i)
+ if (i < 2990) {
+ assert(estimate === avg)
+ } else {
+ assert(estimate === compressAndDecompressSize(sizes(i)))
+ }
+ }
+ }
+
+ test("SPARK-36967: Limit accurated skewed block number if too many blocks
are skewed") {
+ val conf = new
SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+ val env = mock(classOf[SparkEnv])
+ doReturn(conf).when(env).conf
+ SparkEnv.set(env)
+
+ val sizes = Array.tabulate[Long](3000)(i => (if (i < 2500) i else i + 3500 * 1024).toLong)
Review comment:
Nit: Here I would also construct the `sizes` from the concatenation of
`smallBlockSizes` and `skewedBlockSizes`.
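For example, a sketch along these lines (the counts and the size formula simply mirror the quoted values; the exact construction is illustrative):
```scala
// 2500 small blocks followed by 500 skewed blocks, matching the quoted test data.
val smallBlockSizes = Array.tabulate[Long](2500)(i => i)
val skewedBlockSizes = Array.tabulate[Long](500)(i => 2500L + i + 3500 * 1024)
val sizes = smallBlockSizes ++ skewedBlockSizes
```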