Kimahriman commented on a change in pull request #35085:
URL: https://github.com/apache/spark/pull/35085#discussion_r829148170



##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -94,7 +98,16 @@ private[spark] class DiskBlockManager(
       } else {
         val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
         if (!newDir.exists()) {
-          Files.createDirectory(newDir.toPath)
+          val path = newDir.toPath
+          Files.createDirectory(path)
+          if (shuffleServiceRemoveShuffleEnabled) {

Review comment:
       We can't do it here because we need to make sure all blockmgr dirs are 
created with group write permission, no matter what type of file causes the 
dir creation. We potentially can in `createTempFileWith`

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -94,7 +98,16 @@ private[spark] class DiskBlockManager(
       } else {
         val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
         if (!newDir.exists()) {
-          Files.createDirectory(newDir.toPath)
+          val path = newDir.toPath
+          Files.createDirectory(path)
+          if (shuffleServiceRemoveShuffleEnabled) {

Review comment:
       We can't do it here because we need to make sure all blockmgr dirs are 
created with group write permission, no matter what type of file causes the dir 
creation. We potentially can in `createTempFileWith`

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -94,7 +95,13 @@ private[spark] class DiskBlockManager(
       } else {
         val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
         if (!newDir.exists()) {
-          Files.createDirectory(newDir.toPath)
+          // SPARK-37618: Create dir as group writable so files within can be deleted by the
+          // shuffle service
+          val path = newDir.toPath
+          Files.createDirectory(path)
+          val currentPerms = Files.getPosixFilePermissions(path)
+          currentPerms.add(PosixFilePermission.GROUP_WRITE)

Review comment:
       Just kidding, that's not actually true. I still need to trace through and 
figure out exactly where, but some final shuffle blocks must be created with 
`createTempShuffleBlock`
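
   For reference, a minimal sketch of the permission pattern in the diff above; the final `Files.setPosixFilePermissions` call is an assumption on my part, since the hunk is truncated before it, and this only applies on POSIX file systems:

   ```scala
   import java.io.File
   import java.nio.file.Files
   import java.nio.file.attribute.PosixFilePermission

   // Sketch only (not the exact PR code): create a blockmgr sub dir and make it
   // group writable so an external shuffle service running as the same group
   // can delete shuffle files inside it.
   def createGroupWritableDir(newDir: File): Unit = {
     val path = newDir.toPath
     Files.createDirectory(path)
     val currentPerms = Files.getPosixFilePermissions(path)
     currentPerms.add(PosixFilePermission.GROUP_WRITE)
     Files.setPosixFilePermissions(path, currentPerms)
   }
   ```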

##########
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -294,33 +298,74 @@ class BlockManagerMasterEndpoint(
       }
     }.toSeq
 
-    val removeRddBlockViaExtShuffleServiceFutures = externalBlockStoreClient.map { shuffleClient =>
-      blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
-        Future[Int] {
-          val numRemovedBlocks = shuffleClient.removeBlocks(
-            bmId.host,
-            bmId.port,
-            bmId.executorId,
-            blockIds.map(_.toString).toArray)
-          numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+    val removeRddBlockViaExtShuffleServiceFutures = if (externalShuffleServiceRddFetchEnabled) {
+      externalBlockStoreClient.map { shuffleClient =>
+        blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+          Future[Int] {
+            val numRemovedBlocks = shuffleClient.removeBlocks(
+              bmId.host,
+              bmId.port,
+              bmId.executorId,
+              blockIds.map(_.toString).toArray)
+            numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+          }
         }
-      }
-    }.getOrElse(Seq.empty)
+      }.getOrElse(Seq.empty)
+    } else {
+      Seq.empty
+    }
 
     Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures)
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
-    // Nothing to do in the BlockManagerMasterEndpoint data structures
     val removeMsg = RemoveShuffle(shuffleId)
-    Future.sequence(
-      blockManagerInfo.values.map { bm =>
-        bm.storageEndpoint.ask[Boolean](removeMsg).recover {
-          // use false as default value means no shuffle data were removed
-          handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+    val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
+      bm.storageEndpoint.ask[Boolean](removeMsg).recover {
+        // use false as default value means no shuffle data were removed
+        handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+      }
+    }.toSeq
+
+    // Find all shuffle blocks on executors that are no longer running
+    val blocksToDeleteByShuffleService =
+      new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
+    if (externalShuffleServiceRemoveShuffleEnabled) {
+      mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus =>
+        shuffleStatus.mapStatuses.foreach { mapStatus =>
+          // Port should always be external shuffle port if external shuffle is enabled so
+          // also check if the executor has been deallocated
+          if (mapStatus.location.port == externalShuffleServicePort &&

Review comment:
       I think this was carried over from before this whole thing was wrapped 
in a check for if shuffle service is enabled, I think I can remove

##########
File path: 
core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
##########
@@ -41,6 +41,14 @@ trait ShuffleBlockResolver {
    */
   def getBlockData(blockId: BlockId, dirs: Option[Array[String]] = None): ManagedBuffer
 
+  /**
+   * Retrive a list of BlockIds for a given shuffle map. Used to delete shuffle files

Review comment:
       I was gonna say i before e but I just forgot the e!

##########
File path: core/src/main/scala/org/apache/spark/ContextCleaner.scala
##########
@@ -235,8 +235,10 @@ private[spark] class ContextCleaner(
     try {
       if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
         logDebug("Cleaning shuffle " + shuffleId)
-        mapOutputTrackerMaster.unregisterShuffle(shuffleId)
+        // Shuffle must be removed before it's unregistered from the output tracker
+        // to find blocks served by the shuffle service on deallocated executors
         shuffleDriverComponents.removeShuffle(shuffleId, blocking)
+        mapOutputTrackerMaster.unregisterShuffle(shuffleId)

Review comment:
       It hasn't failed any tests or caused problems in heavy usage for the past 
couple of months. I'm not sure how else to verify it.

##########
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -597,6 +602,13 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {

Review comment:
       In that case the external shuffle service will just log that it was unable 
to delete the file and continue. Is there anywhere we could actually know that 
information?
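
   Since the hunk above is cut off at the method signature, here is a plausible body as a sketch (an assumption on my part, not necessarily the merged code): each map task's output under the sort-based shuffle is one index file plus one data file, so the resolver can return exactly those two block ids.

   ```scala
   // Sketch only; the actual implementation in the PR may differ.
   override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
     Seq(
       ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID),
       ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
   }
   ```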

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -654,6 +654,16 @@ package object config {
       .booleanConf
       .createWithDefault(false)
 
+  private[spark] val SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED =
+    ConfigBuilder("spark.shuffle.service.removeShuffle")
+      .doc("Whether to use the ExternalShuffleService for deleting shuffle 
blocks for " +
+        "deallocated executors when the shuffle is no longer needed. Without 
this enabled, " +
+        "shuffle data on executors that are deallocated will remain on disk 
until the " +
+        "application ends.")
+      .version("3.3.0")

Review comment:
       We had to disable the external shuffle service for basically all of our 
large shuffling jobs until I got this working, and based on others' interest and 
all the JIRA tickets that have been filed over the years, I'd consider this a 
critically missing feature. I'm fine with it being disabled by default, 
especially if we limit the permission changes any further, since that makes it 
more likely this will cause issues when using the external shuffle service.
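
   For illustration, a minimal sketch of how the new flag would be turned on, assuming (as the doc text implies) that the external shuffle service itself also has to be enabled:

   ```scala
   import org.apache.spark.SparkConf

   // Sketch only: with both flags on, shuffle files of deallocated executors can
   // be deleted through the external shuffle service instead of lingering on
   // disk until the application ends.
   val conf = new SparkConf()
     .set("spark.shuffle.service.enabled", "true")
     .set("spark.shuffle.service.removeShuffle", "true")
   ```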

##########
File path: core/src/main/scala/org/apache/spark/ContextCleaner.scala
##########
@@ -235,8 +235,10 @@ private[spark] class ContextCleaner(
     try {
       if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
         logDebug("Cleaning shuffle " + shuffleId)
-        mapOutputTrackerMaster.unregisterShuffle(shuffleId)
+        // Shuffle must be removed before it's unregistered from the output tracker
+        // to find blocks served by the shuffle service on deallocated executors
         shuffleDriverComponents.removeShuffle(shuffleId, blocking)
+        mapOutputTrackerMaster.unregisterShuffle(shuffleId)

Review comment:
       3.2 with my own PRs backported (most of which are already merged and 
will be in 3.3).
   
   I do know I need this change for this PR to work, and I know why. I don't 
know how to create a test case for a scenario I don't even know about. If you 
can think of one, knowing these code paths better, please let me know.

##########
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -294,33 +298,74 @@ class BlockManagerMasterEndpoint(
       }
     }.toSeq
 
-    val removeRddBlockViaExtShuffleServiceFutures = externalBlockStoreClient.map { shuffleClient =>
-      blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
-        Future[Int] {
-          val numRemovedBlocks = shuffleClient.removeBlocks(
-            bmId.host,
-            bmId.port,
-            bmId.executorId,
-            blockIds.map(_.toString).toArray)
-          numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+    val removeRddBlockViaExtShuffleServiceFutures = if (externalShuffleServiceRddFetchEnabled) {
+      externalBlockStoreClient.map { shuffleClient =>
+        blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+          Future[Int] {
+            val numRemovedBlocks = shuffleClient.removeBlocks(
+              bmId.host,
+              bmId.port,
+              bmId.executorId,
+              blockIds.map(_.toString).toArray)
+            numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+          }
         }
-      }
-    }.getOrElse(Seq.empty)
+      }.getOrElse(Seq.empty)
+    } else {
+      Seq.empty
+    }
 
     Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures)
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
-    // Nothing to do in the BlockManagerMasterEndpoint data structures
     val removeMsg = RemoveShuffle(shuffleId)
-    Future.sequence(
-      blockManagerInfo.values.map { bm =>
-        bm.storageEndpoint.ask[Boolean](removeMsg).recover {
-          // use false as default value means no shuffle data were removed
-          handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+    val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
+      bm.storageEndpoint.ask[Boolean](removeMsg).recover {
+        // use false as default value means no shuffle data were removed
+        handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+      }
+    }.toSeq
+
+    // Find all shuffle blocks on executors that are no longer running
+    val blocksToDeleteByShuffleService =
+      new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
+    if (externalShuffleServiceRemoveShuffleEnabled) {
+      mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus =>
+        shuffleStatus.mapStatuses.foreach { mapStatus =>
+          // Port should always be external shuffle port if external shuffle is enabled so
+          // also check if the executor has been deallocated
+          if (mapStatus.location.port == externalShuffleServicePort &&

Review comment:
       Removed this check
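
   To make the surrounding logic concrete, here is a rough sketch of the per-map-status check once the port comparison is dropped; the helper below and the way it derives block ids are my own assumptions, not the exact PR code:

   ```scala
   import org.apache.spark.scheduler.MapStatus
   import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
   import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleDataBlockId, ShuffleIndexBlockId}

   // Hypothetical helper: for one map output of a finished shuffle, return the
   // (location, block) pairs the external shuffle service should delete, but
   // only when the executor that wrote them is no longer alive.
   def blocksToDeleteFor(
       shuffleId: Int,
       mapStatus: MapStatus,
       liveExecutors: Set[String]): Seq[(BlockManagerId, BlockId)] = {
     if (liveExecutors.contains(mapStatus.location.executorId)) {
       Seq.empty // a live executor handles the RemoveShuffle message itself
     } else {
       Seq(
         mapStatus.location -> ShuffleIndexBlockId(shuffleId, mapStatus.mapId, NOOP_REDUCE_ID),
         mapStatus.location -> ShuffleDataBlockId(shuffleId, mapStatus.mapId, NOOP_REDUCE_ID))
     }
   }
   ```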

##########
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -597,6 +602,13 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {

Review comment:
       I believe that's covered here: 
https://github.com/apache/spark/blob/master/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java#L270
   
   Also curious what the case is for no data file. Is that just when a shuffle 
map has no output rows?

##########
File path: 
core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
##########
@@ -141,6 +141,20 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
     assert(attemptId.equals("1"))
   }
 
+  test("SPARK-37618: Sub dirs are group writable") {
+    val conf = testConf.clone
+    conf.set("spark.local.dir", rootDirs)
+    conf.set("spark.shuffle.service.enabled", "true")
+    conf.set("spark.shuffle.service.removeShufle", "true")

Review comment:
       Added a negative test case too
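
   For what it's worth, a sketch of what such a negative test might look like, mirroring the positive test in the hunk above; the `DiskBlockManager` constructor arguments and the `TestBlockId`/`getFile` usage are assumptions about the suite's existing fixtures:

   ```scala
   test("SPARK-37618: Sub dirs are not group writable when removeShuffle is disabled") {
     // Sketch only: same setup as the positive test, but with the new flag left
     // off, so no extra group-write bit should be added to the created sub dirs.
     val conf = testConf.clone
     conf.set("spark.local.dir", rootDirs)
     conf.set("spark.shuffle.service.enabled", "true")
     conf.set("spark.shuffle.service.removeShuffle", "false")

     val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
     val blockId = new TestBlockId("test")
     val file = diskBlockManager.getFile(blockId)

     val perms = Files.getPosixFilePermissions(file.getParentFile.toPath)
     assert(!perms.contains(PosixFilePermission.GROUP_WRITE))
   }
   ```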

##########
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -294,33 +298,74 @@ class BlockManagerMasterEndpoint(
       }
     }.toSeq
 
-    val removeRddBlockViaExtShuffleServiceFutures = externalBlockStoreClient.map { shuffleClient =>
-      blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
-        Future[Int] {
-          val numRemovedBlocks = shuffleClient.removeBlocks(
-            bmId.host,
-            bmId.port,
-            bmId.executorId,
-            blockIds.map(_.toString).toArray)
-          numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+    val removeRddBlockViaExtShuffleServiceFutures = if (externalShuffleServiceRddFetchEnabled) {
+      externalBlockStoreClient.map { shuffleClient =>
+        blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+          Future[Int] {
+            val numRemovedBlocks = shuffleClient.removeBlocks(
+              bmId.host,
+              bmId.port,
+              bmId.executorId,
+              blockIds.map(_.toString).toArray)
+            numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+          }
         }
-      }
-    }.getOrElse(Seq.empty)
+      }.getOrElse(Seq.empty)
+    } else {
+      Seq.empty
+    }
 
     Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures)
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
-    // Nothing to do in the BlockManagerMasterEndpoint data structures
     val removeMsg = RemoveShuffle(shuffleId)
-    Future.sequence(
-      blockManagerInfo.values.map { bm =>
-        bm.storageEndpoint.ask[Boolean](removeMsg).recover {
-          // use false as default value means no shuffle data were removed
-          handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+    val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
+      bm.storageEndpoint.ask[Boolean](removeMsg).recover {
+        // use false as default value means no shuffle data were removed
+        handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+      }
+    }.toSeq
+
+    // Find all shuffle blocks on executors that are no longer running

Review comment:
       We could, but there's already a lot going on in this PR, and I'd prefer 
not to change (or accidentally break) the current shuffle deletion. And I'm not 
sure whether all block managers still track shuffles for some reason even if 
they don't have any of the shuffle files.

##########
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -597,6 +602,13 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {

Review comment:
       I'll check if the original RDD cache serving and deletion had a test 
case for that

##########
File path: core/src/main/scala/org/apache/spark/ContextCleaner.scala
##########
@@ -235,8 +235,10 @@ private[spark] class ContextCleaner(
     try {
       if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
         logDebug("Cleaning shuffle " + shuffleId)
-        mapOutputTrackerMaster.unregisterShuffle(shuffleId)
+        // Shuffle must be removed before it's unregistered from the output tracker
+        // to find blocks served by the shuffle service on deallocated executors
         shuffleDriverComponents.removeShuffle(shuffleId, blocking)
+        mapOutputTrackerMaster.unregisterShuffle(shuffleId)

Review comment:
       Looking at `unregisterShuffle`, I don't see anything that would depend on 
the `ShuffleDriverComponents`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


