tgravescs commented on a change in pull request #35085:
URL: https://github.com/apache/spark/pull/35085#discussion_r828086653
##########
File path:
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -294,33 +298,74 @@ class BlockManagerMasterEndpoint(
}
}.toSeq
- val removeRddBlockViaExtShuffleServiceFutures =
externalBlockStoreClient.map { shuffleClient =>
- blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
- Future[Int] {
- val numRemovedBlocks = shuffleClient.removeBlocks(
- bmId.host,
- bmId.port,
- bmId.executorId,
- blockIds.map(_.toString).toArray)
- numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds,
TimeUnit.SECONDS)
+ val removeRddBlockViaExtShuffleServiceFutures = if
(externalShuffleServiceRddFetchEnabled) {
+ externalBlockStoreClient.map { shuffleClient =>
+ blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+ Future[Int] {
+ val numRemovedBlocks = shuffleClient.removeBlocks(
+ bmId.host,
+ bmId.port,
+ bmId.executorId,
+ blockIds.map(_.toString).toArray)
+ numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds,
TimeUnit.SECONDS)
+ }
}
- }
- }.getOrElse(Seq.empty)
+ }.getOrElse(Seq.empty)
+ } else {
+ Seq.empty
+ }
Future.sequence(removeRddFromExecutorsFutures ++
removeRddBlockViaExtShuffleServiceFutures)
}
private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
- // Nothing to do in the BlockManagerMasterEndpoint data structures
+ // Find all shuffle blocks on executors that are no longer running
Review comment:
Why do we care whether the executor has been deallocated? I could have a very
long-running executor that holds very old shuffle blocks for things that
are no longer needed. Conversely, I could have very new shuffle data on an
executor that was just released.
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -94,7 +95,13 @@ private[spark] class DiskBlockManager(
} else {
val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
if (!newDir.exists()) {
- Files.createDirectory(newDir.toPath)
+ // SPARK-37618: Create dir as group writable so files within can be
deleted by the
+ // shuffle service
+ val path = newDir.toPath
+ Files.createDirectory(path)
+ val currentPerms = Files.getPosixFilePermissions(path)
+ currentPerms.add(PosixFilePermission.GROUP_WRITE)
Review comment:
YARN has a deletion service that it normally uses to remove the files
(as the user). I started to look at this, but I'm not sure the shuffle manager
can get a handle to it.
If we are going to take this approach, I want to change permissions only
when this feature is enabled. Can we limit the change to just when creating the
shuffle files?
Ideally, I would also like this behavior documented.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]