zhouyejoe commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r648874848
##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +185,82 @@ private[spark] class DiskBlockManager(conf: SparkConf,
deleteFilesOnStop: Boolea
}
}
+ /**
+ * Get the list of configured local dirs storing merged shuffle blocks
created by executors
+ * if push based shuffle is enabled. Note that the files in this directory
will be created
+ * by the external shuffle services. We only create the merge_manager
directories and
+ * subdirectories here because currently the external shuffle service
doesn't have
+ * permission to create directories under application local directories.
+ */
+ private def createLocalDirsForMergedShuffleBlocks(): Unit = {
+ if (Utils.isPushBasedShuffleEnabled(conf)) {
+ // Will create the merge_manager directory only if it doesn't exist
under the local dir.
+ Utils.getConfiguredLocalDirs(conf).foreach { rootDir =>
+ try {
+ val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+ if (!mergeDir.exists()) {
+ // This executor does not find merge_manager directory, it will
try to create
+ // the merge_manager directory and the sub directories.
+ logDebug(s"Try to create $mergeDir and its sub dirs since the " +
+ s"$MERGE_MANAGER_DIR dir does not exist")
+ for (dirNum <- 0 until subDirsPerLocalDir) {
+ val subDir = new File(mergeDir, "%02x".format(dirNum))
+ if (!subDir.exists()) {
+ // Only one container will create this directory. The
filesystem will handle
+ // any race conditions.
+ createDirWithCustomizedPermission(subDir, "770")
+ }
+ }
+ }
+ logInfo(s"Merge directory and its sub dirs get created at $mergeDir")
+ } catch {
+ case e: IOException =>
+ logError(
+ s"Failed to create $MERGE_MANAGER_DIR dir in $rootDir. Ignoring
this directory.", e)
+ }
+ }
+ }
+ }
+
+ /**
+ * Create a directory that is writable by the group.
+ * Grant the customized permission so the shuffle server can
+ * create subdirs/files within the merge folder.
+ * TODO: Find out why can't we create a dir using java api with permission
770
+ * Files.createDirectories(mergeDir.toPath,
PosixFilePermissions.asFileAttribute(
+ * PosixFilePermissions.fromString("rwxrwx---")))
+ */
+ def createDirWithCustomizedPermission(dirToCreate: File, permission:
String): Unit = {
Review comment:
Updated the method name and revert back to only mkdir with permission
770.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]