otterc commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r642027998



##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -728,6 +736,27 @@ private[spark] class BlockManager(
     }
   }
 
+  /**
+   * Get the local merged shuffle block data for the given block ID as 
multiple chunks.
+   * A merged shuffle file is divided into multiple chunks according to the 
index file.
+   * Instead of reading the entire file as a single block, we split it into 
smaller chunks
+   * which will be memory efficient when performing certain operations.
+   */
+  def getLocalMergedBlockData(

Review comment:
       Can we keep this name consistent, that is, `getMergedBlockData` with 
what is in the `ShuffleBlockResolver` 

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -149,6 +178,12 @@ object BlockId {
       ShuffleIndexBlockId(shuffleId.toInt, mapId.toLong, reduceId.toInt)
     case SHUFFLE_PUSH(shuffleId, mapIndex, reduceId) =>
       ShufflePushBlockId(shuffleId.toInt, mapIndex.toInt, reduceId.toInt)
+    case SHUFFLE_MERGED(appId, shuffleId, reduceId) =>

Review comment:
       Nit: Shall we make this consistent and call it `SHUFFLE_MERGED_DATA`?

##########
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -343,6 +370,50 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  override def getMergedBlockData(

Review comment:
       Nit: Missing javadoc
   ```
     /**
      * This is only used for reading local merged block data. In such cases, 
all chunks in the
      * merged shuffle file need to be identified at once, so the 
ShuffleBlockFetcherIterator
      * knows how to consume local merged shuffle file as multiple chunks.
      */
      ```

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -504,7 +504,8 @@ private[spark] class BlockManager(
 
     hostLocalDirManager = {
       if (conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) &&
-          !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
+          !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL) ||
+          Utils.isPushBasedShuffleEnabled(conf)) {

Review comment:
       When pushBasedShuffle is enabled, we want the hostLocalDirManager to be 
initialized. push-based shuffle works with old fetch protocol as well. Should 
this be:
   ```
   (conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) &&
             !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL) )||
             Utils.isPushBasedShuffleEnabled(conf)
    ```         
    If yes, please add a UT as well to check if `hostLocalDirManager` is 
initialized when just push-based shuffle is enabled. I think we have this UT 
internally.

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +198,60 @@ private[spark] class DiskBlockManager(conf: SparkConf, 
deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks 
created by executors
+   * if push based shuffle is enabled. Note that the files in this directory 
will be created
+   * by the external shuffle services. We only create the merge_manager 
directories and
+   * subdirectories here because currently the shuffle service doesn't have 
permission to
+   * create directories under application local directories.
+   */
+  private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): 
Array[File] = {

Review comment:
       In our latest version it doesn't return anything.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -149,6 +178,12 @@ object BlockId {
       ShuffleIndexBlockId(shuffleId.toInt, mapId.toLong, reduceId.toInt)
     case SHUFFLE_PUSH(shuffleId, mapIndex, reduceId) =>
       ShufflePushBlockId(shuffleId.toInt, mapIndex.toInt, reduceId.toInt)
+    case SHUFFLE_MERGED(appId, shuffleId, reduceId) =>
+      ShuffleMergedBlockId(appId, shuffleId.toInt, reduceId.toInt)
+    case SHUFFLE_MERGED_INDEX(appId, shuffleId, reduceId) =>
+      ShuffleMergedIndexBlockId(appId, shuffleId.toInt, reduceId.toInt)
+    case SHUFFLE_MERGED_META(appId, shuffleId, reduceId) =>
+      ShuffleMergedMetaBlockId(appId, shuffleId.toInt, reduceId.toInt)

Review comment:
       Same here. Are we using this anywhere?

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockId.scala
##########
@@ -129,6 +155,9 @@ object BlockId {
   val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r
   val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r
   val SHUFFLE_PUSH = "shufflePush_([0-9]+)_([0-9]+)_([0-9]+)".r
+  val SHUFFLE_MERGED = "shuffleMerged_([_A-Za-z0-9]*)_([0-9]+)_([0-9]+).data".r
+  val SHUFFLE_MERGED_INDEX = 
"shuffleMerged_([_A-Za-z0-9]*)_([0-9]+)_([0-9]+).index".r
+  val SHUFFLE_MERGED_META = 
"shuffleMerged_([_A-Za-z0-9]*)_([0-9]+)_([0-9]+).meta".r

Review comment:
       Where are we using this?

##########
File path: 
core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
##########
@@ -85,6 +86,39 @@ class DiskBlockManagerSuite extends SparkFunSuite with 
BeforeAndAfterEach with B
     assert(diskBlockManager.getAllBlocks().isEmpty)
   }
 
+  test("find active merged shuffle directories") {

Review comment:
       This is a stale UT. It's not needed

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -315,6 +315,38 @@ private[spark] object Utils extends Logging {
     dir.getCanonicalFile
   }
 
+  /**
+   * Create a directory that is writable by the group.
+   * Grant the customized permission so the shuffle server can
+   * create subdirs/files within the merge folder.
+   */
+  def createDirWithCustomizedPermission(dirToCreate: File, permission: 
String): Unit = {
+    var attempts = 0
+    val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
+    var created: File = null
+    while (created == null) {
+      attempts += 1
+      if (attempts > maxAttempts) {
+        throw new IOException(
+          s"Failed to create directory ${dirToCreate.getAbsolutePath} after " +
+            s"${maxAttempts} attempts!")
+      }
+      try {
+        val builder = new ProcessBuilder().command(
+          "mkdir", "-p", "-m770", dirToCreate.getAbsolutePath)

Review comment:
       `permission` variable is not being used. 
   Also we should still keep the TODO here 
   ```
     * TODO: Find out why can't we create a dir using java api with permission 
770
      *  Files.createDirectories(mergeDir.toPath, 
PosixFilePermissions.asFileAttribute(
      *  PosixFilePermissions.fromString("rwxrwx---"))
    
   ```
   Also I think we have generalized this createDir method but for more 
restrictive permissions, the java api would work.
   

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +198,60 @@ private[spark] class DiskBlockManager(conf: SparkConf, 
deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks 
created by executors
+   * if push based shuffle is enabled. Note that the files in this directory 
will be created
+   * by the external shuffle services. We only create the merge_manager 
directories and
+   * subdirectories here because currently the shuffle service doesn't have 
permission to
+   * create directories under application local directories.
+   */
+  private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): 
Array[File] = {
+    if (Utils.isPushBasedShuffleEnabled(conf)) {
+      // Will create the merge_manager directory only if it doesn't exist 
under any local dir.
+      for (rootDir <- configuredLocalDirs) {
+        val mergeDir = new File(rootDir, mergeDirName)
+        if (mergeDir.exists()) {
+          logDebug(s"Not creating $mergeDir as it already exists")
+          return Array.empty[File]
+        }
+      }
+      for (rootDir <- configuredLocalDirs) {
+        val mergeDir = new File(rootDir, mergeDirName)
+        if (!mergeDir.exists()) {
+          logDebug(s"Creating $mergeDir as it does not exist")
+          // This executor didn't see merge_manager in the local dir, it will 
start creating them.
+          // It's possible that the other executors launched at the same time 
may also reach here
+          // but we are working on the assumption that the executors launched 
around the same time
+          // will have the same set of application local directories.
+          try {
+            for (dirNum <- 0 until subDirsPerLocalDir) {
+              // Only one container will create this directory. The filesystem 
will handle any race
+              // conditions.
+              val sudDir = new File(mergeDir, "%02x".format(dirNum))

Review comment:
       Should we not check the existence of the subdir before trying to create 
it?

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -728,6 +736,27 @@ private[spark] class BlockManager(
     }
   }
 
+  /**
+   * Get the local merged shuffle block data for the given block ID as 
multiple chunks.
+   * A merged shuffle file is divided into multiple chunks according to the 
index file.
+   * Instead of reading the entire file as a single block, we split it into 
smaller chunks
+   * which will be memory efficient when performing certain operations.
+   */
+  def getLocalMergedBlockData(
+      blockId: ShuffleBlockId,
+      dirs: Array[String]): Seq[ManagedBuffer] = {
+    shuffleManager.shuffleBlockResolver.getMergedBlockData(blockId, Some(dirs))
+  }
+
+  /**
+   * Get the local merged shuffle block meta data for the given block ID.
+   */
+  def geLocalMergedBlockMeta(

Review comment:
       Same here, `getMergedBlockMeta`?

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +198,60 @@ private[spark] class DiskBlockManager(conf: SparkConf, 
deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks 
created by executors
+   * if push based shuffle is enabled. Note that the files in this directory 
will be created
+   * by the external shuffle services. We only create the merge_manager 
directories and
+   * subdirectories here because currently the shuffle service doesn't have 
permission to
+   * create directories under application local directories.
+   */
+  private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): 
Array[File] = {
+    if (Utils.isPushBasedShuffleEnabled(conf)) {
+      // Will create the merge_manager directory only if it doesn't exist 
under any local dir.
+      for (rootDir <- configuredLocalDirs) {
+        val mergeDir = new File(rootDir, mergeDirName)
+        if (mergeDir.exists()) {
+          logDebug(s"Not creating $mergeDir as it already exists")
+          return Array.empty[File]
+        }
+      }
+      for (rootDir <- configuredLocalDirs) {
+        val mergeDir = new File(rootDir, mergeDirName)
+        if (!mergeDir.exists()) {
+          logDebug(s"Creating $mergeDir as it does not exist")

Review comment:
       logDebug(s"Creating $mergeDir and its subdirs since the merge dir does 
not exist")

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -52,6 +59,17 @@ private[spark] class DiskBlockManager(conf: SparkConf, 
deleteFilesOnStop: Boolea
   // of subDirs(i) is protected by the lock of subDirs(i)
   private val subDirs = Array.fill(localDirs.length)(new 
Array[File](subDirsPerLocalDir))
 
+  // Get merge directory name, append attemptId if there is any
+  private val mergeDirName =
+    conf.getOption("spark.app.attempt.id")
+      .map(id => MERGE_DIRECTORY + "_" + id).getOrElse(MERGE_DIRECTORY)
+
+  /**
+   * Create merge directories
+   */
+  private[spark] val activeMergedShuffleDirs: Array[File] =
+    createLocalDirsForMergedShuffleBlocks(conf)

Review comment:
       Can you please check the signature of this method again? We don't need 
to keep the dirs in `DiskBlockManager` instance memory. We don't even use the 
`activeMergedShuffleDirs` anywhere in this class. This still looks like stale 
code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to