holdenk commented on a change in pull request #31493:
URL: https://github.com/apache/spark/pull/31493#discussion_r571239783



##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -173,6 +183,13 @@ private[spark] class IndexShuffleBlockResolver(
    */
   override def putShuffleBlockAsStream(blockId: BlockId, serializerManager: SerializerManager):
       StreamCallbackWithID = {
+    // Throw an exception if we have exceeded maximum shuffle files stored
+    remoteShuffleMaxDisk.foreach { maxBytes =>
+      val bytesUsed = getShuffleBytesStored()

Review comment:
       That's a good question. Looking at benchmarks folks have done, a file-size stat takes a few microseconds per file (and speeds up on subsequent runs because the filesystem metadata is cached). It doesn't need to actually read the file to determine the size; if it did, this would be much slower.
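
       To ground that: below is a minimal Scala sketch of the pattern the hunk above implies (the helper and exception names here are illustrative, not the PR's actual code). `File.length()` only consults filesystem metadata, so no file contents are read:

```scala
import java.io.File

// Hypothetical helper: sum the sizes of the files under a shuffle directory.
// File.length() reads the size from filesystem metadata and never opens the
// file, which is why each call costs only a few microseconds (and repeat
// runs are faster still once the metadata is cached).
def shuffleBytesStored(shuffleDir: File): Long = {
  val files = Option(shuffleDir.listFiles()).getOrElse(Array.empty[File])
  files.filter(_.isFile).map(_.length()).sum
}

// Guard analogous to the one in the hunk: reject an incoming remote shuffle
// block once the configured cap would be exceeded.
def checkDiskCap(shuffleDir: File, maxBytes: Long): Unit = {
  val bytesUsed = shuffleBytesStored(shuffleDir)
  if (bytesUsed > maxBytes) {
    // The real code would presumably throw a Spark-specific exception type.
    throw new IllegalStateException(
      s"Shuffle disk usage $bytesUsed exceeds configured max of $maxBytes bytes")
  }
}
```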

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -420,6 +420,14 @@ package object config {
       .intConf
       .createWithDefault(1)
 
+  private[spark] val STORAGE_REMOTE_SHUFFLE_MAX_DISK =
+    ConfigBuilder("spark.storage.remote.shuffle.maxDisk")
+      .doc("Maximum disk space to use to store shuffle blocks before rejecting remote " +

Review comment:
       Sure, good idea :)
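
       For reference, a hedged sketch of how the entry might be finished inside core's config package object. Only the `ConfigBuilder` call and the first fragment of the `.doc(...)` string appear in the hunk, so the closing text of the doc string, the `bytesConf` unit, and `createOptional` below are assumptions:

```scala
import org.apache.spark.network.util.ByteUnit

private[spark] val STORAGE_REMOTE_SHUFFLE_MAX_DISK =
  ConfigBuilder("spark.storage.remote.shuffle.maxDisk")
    .doc("Maximum disk space to use to store shuffle blocks before rejecting remote " +
      "shuffle blocks.") // the rest of the doc string is elided in the diff
    .bytesConf(ByteUnit.BYTE) // accepts size strings such as "500m" or "100g"
    .createOptional           // Option[Long]; unset means no cap is enforced
```

       If the entry is created with `createOptional`, that would line up with the `remoteShuffleMaxDisk.foreach { maxBytes => ... }` guard in the first hunk: when the key is unset, no cap is checked at all.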



