otterc commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r644834562



##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -315,6 +315,41 @@ private[spark] object Utils extends Logging {
     dir.getCanonicalFile
   }
 
+  /**
+   * Create a directory that is writable by the group.
+   * Grant the customized permission so the shuffle server can
+   * create subdirs/files within the merge folder.

Review comment:
       @Ngone51 @zhouyejoe How about moving this function to DiskBlockManager completely? For permissions more restrictive than `750` we can use the Java API. Here, we run the `mkdir` command only because we want to create the merge directory with `770` permissions.
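   A minimal sketch of the Java-API route, assuming a hypothetical helper name (`createDirWith770` is not in the PR); POSIX file systems only:
   ```
   import java.io.File
   import java.nio.file.Files
   import java.nio.file.attribute.PosixFilePermissions

   // Hypothetical helper: create the directory first, then set the
   // permissions explicitly so the result is not narrowed by the umask.
   def createDirWith770(dir: File): Unit = {
     Files.createDirectories(dir.toPath)
     // rwxrwx--- == 770: owner (executor) and group (shuffle server)
     // can both create subdirs/files inside.
     Files.setPosixFilePermissions(dir.toPath,
       PosixFilePermissions.fromString("rwxrwx---"))
   }
   ```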

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -2566,11 +2601,28 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Push based shuffle can only be enabled when external shuffle service is enabled.
+   * Push based shuffle can only be enabled when the application is submitted
+   * to run in YARN mode, with external shuffle service enabled and
+   * spark.yarn.maxAttempts or the yarn cluster default max attempts is set to 1.
+   * TODO: SPARK-35546 Support push based shuffle with multiple app attempts
    */
   def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = {
     conf.get(PUSH_BASED_SHUFFLE_ENABLED) &&
-      (conf.get(IS_TESTING).getOrElse(false) || conf.get(SHUFFLE_SERVICE_ENABLED))
+      (conf.get(IS_TESTING).getOrElse(false) ||
+        (conf.get(SHUFFLE_SERVICE_ENABLED) &&
+          conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn") &&

Review comment:
       Nit: is the String "yarn" declared as a constant anywhere else that we can use?
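   If not, a sketch of what introducing one could look like (the constant name here is an assumption, not existing Spark code):
   ```
   // Hypothetical constant, assuming none exists elsewhere:
   private val YARN_MASTER = "yarn"

   // ...so the check above would read:
   conf.get(SparkLauncher.SPARK_MASTER, null) == YARN_MASTER
   ```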

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -504,7 +504,8 @@ private[spark] class BlockManager(
 
     hostLocalDirManager = {
       if (conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) &&
-          !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
+          !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL) ||
+          Utils.isPushBasedShuffleEnabled(conf)) {

Review comment:
       This still looks the same @zhouyejoe. `hostLocalDirManager` should be set when push-based shuffle is enabled. Can you put `()` around `conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) && !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)` so that it is clear?
   It should be:
   ```
   if ((conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) &&
       !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) ||
       Utils.isPushBasedShuffleEnabled(conf))
   ```

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -87,6 +87,32 @@ case class ShufflePushBlockId(shuffleId: Int, mapIndex: Int, reduceId: Int) exte
   override def name: String = "shufflePush_" + shuffleId + "_" + mapIndex + "_" + reduceId
 }
 
+@Since("3.2.0")
+@DeveloperApi
+case class ShuffleMergedBlockId(appId: String, shuffleId: Int, reduceId: Int) extends BlockId {

Review comment:
       nit: Change it to `ShuffleMergedDataBlockId`
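   A sketch of the suggested rename; the exact `name` format is an assumption mirroring the `ShufflePushBlockId` pattern above, not code from the PR:
   ```
   @Since("3.2.0")
   @DeveloperApi
   case class ShuffleMergedDataBlockId(appId: String, shuffleId: Int, reduceId: Int)
     extends BlockId {
     // Assumed name format, following the other BlockId implementations.
     override def name: String =
       "shuffleMerged_" + appId + "_" + shuffleId + "_" + reduceId + ".data"
   }
   ```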

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +185,45 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks created by executors
+   * if push based shuffle is enabled. Note that the files in this directory will be created
+   * by the external shuffle services. We only create the merge_manager directories and
+   * subdirectories here because currently the shuffle service doesn't have permission to
+   * create directories under application local directories.
+   */
+  private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Unit = {
+    if (Utils.isPushBasedShuffleEnabled(conf)) {
+      // Will create the merge_manager directory only if it doesn't exist under any local dir.
+      Utils.getConfiguredLocalDirs(conf).foreach { rootDir =>
+        try {
+          val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+          // This executor does not find merge_manager directory, it will start creating them.
+          // It's possible that the other executors launched at the same time may also reach here
+          // but we are working on the assumption that the executors launched around the same time
+          // will have the same set of application local directories.

Review comment:
       This comment needs to be updated as well. We are no longer working with this assumption:
   `but we are working on the assumption that the executors launched around the same time will have the same set of application local directories.`
    Also move it inside the `if` block below.
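   Roughly, a sketch of the restructuring being asked for (using the hypothetical `createDirWith770` helper from the first comment; error handling elided):
   ```
   Utils.getConfiguredLocalDirs(conf).foreach { rootDir =>
     val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
     if (!mergeDir.exists()) {
       // This executor did not find the merge_manager directory, so it
       // creates it. Concurrent executors may race to create the same
       // directory, which is fine since creation is idempotent.
       createDirWith770(mergeDir) // hypothetical helper, see first comment
     }
   }
   ```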

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -728,6 +736,27 @@ private[spark] class BlockManager(
     }
   }
 
+  /**
+   * Get the local merged shuffle block data for the given block ID as multiple chunks.
+   * A merged shuffle file is divided into multiple chunks according to the index file.
+   * Instead of reading the entire file as a single block, we split it into smaller chunks
+   * which will be memory efficient when performing certain operations.
+   */
+  def getLocalMergedBlockData(

Review comment:
       ok
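   For context, an illustrative sketch (not the PR's implementation): if the index file stores `Long` byte offsets, the chunk boundaries of the merged data file can be recovered like this:
   ```
   import java.io.{DataInputStream, File, FileInputStream}

   // Illustrative only: read all Long offsets from an index file.
   // Chunk i of the merged data file then spans [offsets(i), offsets(i + 1)).
   def readChunkOffsets(indexFile: File): Array[Long] = {
     val in = new DataInputStream(new FileInputStream(indexFile))
     try {
       val numOffsets = (indexFile.length() / 8).toInt
       Array.fill(numOffsets)(in.readLong())
     } finally {
       in.close()
     }
   }
   ```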

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -139,7 +171,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
    * located inside configured local directories and won't
    * be deleted on JVM exit when using the external shuffle service.
    */
-  private def createLocalDirs(conf: SparkConf): Array[File] = {
+  private def createLocalDirs(): Array[File] = {

Review comment:
       Nit: is this change really required? This is original code, so why not just leave it as it is?

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -40,7 +42,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
   /* Create one local directory for each path mentioned in spark.local.dir; then, inside this
    * directory, create multiple subdirectories that we will hash files into, in order to avoid
    * having really large inodes at the top level. */
-  private[spark] val localDirs: Array[File] = createLocalDirs(conf)
+  private[spark] val localDirs: Array[File] = createLocalDirs()

Review comment:
       Nit: this change seems unrelated to push-based shuffle, so should we not make it?



