Ngone51 commented on a change in pull request #31493:
URL: https://github.com/apache/spark/pull/31493#discussion_r571436070
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -488,6 +488,17 @@ package object config {
.booleanConf
.createWithDefault(false)
+ private[spark] val STORAGE_REMOTE_SHUFFLE_MAX_DISK =
+ ConfigBuilder("spark.storage.remote.shuffle.maxDisk")
+ .doc("Maximum disk space to use to store shuffle blocks before rejecting
remote " +
+ "shuffle blocks. Rejecting remote shuffle blocks means that an exec
will not receive any " +
Review comment:
"an exec" -> "an executor"?
##########
File path:
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -72,6 +73,15 @@ private[spark] class IndexShuffleBlockResolver(
}
}
+ private def getShuffleBytesStored(): Long = {
+ val shuffleFiles: Seq[File] = getStoredShuffles().map {
+ si => getDataFile(si.shuffleId, si.mapId)
+ }
+ shuffleFiles.foldLeft[Long](0: Long) { (acc: Long, f: File) =>
+ acc + f.length()
+ }
Review comment:
```suggestion
shuffleFiles.map(_.length()).sum
```
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -488,6 +488,17 @@ package object config {
.booleanConf
.createWithDefault(false)
+ private[spark] val STORAGE_REMOTE_SHUFFLE_MAX_DISK =
+ ConfigBuilder("spark.storage.remote.shuffle.maxDisk")
+ .doc("Maximum disk space to use to store shuffle blocks before rejecting
remote " +
+ "shuffle blocks. Rejecting remote shuffle blocks means that an exec
will not receive any " +
+ " shuffle migrations, and if there are no execs avaialble for
migration then " +
Review comment:
"no execs" -> "no executors"?
##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
##########
@@ -1932,22 +1932,29 @@ class BlockManagerSuite extends SparkFunSuite with
Matchers with BeforeAndAfterE
assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
}
- test("test migration of shuffle blocks during decommissioning") {
- val shuffleManager1 = makeSortShuffleManager()
+ private def testShuffleBlockDecommissioning(maxShuffle: Option[Int],
migrate: Boolean) = {
+ maxShuffle.foreach{ b =>
+ conf.set(STORAGE_REMOTE_SHUFFLE_MAX_DISK.key, s"${b}b")
Review comment:
Probably rename `maxShuffle` to `maxShuffleDiskSize` and rename `b` to
`diskSize`
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -488,6 +488,17 @@ package object config {
.booleanConf
.createWithDefault(false)
+ private[spark] val STORAGE_REMOTE_SHUFFLE_MAX_DISK =
+ ConfigBuilder("spark.storage.remote.shuffle.maxDisk")
+ .doc("Maximum disk space to use to store shuffle blocks before rejecting
remote " +
+ "shuffle blocks. Rejecting remote shuffle blocks means that an exec
will not receive any " +
+ " shuffle migrations, and if there are no execs avaialble for
migration then " +
+ s"decommissioning will block unless
${STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH.key} " +
Review comment:
Will it block? I think decommissioning will stop soon rather than block in
that case.
##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
##########
@@ -1961,20 +1968,37 @@ class BlockManagerSuite extends SparkFunSuite with
Matchers with BeforeAndAfterE
decomManager.refreshOffloadingShuffleBlocks()
- eventually(timeout(1.second), interval(10.milliseconds)) {
- assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location ===
bm2.blockManagerId)
+ if (migrate) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
+ assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location
=== bm2.blockManagerId)
+ }
+
assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleData).toPath())
+ === shuffleDataBlockContent)
+
assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleIndex).toPath())
+ === shuffleIndexBlockContent)
+ } else {
+ Thread.sleep(1000)
+ assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location ===
bm1.blockManagerId)
}
-
assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleData).toPath())
- === shuffleDataBlockContent)
-
assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleIndex).toPath())
- === shuffleIndexBlockContent)
} finally {
mapOutputTracker.unregisterShuffle(0)
// Avoid thread leak
decomManager.stopOffloadingShuffleBlocks()
}
}
+ test("test migration of shuffle blocks during decommissioning - no limit") {
+ testShuffleBlockDecommissioning(None, true)
+ }
+
+ test("test migration of shuffle blocks during decommissioning - larger
limit") {
+ testShuffleBlockDecommissioning(Some(10000), true)
+ }
+
+ test("test migration of shuffle blocks during decommissioning - small
limit") {
Review comment:
attach the JIRA ID?
##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
##########
@@ -1932,22 +1932,29 @@ class BlockManagerSuite extends SparkFunSuite with
Matchers with BeforeAndAfterE
assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
}
- test("test migration of shuffle blocks during decommissioning") {
- val shuffleManager1 = makeSortShuffleManager()
+ private def testShuffleBlockDecommissioning(maxShuffle: Option[Int],
migrate: Boolean) = {
Review comment:
Maybe rename "migrate" to "willReject"?
All cases actually will try to migrate the blocks first. The difference is
whether it'll be rejected or not.
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -488,6 +488,17 @@ package object config {
.booleanConf
.createWithDefault(false)
+ private[spark] val STORAGE_REMOTE_SHUFFLE_MAX_DISK =
+ ConfigBuilder("spark.storage.remote.shuffle.maxDisk")
+ .doc("Maximum disk space to use to store shuffle blocks before rejecting
remote " +
+ "shuffle blocks. Rejecting remote shuffle blocks means that an exec
will not receive any " +
+ " shuffle migrations, and if there are no execs avaialble for
migration then " +
Review comment:
redundant whitespace at the prefix
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -488,6 +488,17 @@ package object config {
.booleanConf
.createWithDefault(false)
+ private[spark] val STORAGE_REMOTE_SHUFFLE_MAX_DISK =
+ ConfigBuilder("spark.storage.remote.shuffle.maxDisk")
Review comment:
"*.maxDisk" -> "*.maxDiskSize"?
##########
File path:
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -173,6 +183,13 @@ private[spark] class IndexShuffleBlockResolver(
*/
override def putShuffleBlockAsStream(blockId: BlockId, serializerManager:
SerializerManager):
StreamCallbackWithID = {
+ // Throw an exception if we have exceeded maximum shuffle files stored
+ remoteShuffleMaxDisk.foreach { maxBytes =>
+ val bytesUsed = getShuffleBytesStored()
+ if (maxBytes < bytesUsed) {
+ throw new SparkException(s"Not storing remote shuffles $bytesUsed
exceeds $maxBytes")
Review comment:
Give a hint of the conf for users to increase the maxBytes?
##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
##########
@@ -1961,20 +1968,37 @@ class BlockManagerSuite extends SparkFunSuite with
Matchers with BeforeAndAfterE
decomManager.refreshOffloadingShuffleBlocks()
- eventually(timeout(1.second), interval(10.milliseconds)) {
- assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location ===
bm2.blockManagerId)
+ if (migrate) {
+ eventually(timeout(1.second), interval(10.milliseconds)) {
+ assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location
=== bm2.blockManagerId)
+ }
+
assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleData).toPath())
+ === shuffleDataBlockContent)
+
assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleIndex).toPath())
+ === shuffleIndexBlockContent)
+ } else {
+ Thread.sleep(1000)
Review comment:
I think this cannot ensure that the block fails due to the disk size
limitation. Block migration may not finish within 1 second.
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -488,6 +488,17 @@ package object config {
.booleanConf
.createWithDefault(false)
+ private[spark] val STORAGE_REMOTE_SHUFFLE_MAX_DISK =
+ ConfigBuilder("spark.storage.remote.shuffle.maxDisk")
+ .doc("Maximum disk space to use to store shuffle blocks before rejecting
remote " +
+ "shuffle blocks. Rejecting remote shuffle blocks means that an exec
will not receive any " +
+ " shuffle migrations, and if there are no execs avaialble for
migration then " +
Review comment:
typo: "avaialble" -> "available"
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -488,6 +488,17 @@ package object config {
.booleanConf
.createWithDefault(false)
+ private[spark] val STORAGE_REMOTE_SHUFFLE_MAX_DISK =
+ ConfigBuilder("spark.storage.remote.shuffle.maxDisk")
Review comment:
I think we should put the conf under the "decommission" namespace as it is
only used for decommissioning.
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -488,6 +488,17 @@ package object config {
.booleanConf
.createWithDefault(false)
+ private[spark] val STORAGE_REMOTE_SHUFFLE_MAX_DISK =
+ ConfigBuilder("spark.storage.remote.shuffle.maxDisk")
+ .doc("Maximum disk space to use to store shuffle blocks before rejecting
remote " +
+ "shuffle blocks. Rejecting remote shuffle blocks means that an exec
will not receive any " +
+ " shuffle migrations, and if there are no execs avaialble for
migration then " +
+ s"decommissioning will block unless
${STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH.key} " +
Review comment:
I actually don't understand why the sentence "if there are no execs
available ...." is appended here. I think it's not necessarily needed here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]