zhouyejoe commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r648748667



##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +185,82 @@ private[spark] class DiskBlockManager(conf: SparkConf, 
deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks 
created by executors
+   * if push based shuffle is enabled. Note that the files in this directory 
will be created
+   * by the external shuffle services. We only create the merge_manager 
directories and
+   * subdirectories here because currently the external shuffle service 
doesn't have
+   * permission to create directories under application local directories.
+   */
+  private def createLocalDirsForMergedShuffleBlocks(): Unit = {
+    if (Utils.isPushBasedShuffleEnabled(conf)) {
+      // Will create the merge_manager directory only if it doesn't exist 
under the local dir.
+      Utils.getConfiguredLocalDirs(conf).foreach { rootDir =>
+        try {
+          val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+          if (!mergeDir.exists()) {
+            // This executor does not find merge_manager directory, it will 
try to create
+            // the merge_manager directory and the sub directories.
+            logDebug(s"Try to create $mergeDir and its sub dirs since the " +
+              s"$MERGE_MANAGER_DIR dir does not exist")
+            for (dirNum <- 0 until subDirsPerLocalDir) {
+              val subDir = new File(mergeDir, "%02x".format(dirNum))
+              if (!subDir.exists()) {
+                // Only one container will create this directory. The 
filesystem will handle
+                // any race conditions.
+                createDirWithCustomizedPermission(subDir, "770")
+              }
+            }
+          }
+          logInfo(s"Merge directory and its sub dirs get created at $mergeDir")
+        } catch {
+          case e: IOException =>
+            logError(
+              s"Failed to create $MERGE_MANAGER_DIR dir in $rootDir. Ignoring 
this directory.", e)
+        }
+      }
+    }
+  }
+
+  /**
+   * Create a directory that is writable by the group.
+   * Grant the customized permission so the shuffle server can
+   * create subdirs/files within the merge folder.
+   * TODO: Find out why can't we create a dir using java api with permission 
770
+   *  Files.createDirectories(mergeDir.toPath, 
PosixFilePermissions.asFileAttribute(
+   *  PosixFilePermissions.fromString("rwxrwx---")))
+   */
+  def createDirWithCustomizedPermission(dirToCreate: File, permission: 
String): Unit = {

Review comment:
       @Ngone51 Thoughts on this? I think this is a valid point. If it weren't 
for the umask in YARN, we wouldn't have to use "mkdir" to create the dir; we 
could use PosixFilePermissions with Files.createDirectory instead.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to