attilapiros commented on a change in pull request #34234:
URL: https://github.com/apache/spark/pull/34234#discussion_r773385095
##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,66 @@ class MapStatusSuite extends SparkFunSuite {
assert(count === 3000)
}
}
+
+ def compressAndDecompressSize(size: Long): Long = {
+ MapStatus.decompressSize(MapStatus.compressSize(size))
+ }
+
+ test("SPARK-36967: HighlyCompressedMapStatus should record accurately the
size " +
+ "of skewed shuffle blocks") {
+ val conf = new
SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+ val env = mock(classOf[SparkEnv])
+ doReturn(conf).when(env).conf
+ SparkEnv.set(env)
+
+ val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
+ val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
+ val sizes = smallBlockSizes ++: skewBlocksSizes
+ val avg = smallBlockSizes.sum / smallBlockSizes.length
+ val loc = BlockManagerId("a", "b", 10)
+ val mapTaskAttemptId = 5
+ val status = MapStatus(loc, sizes, mapTaskAttemptId)
+ val status1 = compressAndDecompressMapStatus(status)
+ assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+ assert(status1.location == loc)
+ assert(status1.mapId == mapTaskAttemptId)
+ assert(status1.getSizeForBlock(0) == 0)
+ for (i <- 1 until smallBlockSizes.length) {
+ assert(status1.getSizeForBlock(i) === avg)
+ }
+ for (i <- 0 until skewBlocksSizes.length) {
+ assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
+ compressAndDecompressSize(skewBlocksSizes(i)))
+ }
+ }
+
+ test("SPARK-36967: Limit accurated skewed block number if too many blocks
are skewed") {
+ val conf = new
SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+ val env = mock(classOf[SparkEnv])
+ doReturn(conf).when(env).conf
+ SparkEnv.set(env)
+
+ val sizes: Array[Long] = Array.tabulate[Long](2500)(i => i) ++:
+ Array.tabulate[Long](500)(i => i + 3500 * 1024)
+ val emptyBlocksSize = sizes.filter(_ == 0).length
Review comment:
This computation makes the reader wonder whether there is a reason behind it,
and if so, what it is.
I think its right side can simply be replaced by the literal 1.
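As an illustration, a minimal sketch of the suggested simplification
(variable names as in the diff):

```scala
// Only index 0 of Array.tabulate[Long](2500)(i => i) yields size 0, and all
// skewed sizes start above 3500 * 1024, so exactly one block is empty.
val emptyBlocksSize = 1
val smallBlockSizes = sizes.slice(emptyBlocksSize, sizes.size - 100)
```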
##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,66 @@ class MapStatusSuite extends SparkFunSuite {
assert(count === 3000)
}
}
+
+ def compressAndDecompressSize(size: Long): Long = {
+ MapStatus.decompressSize(MapStatus.compressSize(size))
+ }
+
+ test("SPARK-36967: HighlyCompressedMapStatus should record accurately the
size " +
+ "of skewed shuffle blocks") {
+ val conf = new
SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+ val env = mock(classOf[SparkEnv])
+ doReturn(conf).when(env).conf
+ SparkEnv.set(env)
+
+ val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
+ val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
+ val sizes = smallBlockSizes ++: skewBlocksSizes
+ val avg = smallBlockSizes.sum / smallBlockSizes.length
+ val loc = BlockManagerId("a", "b", 10)
+ val mapTaskAttemptId = 5
+ val status = MapStatus(loc, sizes, mapTaskAttemptId)
+ val status1 = compressAndDecompressMapStatus(status)
+ assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+ assert(status1.location == loc)
+ assert(status1.mapId == mapTaskAttemptId)
+ assert(status1.getSizeForBlock(0) == 0)
+ for (i <- 1 until smallBlockSizes.length) {
+ assert(status1.getSizeForBlock(i) === avg)
+ }
+ for (i <- 0 until skewBlocksSizes.length) {
+ assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
+ compressAndDecompressSize(skewBlocksSizes(i)))
+ }
+ }
+
+ test("SPARK-36967: Limit accurated skewed block number if too many blocks
are skewed") {
+ val conf = new
SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+ val env = mock(classOf[SparkEnv])
+ doReturn(conf).when(env).conf
+ SparkEnv.set(env)
+
+ val sizes: Array[Long] = Array.tabulate[Long](2500)(i => i) ++:
+ Array.tabulate[Long](500)(i => i + 3500 * 1024)
+ val emptyBlocksSize = sizes.filter(_ == 0).length
+ val smallBlockSizes = sizes.slice(emptyBlocksSize, sizes.size - 100)
+ val skewBlocksSizes = sizes.slice(sizes.size - 100, sizes.size)
+ val avg = smallBlockSizes.sum / smallBlockSizes.length
+
+ val loc = BlockManagerId("a", "b", 10)
+ val mapTaskAttemptId = 5
+ val status = MapStatus(loc, sizes, mapTaskAttemptId)
+ val status1 = compressAndDecompressMapStatus(status)
+ assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+ assert(status1.location == loc)
+ assert(status1.mapId == mapTaskAttemptId)
+ assert(status1.getSizeForBlock(0) == 0)
+ for (i <- 1 until sizes.length - 100) {
Review comment:
Instead of the literal 100, use the new val here, and in the loops below as well.
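For example, something like this (a sketch; `maxAccurateSkewedBlockNumber` is a
hypothetical name for the new val suggested below):

```scala
for (i <- 1 until sizes.length - maxAccurateSkewedBlockNumber) {
  assert(status1.getSizeForBlock(i) === avg)
}
```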
##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,66 @@ class MapStatusSuite extends SparkFunSuite {
assert(count === 3000)
}
}
+
+ def compressAndDecompressSize(size: Long): Long = {
+ MapStatus.decompressSize(MapStatus.compressSize(size))
+ }
+
+ test("SPARK-36967: HighlyCompressedMapStatus should record accurately the
size " +
+ "of skewed shuffle blocks") {
+ val conf = new
SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+ val env = mock(classOf[SparkEnv])
+ doReturn(conf).when(env).conf
+ SparkEnv.set(env)
+
+ val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
+ val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
+ val sizes = smallBlockSizes ++: skewBlocksSizes
+ val avg = smallBlockSizes.sum / smallBlockSizes.length
+ val loc = BlockManagerId("a", "b", 10)
+ val mapTaskAttemptId = 5
+ val status = MapStatus(loc, sizes, mapTaskAttemptId)
+ val status1 = compressAndDecompressMapStatus(status)
+ assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+ assert(status1.location == loc)
+ assert(status1.mapId == mapTaskAttemptId)
+ assert(status1.getSizeForBlock(0) == 0)
+ for (i <- 1 until smallBlockSizes.length) {
+ assert(status1.getSizeForBlock(i) === avg)
+ }
+ for (i <- 0 until skewBlocksSizes.length) {
+ assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
+ compressAndDecompressSize(skewBlocksSizes(i)))
+ }
+ }
+
+ test("SPARK-36967: Limit accurated skewed block number if too many blocks
are skewed") {
+ val conf = new
SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
Review comment:
I suggest setting `SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER` to an arbitrary
number (stored in a val) and using that val instead of the literal 100. This
makes the test more readable.
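Something along these lines (a sketch; `maxAccurateSkewedBlockNumber` is a
hypothetical name):

```scala
val maxAccurateSkewedBlockNumber = 100
val conf = new SparkConf()
  .set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
  .set(config.SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER.key,
    maxAccurateSkewedBlockNumber.toString)
```

Then the loop bounds and the skewed-block count can all be derived from that
single val.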
##########
File path: core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
##########
@@ -191,4 +191,66 @@ class MapStatusSuite extends SparkFunSuite {
assert(count === 3000)
}
}
+
+ def compressAndDecompressSize(size: Long): Long = {
+ MapStatus.decompressSize(MapStatus.compressSize(size))
+ }
+
+ test("SPARK-36967: HighlyCompressedMapStatus should record accurately the
size " +
+ "of skewed shuffle blocks") {
+ val conf = new
SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+ val env = mock(classOf[SparkEnv])
+ doReturn(conf).when(env).conf
+ SparkEnv.set(env)
+
+ val smallBlockSizes = Array.tabulate[Long](2889)(i => i)
+ val skewBlocksSizes = Array.tabulate[Long](10)(i => i + 350 * 1024)
+ val sizes = smallBlockSizes ++: skewBlocksSizes
+ val avg = smallBlockSizes.sum / smallBlockSizes.length
+ val loc = BlockManagerId("a", "b", 10)
+ val mapTaskAttemptId = 5
+ val status = MapStatus(loc, sizes, mapTaskAttemptId)
+ val status1 = compressAndDecompressMapStatus(status)
+ assert(status1.isInstanceOf[HighlyCompressedMapStatus])
+ assert(status1.location == loc)
+ assert(status1.mapId == mapTaskAttemptId)
+ assert(status1.getSizeForBlock(0) == 0)
+ for (i <- 1 until smallBlockSizes.length) {
+ assert(status1.getSizeForBlock(i) === avg)
+ }
+ for (i <- 0 until skewBlocksSizes.length) {
+ assert(status1.getSizeForBlock(smallBlockSizes.length + i) ===
+ compressAndDecompressSize(skewBlocksSizes(i)))
+ }
+ }
+
+ test("SPARK-36967: Limit accurated skewed block number if too many blocks
are skewed") {
+ val conf = new
SparkConf().set(config.SHUFFLE_ACCURATE_BLOCK_SKEWED_FACTOR.key, "5")
+ val env = mock(classOf[SparkEnv])
+ doReturn(conf).when(env).conf
+ SparkEnv.set(env)
+
+ val sizes: Array[Long] = Array.tabulate[Long](2500)(i => i) ++:
+ Array.tabulate[Long](500)(i => i + 3500 * 1024)
Review comment:
You can use the val introduced for setting
`SHUFFLE_MAX_ACCURATE_SKEWED_BLOCK_NUMBER` plus a literal to emphasize that we
have more skewed blocks here than are tracked accurately.
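For example (a sketch, reusing the hypothetical `maxAccurateSkewedBlockNumber`
val from above):

```scala
// 400 skewed blocks beyond the tracked maximum (100 + 400 = 500, as before)
// makes it explicit that there are more skewed blocks than can be recorded.
val sizes: Array[Long] = Array.tabulate[Long](2500)(i => i) ++:
  Array.tabulate[Long](maxAccurateSkewedBlockNumber + 400)(i => i + 3500 * 1024)
```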
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]