Kimahriman commented on a change in pull request #35085:
URL: https://github.com/apache/spark/pull/35085#discussion_r829148170
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -94,7 +98,16 @@ private[spark] class DiskBlockManager(
} else {
val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
if (!newDir.exists()) {
- Files.createDirectory(newDir.toPath)
+ val path = newDir.toPath
+ Files.createDirectory(path)
+ if (shuffleServiceRemoveShuffleEnabled) {
Review comment:
We can't do it here because we need to make sure all blockmgr dirs are created with group write
permission, no matter what type of file causes the dir creation. We could potentially do it in
`createTempFileWith`.
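Roughly, what the dir creation looks like with the group-write bit applied (a standalone sketch using plain `java.nio`, not the exact PR code):
```scala
import java.io.File
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermission

// Create the sub-dir, then add group write so the (differently-owned) external shuffle
// service can delete files inside it later.
def createSubDir(parent: File, name: String, groupWritable: Boolean): File = {
  val newDir = new File(parent, name)
  val path = newDir.toPath
  Files.createDirectory(path)
  if (groupWritable) {
    val perms = Files.getPosixFilePermissions(path) // returns a mutable copy of the current perms
    perms.add(PosixFilePermission.GROUP_WRITE)
    Files.setPosixFilePermissions(path, perms)
  }
  newDir
}
```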
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -94,7 +95,13 @@ private[spark] class DiskBlockManager(
} else {
val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
if (!newDir.exists()) {
- Files.createDirectory(newDir.toPath)
+ // SPARK-37618: Create dir as group writable so files within can be deleted by the
+ // shuffle service
+ val path = newDir.toPath
+ Files.createDirectory(path)
+ val currentPerms = Files.getPosixFilePermissions(path)
+ currentPerms.add(PosixFilePermission.GROUP_WRITE)
Review comment:
Just kidding, that's not actually true. I need to trace through and figure it out, but some
final shuffle blocks must be created with `createTempShuffleBlock`.
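For context, that path goes through `DiskBlockManager.createTempShuffleBlock`, which hands back a file under one of the blockmgr sub-dirs. A minimal sketch of that call (the wrapper helper here is just for illustration, and assumes Spark-internal access to `DiskBlockManager`):
```scala
import java.io.File
import org.apache.spark.storage.{DiskBlockManager, TempShuffleBlockId}

// Illustrative: the returned file lands under a "blockmgr-*/<subDir>" directory, which is why
// the group-write bit has to be applied when the directory is created, not per file type.
def newTempShuffleFile(dbm: DiskBlockManager): (TempShuffleBlockId, File) =
  dbm.createTempShuffleBlock()
```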
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -294,33 +298,74 @@ class BlockManagerMasterEndpoint(
}
}.toSeq
- val removeRddBlockViaExtShuffleServiceFutures = externalBlockStoreClient.map { shuffleClient =>
- blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
- Future[Int] {
- val numRemovedBlocks = shuffleClient.removeBlocks(
- bmId.host,
- bmId.port,
- bmId.executorId,
- blockIds.map(_.toString).toArray)
- numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+ val removeRddBlockViaExtShuffleServiceFutures = if (externalShuffleServiceRddFetchEnabled) {
+ externalBlockStoreClient.map { shuffleClient =>
+ blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+ Future[Int] {
+ val numRemovedBlocks = shuffleClient.removeBlocks(
+ bmId.host,
+ bmId.port,
+ bmId.executorId,
+ blockIds.map(_.toString).toArray)
+ numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+ }
}
- }
- }.getOrElse(Seq.empty)
+ }.getOrElse(Seq.empty)
+ } else {
+ Seq.empty
+ }
Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures)
}
private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
- // Nothing to do in the BlockManagerMasterEndpoint data structures
val removeMsg = RemoveShuffle(shuffleId)
- Future.sequence(
- blockManagerInfo.values.map { bm =>
- bm.storageEndpoint.ask[Boolean](removeMsg).recover {
- // use false as default value means no shuffle data were removed
- handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+ val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
+ bm.storageEndpoint.ask[Boolean](removeMsg).recover {
+ // use false as default value means no shuffle data were removed
+ handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+ }
+ }.toSeq
+
+ // Find all shuffle blocks on executors that are no longer running
+ val blocksToDeleteByShuffleService =
+ new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
+ if (externalShuffleServiceRemoveShuffleEnabled) {
+ mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus =>
+ shuffleStatus.mapStatuses.foreach { mapStatus =>
+ // Port should always be external shuffle port if external shuffle is enabled so
+ // also check if the executor has been deallocated
+ if (mapStatus.location.port == externalShuffleServicePort &&
Review comment:
I think this was carried over from before this whole thing was wrapped in a check for whether the
shuffle service is enabled. I think I can remove it.
##########
File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
##########
@@ -41,6 +41,14 @@ trait ShuffleBlockResolver {
*/
def getBlockData(blockId: BlockId, dirs: Option[Array[String]] = None): ManagedBuffer
+ /**
+ * Retrive a list of BlockIds for a given shuffle map. Used to delete shuffle files
Review comment:
I was gonna say i before e but I just forgot the e!
##########
File path: core/src/main/scala/org/apache/spark/ContextCleaner.scala
##########
@@ -235,8 +235,10 @@ private[spark] class ContextCleaner(
try {
if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
logDebug("Cleaning shuffle " + shuffleId)
- mapOutputTrackerMaster.unregisterShuffle(shuffleId)
+ // Shuffle must be removed before it's unregistered from the output tracker
+ // to find blocks served by the shuffle service on deallocated executors
shuffleDriverComponents.removeShuffle(shuffleId, blocking)
+ mapOutputTrackerMaster.unregisterShuffle(shuffleId)
Review comment:
It hasn't failed any tests or caused problems in heavy usage for the
past couple months. Not sure how else to know
##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -597,6 +602,13 @@ private[spark] class IndexShuffleBlockResolver(
}
}
+ override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
Review comment:
In that case the external shuffle service will just log that it was unable to delete the file and
continue. Is there anywhere to actually know that information?
##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -654,6 +654,16 @@ package object config {
.booleanConf
.createWithDefault(false)
+ private[spark] val SHUFFLE_SERVICE_REMOVE_SHUFFLE_ENABLED =
+ ConfigBuilder("spark.shuffle.service.removeShuffle")
+ .doc("Whether to use the ExternalShuffleService for deleting shuffle
blocks for " +
+ "deallocated executors when the shuffle is no longer needed. Without
this enabled, " +
+ "shuffle data on executors that are deallocated will remain on disk
until the " +
+ "application ends.")
+ .version("3.3.0")
Review comment:
We had to disable the external shuffle service for basically all of our large shuffling jobs until
I got this working, and based on others' interest and all the JIRA tickets that have been filed
over the years, I'd consider this a critically missing feature. I'm fine with disabling it by
default, especially since the more we change permissions, the more likely this is to cause issues
when using the external shuffle service.
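For anyone trying this out, turning the behavior on is just the two shuffle service settings (key names taken from the config definition above; sketch only):
```scala
import org.apache.spark.SparkConf

// Both are required: the external shuffle service itself, plus the new cleanup flag
// (which defaults to false per the definition above).
val conf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.shuffle.service.removeShuffle", "true")
```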
##########
File path: core/src/main/scala/org/apache/spark/ContextCleaner.scala
##########
@@ -235,8 +235,10 @@ private[spark] class ContextCleaner(
try {
if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
logDebug("Cleaning shuffle " + shuffleId)
- mapOutputTrackerMaster.unregisterShuffle(shuffleId)
+ // Shuffle must be removed before it's unregistered from the output tracker
+ // to find blocks served by the shuffle service on deallocated executors
shuffleDriverComponents.removeShuffle(shuffleId, blocking)
+ mapOutputTrackerMaster.unregisterShuffle(shuffleId)
Review comment:
3.2 with my own PRs backported (most of which are already merged and will be in 3.3).
I do know I need this change for this PR to work, and I know why. I don't know how to create a
test case for a scenario I don't even know about. If you can think of one, knowing these code
paths better, please let me know.
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -294,33 +298,74 @@ class BlockManagerMasterEndpoint(
}
}.toSeq
- val removeRddBlockViaExtShuffleServiceFutures = externalBlockStoreClient.map { shuffleClient =>
- blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
- Future[Int] {
- val numRemovedBlocks = shuffleClient.removeBlocks(
- bmId.host,
- bmId.port,
- bmId.executorId,
- blockIds.map(_.toString).toArray)
- numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+ val removeRddBlockViaExtShuffleServiceFutures = if (externalShuffleServiceRddFetchEnabled) {
+ externalBlockStoreClient.map { shuffleClient =>
+ blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+ Future[Int] {
+ val numRemovedBlocks = shuffleClient.removeBlocks(
+ bmId.host,
+ bmId.port,
+ bmId.executorId,
+ blockIds.map(_.toString).toArray)
+ numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+ }
}
- }
- }.getOrElse(Seq.empty)
+ }.getOrElse(Seq.empty)
+ } else {
+ Seq.empty
+ }
Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures)
}
private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
- // Nothing to do in the BlockManagerMasterEndpoint data structures
val removeMsg = RemoveShuffle(shuffleId)
- Future.sequence(
- blockManagerInfo.values.map { bm =>
- bm.storageEndpoint.ask[Boolean](removeMsg).recover {
- // use false as default value means no shuffle data were removed
- handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+ val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
+ bm.storageEndpoint.ask[Boolean](removeMsg).recover {
+ // use false as default value means no shuffle data were removed
+ handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+ }
+ }.toSeq
+
+ // Find all shuffle blocks on executors that are no longer running
+ val blocksToDeleteByShuffleService =
+ new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
+ if (externalShuffleServiceRemoveShuffleEnabled) {
+ mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus =>
+ shuffleStatus.mapStatuses.foreach { mapStatus =>
+ // Port should always be external shuffle port if external shuffle is enabled so
+ // also check if the executor has been deallocated
+ if (mapStatus.location.port == externalShuffleServicePort &&
Review comment:
Removed this check
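With the port comparison removed, the remaining condition is just the executor liveness check. Roughly, as a sketch of that branch inside the loop above (the `blockManagerIdByExecutor` lookup and the resolver call are assumptions about the surrounding code, not necessarily what the final PR looks like):
```scala
// Sketch: only route blocks to the external shuffle service for executors that are no longer
// registered with the master endpoint; live executors still delete their own shuffle files.
if (!blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
  val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(
    mapStatus.location, new mutable.HashSet[BlockId])
  // Hypothetical resolver call returning the block ids for this map output.
  blocks ++= shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId)
}
```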
##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -597,6 +602,13 @@ private[spark] class IndexShuffleBlockResolver(
}
}
+ override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
Review comment:
I believe that's covered here:
https://github.com/apache/spark/blob/master/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java#L270
Also curious what the case is for no data file. Is that just when a shuffle
map has no output rows?
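On the no-data-file question: a plausible shape for the resolver override is to always report both the index and data block ids and let the shuffle service skip whatever isn't on disk. A sketch (assuming `ShuffleIndexBlockId`/`ShuffleDataBlockId` from `org.apache.spark.storage` and the resolver's `NOOP_REDUCE_ID`; not necessarily the exact code in this PR):
```scala
// Report both files for the map output; a missing .data file (e.g. an empty map output)
// just means the shuffle service has nothing extra to delete for that block id.
override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
  Seq(
    ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID),
    ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}
```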
##########
File path: core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
##########
@@ -141,6 +141,20 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
assert(attemptId.equals("1"))
}
+ test("SPARK-37618: Sub dirs are group writable") {
+ val conf = testConf.clone
+ conf.set("spark.local.dir", rootDirs)
+ conf.set("spark.shuffle.service.enabled", "true")
+ conf.set("spark.shuffle.service.removeShufle", "true")
Review comment:
Added a negative test case too
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -294,33 +298,74 @@ class BlockManagerMasterEndpoint(
}
}.toSeq
- val removeRddBlockViaExtShuffleServiceFutures = externalBlockStoreClient.map { shuffleClient =>
- blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
- Future[Int] {
- val numRemovedBlocks = shuffleClient.removeBlocks(
- bmId.host,
- bmId.port,
- bmId.executorId,
- blockIds.map(_.toString).toArray)
- numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+ val removeRddBlockViaExtShuffleServiceFutures = if (externalShuffleServiceRddFetchEnabled) {
+ externalBlockStoreClient.map { shuffleClient =>
+ blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+ Future[Int] {
+ val numRemovedBlocks = shuffleClient.removeBlocks(
+ bmId.host,
+ bmId.port,
+ bmId.executorId,
+ blockIds.map(_.toString).toArray)
+ numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+ }
}
- }
- }.getOrElse(Seq.empty)
+ }.getOrElse(Seq.empty)
+ } else {
+ Seq.empty
+ }
Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures)
}
private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
- // Nothing to do in the BlockManagerMasterEndpoint data structures
val removeMsg = RemoveShuffle(shuffleId)
- Future.sequence(
- blockManagerInfo.values.map { bm =>
- bm.storageEndpoint.ask[Boolean](removeMsg).recover {
- // use false as default value means no shuffle data were removed
- handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+ val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
+ bm.storageEndpoint.ask[Boolean](removeMsg).recover {
+ // use false as default value means no shuffle data were removed
+ handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+ }
+ }.toSeq
+
+ // Find all shuffle blocks on executors that are no longer running
Review comment:
We could, but there's already a lot going on in this PR, so I'd prefer not to change (or
accidentally break) the current shuffle deletion. And I'm not sure whether all block managers
still track shuffles for some reason even if they don't have any of the shuffle files?
##########
File path: core/src/main/scala/org/apache/spark/ContextCleaner.scala
##########
@@ -597,6 +602,13 @@ private[spark] class IndexShuffleBlockResolver(
}
}
+ override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId]
= {
Review comment:
I'll check if the original RDD cache serving and deletion had a test
case for that
##########
File path: core/src/main/scala/org/apache/spark/ContextCleaner.scala
##########
@@ -235,8 +235,10 @@ private[spark] class ContextCleaner(
try {
if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
logDebug("Cleaning shuffle " + shuffleId)
- mapOutputTrackerMaster.unregisterShuffle(shuffleId)
+ // Shuffle must be removed before it's unregistered from the output tracker
+ // to find blocks served by the shuffle service on deallocated executors
shuffleDriverComponents.removeShuffle(shuffleId, blocking)
+ mapOutputTrackerMaster.unregisterShuffle(shuffleId)
Review comment:
Looking at `unregisterShuffle` I don't see anything that would depend on
the ShuffleDriverComponents
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.