tgravescs commented on a change in pull request #35085:
URL: https://github.com/apache/spark/pull/35085#discussion_r828086653



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -294,33 +298,74 @@ class BlockManagerMasterEndpoint(
       }
     }.toSeq
 
-    val removeRddBlockViaExtShuffleServiceFutures = externalBlockStoreClient.map { shuffleClient =>
-      blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
-        Future[Int] {
-          val numRemovedBlocks = shuffleClient.removeBlocks(
-            bmId.host,
-            bmId.port,
-            bmId.executorId,
-            blockIds.map(_.toString).toArray)
-          numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+    val removeRddBlockViaExtShuffleServiceFutures = if (externalShuffleServiceRddFetchEnabled) {
+      externalBlockStoreClient.map { shuffleClient =>
+        blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+          Future[Int] {
+            val numRemovedBlocks = shuffleClient.removeBlocks(
+              bmId.host,
+              bmId.port,
+              bmId.executorId,
+              blockIds.map(_.toString).toArray)
+            numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+          }
         }
-      }
-    }.getOrElse(Seq.empty)
+      }.getOrElse(Seq.empty)
+    } else {
+      Seq.empty
+    }
 
    Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures)
   }
 
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
-    // Nothing to do in the BlockManagerMasterEndpoint data structures
+    // Find all shuffle blocks on executors that are no longer running

Review comment:
       Why do we care whether the executor has been deallocated? I could have a very long-running executor holding very old shuffle blocks for things that are no longer needed. Conversely, I could have very new shuffle data on an executor that was just released.
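   
   To make the concern concrete, here is a toy sketch (the names and numbers are invented, not from the PR): block age and executor liveness are independent dimensions, so filtering on "executor no longer running" says nothing about whether the shuffle data is still needed.
   
```scala
// Toy model only: shuffle block bookkeeping reduced to executor liveness and block age.
case class ShuffleBlockInfo(shuffleId: Int, executorId: String, ageMinutes: Long)

object LivenessVsAgeSketch extends App {
  val liveExecutors = Set("exec-1") // a long-running executor that is still alive
  val blocks = Seq(
    ShuffleBlockInfo(shuffleId = 1, executorId = "exec-1", ageMinutes = 600), // old data on a live executor
    ShuffleBlockInfo(shuffleId = 7, executorId = "exec-2", ageMinutes = 2)    // fresh data on a released executor
  )
  // A liveness-based filter selects the fresh block and skips the stale one,
  // so liveness alone does not identify the blocks that are no longer needed.
  val onReleasedExecutors = blocks.filterNot(b => liveExecutors.contains(b.executorId))
  println(onReleasedExecutors.map(_.shuffleId)) // List(7)
}
```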

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -94,7 +95,13 @@ private[spark] class DiskBlockManager(
       } else {
         val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
         if (!newDir.exists()) {
-          Files.createDirectory(newDir.toPath)
+          // SPARK-37618: Create dir as group writable so files within can be deleted by the
+          // shuffle service
+          val path = newDir.toPath
+          Files.createDirectory(path)
+          val currentPerms = Files.getPosixFilePermissions(path)
+          currentPerms.add(PosixFilePermission.GROUP_WRITE)

Review comment:
       YARN has a deletion service that it normally uses to remove the files (as the user). I started to look at this, but I'm not sure the shuffle manager can get a handle to it.
   
   If we are going to take this approach, I want to change permissions only when this feature is enabled. Can we limit it to just when creating the shuffle files?
   Ideally I would also like it documented.
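   
   For illustration, here is a minimal standalone sketch of gating the permission change behind a flag. The flag name `removeShuffleViaShuffleServiceEnabled` is a hypothetical stand-in for whatever config this PR introduces; this is not the PR's code, just the shape of the check I have in mind.
   
```scala
import java.io.File
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermission

object GroupWritableDirSketch {
  // Hypothetical flag standing in for the real config check; default off so
  // directory permissions stay at their umask-derived defaults otherwise.
  val removeShuffleViaShuffleServiceEnabled: Boolean =
    sys.env.contains("SKETCH_ENABLE_GROUP_WRITE")

  def createSubDir(parent: File, subDirId: Int): File = {
    val newDir = new File(parent, "%02x".format(subDirId))
    if (!newDir.exists()) {
      val path = newDir.toPath
      Files.createDirectory(path)
      if (removeShuffleViaShuffleServiceEnabled) {
        // Widen permissions only when the feature is on; the added bit only
        // takes effect once the permission set is written back. Requires a
        // POSIX filesystem.
        val perms = Files.getPosixFilePermissions(path)
        perms.add(PosixFilePermission.GROUP_WRITE)
        Files.setPosixFilePermissions(path, perms)
      }
    }
    newDir
  }
}
```
   
   That way everyone who does not enable the feature keeps the current directory permissions.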
   
   



