Ngone51 commented on a change in pull request #31493:
URL: https://github.com/apache/spark/pull/31493#discussion_r571436070



##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -488,6 +488,17 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val STORAGE_REMOTE_SHUFFLE_MAX_DISK =
+    ConfigBuilder("spark.storage.remote.shuffle.maxDisk")
+      .doc("Maximum disk space to use to store shuffle blocks before rejecting 
remote " +
+        "shuffle blocks. Rejecting remote shuffle blocks means that an exec 
will not receive any " +

Review comment:
       "an exec" -> "an executor"?

##########
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -72,6 +73,15 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  private def getShuffleBytesStored(): Long = {
+    val shuffleFiles: Seq[File] = getStoredShuffles().map {
+      si => getDataFile(si.shuffleId, si.mapId)
+    }
+    shuffleFiles.foldLeft[Long](0: Long) { (acc: Long, f: File) =>
+      acc + f.length()
+    }

Review comment:
       ```suggestion
       shuffleFiles.map(_.length()).sum
       ```

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -488,6 +488,17 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val STORAGE_REMOTE_SHUFFLE_MAX_DISK =
+    ConfigBuilder("spark.storage.remote.shuffle.maxDisk")
+      .doc("Maximum disk space to use to store shuffle blocks before rejecting 
remote " +
+        "shuffle blocks. Rejecting remote shuffle blocks means that an exec 
will not receive any " +
+        " shuffle migrations, and if there are no execs avaialble for 
migration then " +

Review comment:
       "no execs" -> "no executors"?

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
##########
@@ -1932,22 +1932,29 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
   }
 
-  test("test migration of shuffle blocks during decommissioning") {
-    val shuffleManager1 = makeSortShuffleManager()
+  private def testShuffleBlockDecommissioning(maxShuffle: Option[Int], 
migrate: Boolean) = {
+    maxShuffle.foreach{ b =>
+      conf.set(STORAGE_REMOTE_SHUFFLE_MAX_DISK.key, s"${b}b")

Review comment:
       Probably rename `maxShuffle` to `maxShuffleDiskSize` and rename `b` to 
`diskSize`

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -488,6 +488,17 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val STORAGE_REMOTE_SHUFFLE_MAX_DISK =
+    ConfigBuilder("spark.storage.remote.shuffle.maxDisk")
+      .doc("Maximum disk space to use to store shuffle blocks before rejecting 
remote " +
+        "shuffle blocks. Rejecting remote shuffle blocks means that an exec 
will not receive any " +
+        " shuffle migrations, and if there are no execs avaialble for 
migration then " +
+        s"decommissioning will block unless 
${STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH.key} " +

Review comment:
       Will it block? I think decommissioning will stop soon rather than block in 
that case.

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
##########
@@ -1961,20 +1968,37 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 
       decomManager.refreshOffloadingShuffleBlocks()
 
-      eventually(timeout(1.second), interval(10.milliseconds)) {
-        assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location === 
bm2.blockManagerId)
+      if (migrate) {
+        eventually(timeout(1.second), interval(10.milliseconds)) {
+          assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location 
=== bm2.blockManagerId)
+        }
+        
assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleData).toPath())
+          === shuffleDataBlockContent)
+        
assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleIndex).toPath())
+          === shuffleIndexBlockContent)
+      } else {
+        Thread.sleep(1000)
+        assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location === 
bm1.blockManagerId)
       }
-      
assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleData).toPath())
-        === shuffleDataBlockContent)
-      
assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleIndex).toPath())
-        === shuffleIndexBlockContent)
     } finally {
       mapOutputTracker.unregisterShuffle(0)
       // Avoid thread leak
       decomManager.stopOffloadingShuffleBlocks()
     }
   }
 
+  test("test migration of shuffle blocks during decommissioning - no limit") {
+    testShuffleBlockDecommissioning(None, true)
+  }
+
+  test("test migration of shuffle blocks during decommissioning - larger 
limit") {
+    testShuffleBlockDecommissioning(Some(10000), true)
+  }
+
+  test("test migration of shuffle blocks during decommissioning - small 
limit") {

Review comment:
       attach the JIRA ID?

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
##########
@@ -1932,22 +1932,29 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
     assert(master.getLocations(blockIdLarge) === Seq(store1.blockManagerId))
   }
 
-  test("test migration of shuffle blocks during decommissioning") {
-    val shuffleManager1 = makeSortShuffleManager()
+  private def testShuffleBlockDecommissioning(maxShuffle: Option[Int], 
migrate: Boolean) = {

Review comment:
       Maybe rename "migrate" to "willReject"?
   
   All cases actually will try to migrate the blocks first. The difference is 
whether it'll be rejected or not.

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -488,6 +488,17 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val STORAGE_REMOTE_SHUFFLE_MAX_DISK =
+    ConfigBuilder("spark.storage.remote.shuffle.maxDisk")
+      .doc("Maximum disk space to use to store shuffle blocks before rejecting 
remote " +
+        "shuffle blocks. Rejecting remote shuffle blocks means that an exec 
will not receive any " +
+        " shuffle migrations, and if there are no execs avaialble for 
migration then " +

Review comment:
       redundant whitespace at the prefix

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -488,6 +488,17 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val STORAGE_REMOTE_SHUFFLE_MAX_DISK =
+    ConfigBuilder("spark.storage.remote.shuffle.maxDisk")

Review comment:
       "*. maxDisk" -> "*.maxDiskSize"?

##########
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -173,6 +183,13 @@ private[spark] class IndexShuffleBlockResolver(
    */
   override def putShuffleBlockAsStream(blockId: BlockId, serializerManager: 
SerializerManager):
       StreamCallbackWithID = {
+    // Throw an exception if we have exceeded maximum shuffle files stored
+    remoteShuffleMaxDisk.foreach { maxBytes =>
+      val bytesUsed = getShuffleBytesStored()
+      if (maxBytes < bytesUsed) {
+        throw new SparkException(s"Not storing remote shuffles $bytesUsed 
exceeds $maxBytes")

Review comment:
       Give a hint of the conf for users to increase the maxBytes?

##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
##########
@@ -1961,20 +1968,37 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE
 
       decomManager.refreshOffloadingShuffleBlocks()
 
-      eventually(timeout(1.second), interval(10.milliseconds)) {
-        assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location === 
bm2.blockManagerId)
+      if (migrate) {
+        eventually(timeout(1.second), interval(10.milliseconds)) {
+          assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location 
=== bm2.blockManagerId)
+        }
+        
assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleData).toPath())
+          === shuffleDataBlockContent)
+        
assert(Files.readAllBytes(bm2.diskBlockManager.getFile(shuffleIndex).toPath())
+          === shuffleIndexBlockContent)
+      } else {
+        Thread.sleep(1000)

Review comment:
       I think this cannot ensure that the block fails due to the disk size 
limitation. Block migration may not finish within 1 second.

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -488,6 +488,17 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val STORAGE_REMOTE_SHUFFLE_MAX_DISK =
+    ConfigBuilder("spark.storage.remote.shuffle.maxDisk")
+      .doc("Maximum disk space to use to store shuffle blocks before rejecting 
remote " +
+        "shuffle blocks. Rejecting remote shuffle blocks means that an exec 
will not receive any " +
+        " shuffle migrations, and if there are no execs avaialble for 
migration then " +

Review comment:
       typo: "avaialble" -> "available"

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -488,6 +488,17 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val STORAGE_REMOTE_SHUFFLE_MAX_DISK =
+    ConfigBuilder("spark.storage.remote.shuffle.maxDisk")

Review comment:
       I think we should put the conf under the "decommission" namespace, as it is 
only used for decommissioning.

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -488,6 +488,17 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val STORAGE_REMOTE_SHUFFLE_MAX_DISK =
+    ConfigBuilder("spark.storage.remote.shuffle.maxDisk")
+      .doc("Maximum disk space to use to store shuffle blocks before rejecting 
remote " +
+        "shuffle blocks. Rejecting remote shuffle blocks means that an exec 
will not receive any " +
+        " shuffle migrations, and if there are no execs avaialble for 
migration then " +
+        s"decommissioning will block unless 
${STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH.key} " +

Review comment:
       I actually don't understand why the sentence "if there are no execs 
available ...." is appended here. I think it's not necessarily needed here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to