Kimahriman commented on a change in pull request #35085:
URL: https://github.com/apache/spark/pull/35085#discussion_r829656786
##########
File path: core/src/main/scala/org/apache/spark/ContextCleaner.scala
##########
@@ -235,8 +235,10 @@ private[spark] class ContextCleaner(
      try {
        if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
          logDebug("Cleaning shuffle " + shuffleId)
-          mapOutputTrackerMaster.unregisterShuffle(shuffleId)
+          // Shuffle must be removed before it's unregistered from the output tracker
+          // to find blocks served by the shuffle service on deallocated executors
          shuffleDriverComponents.removeShuffle(shuffleId, blocking)
+          mapOutputTrackerMaster.unregisterShuffle(shuffleId)
Review comment:
3.2 with my own PRs backported (most of which are already merged and will be in 3.3).

I do know I need this change for this PR to work, and I know why. I just don't know how to create a test case for a scenario I don't even know about. If you can think of one, since you know these code paths better, please let me know.
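To spell out the ordering constraint for anyone following along, it is roughly this (a sketch reusing the names from the hunk above, not the PR's exact code):

    // Sketch only: removeShuffle needs the map statuses that
    // unregisterShuffle discards, so the removal has to come first.
    def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
      // While the tracker still knows the shuffle's map outputs, the driver
      // components can resolve which blocks sit behind the external shuffle
      // service on deallocated executors and ask for their deletion.
      shuffleDriverComponents.removeShuffle(shuffleId, blocking)
      // Only afterwards drop the tracker's bookkeeping for this shuffle.
      mapOutputTrackerMaster.unregisterShuffle(shuffleId)
    }

Reversing the two calls would make removeShuffle see an already-unregistered shuffle and miss the blocks on deallocated executors.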
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -294,33 +298,74 @@ class BlockManagerMasterEndpoint(
        }
      }.toSeq
-    val removeRddBlockViaExtShuffleServiceFutures = externalBlockStoreClient.map { shuffleClient =>
-      blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
-        Future[Int] {
-          val numRemovedBlocks = shuffleClient.removeBlocks(
-            bmId.host,
-            bmId.port,
-            bmId.executorId,
-            blockIds.map(_.toString).toArray)
-          numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+    val removeRddBlockViaExtShuffleServiceFutures = if (externalShuffleServiceRddFetchEnabled) {
+      externalBlockStoreClient.map { shuffleClient =>
+        blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+          Future[Int] {
+            val numRemovedBlocks = shuffleClient.removeBlocks(
+              bmId.host,
+              bmId.port,
+              bmId.executorId,
+              blockIds.map(_.toString).toArray)
+            numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+          }
        }
-      }
-    }.getOrElse(Seq.empty)
+      }.getOrElse(Seq.empty)
+    } else {
+      Seq.empty
+    }
    Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures)
  }
  private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
-    // Nothing to do in the BlockManagerMasterEndpoint data structures
    val removeMsg = RemoveShuffle(shuffleId)
-    Future.sequence(
-      blockManagerInfo.values.map { bm =>
-        bm.storageEndpoint.ask[Boolean](removeMsg).recover {
-          // use false as default value means no shuffle data were removed
-          handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+    val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
+      bm.storageEndpoint.ask[Boolean](removeMsg).recover {
+        // use false as default value means no shuffle data were removed
+        handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+      }
+    }.toSeq
+
+    // Find all shuffle blocks on executors that are no longer running
+    val blocksToDeleteByShuffleService =
+      new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
+    if (externalShuffleServiceRemoveShuffleEnabled) {
+      mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus =>
+        shuffleStatus.mapStatuses.foreach { mapStatus =>
+          // Port should always be external shuffle port if external shuffle is enabled so
+          // also check if the executor has been deallocated
+          if (mapStatus.location.port == externalShuffleServicePort &&
Review comment:
Removed this check
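For context, the surrounding loop roughly does this (a sketch; the deallocation check and the shuffleBlockResolver handle are stand-ins for illustration, not the PR's actual code):

    // Sketch: collect the shuffle's blocks whose executor is gone, keyed by
    // the external shuffle service location that can still delete them.
    mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus =>
      shuffleStatus.mapStatuses.foreach { mapStatus =>
        // Assumed deallocation check: no live block manager at this location
        // means only the external shuffle service can remove the files.
        if (!blockManagerInfo.contains(mapStatus.location)) {
          val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(
            mapStatus.location, new mutable.HashSet[BlockId])
          // Stand-in for "index + data block ids for this map output"; see
          // getBlocksForShuffle discussed below.
          blocks ++= shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId)
        }
      }
    }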
##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -597,6 +602,13 @@ private[spark] class IndexShuffleBlockResolver(
    }
  }
+  override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
Review comment:
I believe that's covered here:
https://github.com/apache/spark/blob/master/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java#L270
Also curious what the case is where there's no data file. Is that just when a shuffle map has no output rows?
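For reference, the method plausibly just enumerates the index/data pair for a map output (a sketch; the hunk above cuts off before the body, so this is a guess at its shape):

    override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
      // The resolver names its on-disk files via these two block ids, so a
      // shuffle service can delete a map output given just (shuffleId, mapId).
      Seq(
        ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID),
        ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
    }

Presumably deleting a non-existent data file is just a no-op on the shuffle service side, which is the case I'm asking about.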
##########
File path: core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
##########
@@ -141,6 +141,20 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
    assert(attemptId.equals("1"))
  }
+  test("SPARK-37618: Sub dirs are group writable") {
+    val conf = testConf.clone
+    conf.set("spark.local.dir", rootDirs)
+    conf.set("spark.shuffle.service.enabled", "true")
+ conf.set("spark.shuffle.service.removeShufle", "true")
Review comment:
Added a negative test case too
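Roughly this shape (a sketch; the constructor arguments and the assertion are guesses modeled on the suite's other tests, not the actual test I added):

    import java.nio.file.Files
    import java.nio.file.attribute.PosixFilePermission

    test("SPARK-37618: Sub dirs are not group writable when disabled") {
      val conf = testConf.clone
      conf.set("spark.local.dir", rootDirs)
      conf.set("spark.shuffle.service.enabled", "true")
      conf.set("spark.shuffle.service.removeShuffle", "false")
      // Assumed setup mirroring the suite's existing tests.
      val diskBlockManager = new DiskBlockManager(conf, deleteFilesOnStop = true)
      val file = diskBlockManager.getFile(TestBlockId("test"))
      // With the cleanup feature off, sub dirs should keep default
      // permissions, i.e. no group-write bit.
      assert(!Files.getPosixFilePermissions(file.getParentFile.toPath)
        .contains(PosixFilePermission.GROUP_WRITE))
    }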
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -294,33 +298,74 @@ class BlockManagerMasterEndpoint(
        }
      }.toSeq
-    val removeRddBlockViaExtShuffleServiceFutures = externalBlockStoreClient.map { shuffleClient =>
-      blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
-        Future[Int] {
-          val numRemovedBlocks = shuffleClient.removeBlocks(
-            bmId.host,
-            bmId.port,
-            bmId.executorId,
-            blockIds.map(_.toString).toArray)
-          numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+    val removeRddBlockViaExtShuffleServiceFutures = if (externalShuffleServiceRddFetchEnabled) {
+      externalBlockStoreClient.map { shuffleClient =>
+        blocksToDeleteByShuffleService.map { case (bmId, blockIds) =>
+          Future[Int] {
+            val numRemovedBlocks = shuffleClient.removeBlocks(
+              bmId.host,
+              bmId.port,
+              bmId.executorId,
+              blockIds.map(_.toString).toArray)
+            numRemovedBlocks.get(defaultRpcTimeout.duration.toSeconds, TimeUnit.SECONDS)
+          }
        }
-      }
-    }.getOrElse(Seq.empty)
+      }.getOrElse(Seq.empty)
+    } else {
+      Seq.empty
+    }
    Future.sequence(removeRddFromExecutorsFutures ++ removeRddBlockViaExtShuffleServiceFutures)
  }
  private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
-    // Nothing to do in the BlockManagerMasterEndpoint data structures
    val removeMsg = RemoveShuffle(shuffleId)
-    Future.sequence(
-      blockManagerInfo.values.map { bm =>
-        bm.storageEndpoint.ask[Boolean](removeMsg).recover {
-          // use false as default value means no shuffle data were removed
-          handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+    val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
+      bm.storageEndpoint.ask[Boolean](removeMsg).recover {
+        // use false as default value means no shuffle data were removed
+        handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
+      }
+    }.toSeq
+
+    // Find all shuffle blocks on executors that are no longer running
Review comment:
We could, but there's already a lot going on in this PR, and I'd prefer not to change (or accidentally break) the current shuffle deletion. And I'm not sure whether all block managers still track shuffles for some reason, even if they don't hold any of the shuffle files.
##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -597,6 +602,13 @@ private[spark] class IndexShuffleBlockResolver(
    }
  }
+  override def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
Review comment:
I'll check if the original RDD cache serving and deletion had a test case for that.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]