mridulm commented on a change in pull request #35085:
URL: https://github.com/apache/spark/pull/35085#discussion_r795000809
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -52,6 +53,7 @@ class BlockManagerMasterEndpoint(
externalBlockStoreClient: Option[ExternalBlockStoreClient],
Review comment:
Earlier, `externalBlockStoreClient` would be nonEmpty only when
`SHUFFLE_SERVICE_FETCH_RDD_ENABLED` was enabled, but now it is nonEmpty whenever
the external shuffle service is enabled.
This results in `removeRdd` taking the `externalBlockStoreClient` codepath even
when RDD fetch through the shuffle service is disabled.
For now, shall we guard this with a check against
`externalShuffleServiceRddFetchEnabled` and revisit enabling it in a
different PR?
Thoughts ?
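
A minimal sketch of the guard being suggested, assuming `externalShuffleServiceRddFetchEnabled` is in scope on the endpoint (the actual surrounding code in `removeRdd` is not shown here):

```scala
// Hypothetical sketch: also gate the external block store path on the RDD fetch
// flag, so enabling the shuffle service alone does not change removeRdd behaviour.
if (externalBlockStoreClient.isDefined && externalShuffleServiceRddFetchEnabled) {
  // ... existing externalBlockStoreClient based removal of cached RDD blocks ...
}
```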
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -311,16 +313,52 @@ class BlockManagerMasterEndpoint(
   }
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
-    // Nothing to do in the BlockManagerMasterEndpoint data structures
+    // Find all shuffle blocks on executors that are no longer running
+    val blocksToDeleteByShuffleService =
+      new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
+    if (externalBlockStoreClient.isDefined) {
+      mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus =>
+        shuffleStatus.mapStatuses.foreach { mapStatus =>
+          // Port should always be external shuffle port if external shuffle is enabled
+          val isShufflePort = mapStatus.location.port == externalShuffleServicePort
+          val executorDeallocated =
+            !blockManagerIdByExecutor.contains(mapStatus.location.executorId)
+          if (isShufflePort && executorDeallocated) {
+            val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(mapStatus.location,
+              new mutable.HashSet[BlockId])
Review comment:
Do the get/update only if `blocksToDel` is non-empty (in case there is a
possibility of other `ShuffleBlockResolver` implementations returning no blocks).
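
Roughly along these lines; a sketch only, where `blocksToDel`, `shuffleBlockResolver` and the call to `getBlocksForShuffle` are assumed based on the rest of this PR and may not match the actual code:

```scala
// Only touch the per-location set when there is actually something to delete,
// e.g. when a different ShuffleBlockResolver reports no blocks for this map output.
val blocksToDel = shuffleBlockResolver.getBlocksForShuffle(shuffleId, mapStatus.mapId)
if (blocksToDel.nonEmpty) {
  val blocks = blocksToDeleteByShuffleService.getOrElseUpdate(
    mapStatus.location, new mutable.HashSet[BlockId])
  blocks ++= blocksToDel
}
```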
##########
File path: core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
##########
@@ -87,7 +86,7 @@ public ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws I
     }
     lastPartitionId = reducePartitionId;
     if (outputTempFile == null) {
-      outputTempFile = Utils.tempFileWith(outputFile);
+      outputTempFile = blockResolver.createTempFile(outputFile);
Review comment:
I might be missing some context here, but why do we need to do this?
In order to delete from the shuffle service, we only need to ensure that the
directory hosting the block has a read/write/execute ACL for the shuffle service,
right?
##########
File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
##########
@@ -41,6 +41,14 @@ trait ShuffleBlockResolver {
    */
   def getBlockData(blockId: BlockId, dirs: Option[Array[String]] = None): ManagedBuffer
+  /**
+   * Retrieve a list of BlockIds for a given shuffle map. Used to delete shuffle files
+   * from the external shuffle service after the associated executor has been removed.
+   */
+  def getBlocksForShuffle(shuffleId: Int, mapId: Long): Seq[BlockId] = {
+    Seq.empty
+  }
Review comment:
I agree with your assessment @Kimahriman and would prefer to avoid this
method, though @Ngone51 had expressed concern in the past that some of the
private classes might be getting used for custom implementations.
Thoughts @Ngone51 ?
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -166,13 +173,41 @@ private[spark] class DiskBlockManager(
     }
   }
+  /**
+   * SPARK-37618: Makes sure that the file is created as world readable. This is to get
+   * around the fact that making the block manager sub dirs group writable removes
+   * the setgid bit in secure Yarn environments, which prevents the shuffle service
+   * from being able to read shuffle files. The outer directories will still not be
+   * world executable, so this doesn't allow access to these files except for the
+   * running user and shuffle service.
+   */
+  def createWorldReadableFile(file: File): Unit = {
Review comment:
See the comment on `getFile` above; with that change, we don't need
`createWorldReadableFile` or `createTempFileWith` (below)?
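
For reference, a minimal sketch of what such a helper could look like, inferred only from the doc comment above (the PR's actual implementation may differ and, per this comment, may not be needed at all):

```scala
import java.io.File
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermission

// Hypothetical helper: create the file and mark it world readable so the
// shuffle service (running as a different user) can read it.
def createWorldReadableFile(file: File): Unit = {
  file.createNewFile()
  val perms = Files.getPosixFilePermissions(file.toPath)
  perms.add(PosixFilePermission.OTHERS_READ)
  Files.setPosixFilePermissions(file.toPath, perms)
}
```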
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -311,16 +313,52 @@ class BlockManagerMasterEndpoint(
   }
   private def removeShuffle(shuffleId: Int): Future[Seq[Boolean]] = {
-    // Nothing to do in the BlockManagerMasterEndpoint data structures
+    // Find all shuffle blocks on executors that are no longer running
+    val blocksToDeleteByShuffleService =
+      new mutable.HashMap[BlockManagerId, mutable.HashSet[BlockId]]
+    if (externalBlockStoreClient.isDefined) {
+      mapOutputTracker.shuffleStatuses.get(shuffleId).foreach { shuffleStatus =>
+        shuffleStatus.mapStatuses.foreach { mapStatus =>
+          // Port should always be external shuffle port if external shuffle is enabled
+          val isShufflePort = mapStatus.location.port == externalShuffleServicePort
+          val executorDeallocated =
+            !blockManagerIdByExecutor.contains(mapStatus.location.executorId)
Review comment:
Inline the variables into the `if` check below, to avoid the
`blockManagerIdByExecutor` lookup when the port does not match.
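
That is, something like this (a sketch of the inlining, using the names from the diff above):

```scala
// && short-circuits, so the blockManagerIdByExecutor lookup only happens
// when the location's port is the external shuffle service port.
if (mapStatus.location.port == externalShuffleServicePort &&
    !blockManagerIdByExecutor.contains(mapStatus.location.executorId)) {
  // ... add this map output's blocks to blocksToDeleteByShuffleService ...
}
```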
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -94,7 +95,13 @@ private[spark] class DiskBlockManager(
     } else {
       val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
       if (!newDir.exists()) {
-        Files.createDirectory(newDir.toPath)
+        // SPARK-37618: Create dir as group writable so files within can be deleted by the
+        // shuffle service
+        val path = newDir.toPath
+        Files.createDirectory(path)
+        val currentPerms = Files.getPosixFilePermissions(path)
+        currentPerms.add(PosixFilePermission.GROUP_WRITE)
Review comment:
Add `GROUP_READ` and `GROUP_EXECUTE` (for listing) as well.
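
Roughly, the permission block above would then become something like this (a sketch based on the diff; the trailing `setPosixFilePermissions` call is assumed to follow in the elided part of the change):

```scala
// Group needs read/write to manage the shuffle files and execute to list the directory.
val currentPerms = Files.getPosixFilePermissions(path)
currentPerms.add(PosixFilePermission.GROUP_READ)
currentPerms.add(PosixFilePermission.GROUP_WRITE)
currentPerms.add(PosixFilePermission.GROUP_EXECUTE)
Files.setPosixFilePermissions(path, currentPerms)
```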
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]