Ngone51 commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r644689361



##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +185,45 @@ private[spark] class DiskBlockManager(conf: SparkConf, 
deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks 
created by executors
+   * if push based shuffle is enabled. Note that the files in this directory 
will be created
+   * by the external shuffle services. We only create the merge_manager 
directories and
+   * subdirectories here because currently the shuffle service doesn't have 
permission to
+   * create directories under application local directories.
+   */
+  private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Unit = {

Review comment:
       nit: "createMergeManagerDirIfNeeded()" ?
   
   btw: `conf` can be omitted as it's a member field.

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +185,45 @@ private[spark] class DiskBlockManager(conf: SparkConf, 
deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks 
created by executors
+   * if push based shuffle is enabled. Note that the files in this directory 
will be created
+   * by the external shuffle services. We only create the merge_manager 
directories and
+   * subdirectories here because currently the shuffle service doesn't have 
permission to

Review comment:
       nit: `shuffle service` -> `external shuffle service`

##########
File path: 
core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
##########
@@ -85,6 +86,24 @@ class DiskBlockManagerSuite extends SparkFunSuite with 
BeforeAndAfterEach with B
     assert(diskBlockManager.getAllBlocks().isEmpty)
   }
 
+  test("should still create merge directories if one already exists under a 
local dir") {
+    val mergeDir0 = new File(rootDir0, DiskBlockManager.MERGE_MANAGER_DIR)
+    if (!mergeDir0.exists()) {
+      Files.createDirectories(mergeDir0.toPath)
+    }
+    val mergeDir1 = new File(rootDir1, DiskBlockManager.MERGE_MANAGER_DIR)
+    if (mergeDir1.exists()) {
+      Utils.deleteRecursively(mergeDir1)
+    }
+    testConf.set("spark.local.dir", rootDirs)
+    testConf.set("spark.shuffle.push.enabled", "true")
+    testConf.set("spark.shuffle.service.enabled", "true")
+    diskBlockManager = new DiskBlockManager(testConf, deleteFilesOnStop = true)
+    Utils.getConfiguredLocalDirs(testConf).map(
+      rootDir => new File(rootDir, DiskBlockManager.MERGE_MANAGER_DIR))
+      .filter(mergeDir => mergeDir.exists())

Review comment:
       Add assertions?

##########
File path: 
core/src/test/scala/org/apache/spark/shuffle/HostLocalShuffleReadingSuite.scala
##########
@@ -133,4 +133,13 @@ class HostLocalShuffleReadingSuite extends SparkFunSuite 
with Matchers with Loca
       assert(remoteBytesRead.sum === 0 && remoteBlocksFetched.sum === 0)
     }
   }
+
+  test("Enable host local shuffle reading when Push based shuffle is enabled") 
{
+    val conf = new SparkConf()
+      .set(SHUFFLE_SERVICE_ENABLED, true)
+      .set("spark.yarn.maxAttempts", "1")
+      .set(PUSH_BASED_SHUFFLE_ENABLED, true)
+    sc = new SparkContext("local-cluster[2,1,1024]", 
"test-host-local-shuffle-reading", conf)

Review comment:
       nit: `local-cluster[2, 1, 1024]` (extra space after the comma)

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -83,6 +88,33 @@ private[spark] class DiskBlockManager(conf: SparkConf, 
deleteFilesOnStop: Boolea
 
   def getFile(blockId: BlockId): File = getFile(blockId.name)
 
+  /**
+   * This should be in sync with
+   * @see [[org.apache.spark.network.shuffle.RemoteBlockPushResolver#getFile(
+   *     java.lang.String, java.lang.String)]]
+   */
+  def getMergedShuffleFile(blockId: BlockId, dirs: Option[Array[String]]): 
File = {
+    blockId match {
+      case mergedBlockId: ShuffleMergedBlockId =>
+        getMergedShuffleFile(mergedBlockId.name, dirs)
+      case mergedIndexBlockId: ShuffleMergedIndexBlockId =>
+        getMergedShuffleFile(mergedIndexBlockId.name, dirs)
+      case mergedMetaBlockId: ShuffleMergedMetaBlockId =>
+        getMergedShuffleFile(mergedMetaBlockId.name, dirs)
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Only merged block ID is supported, but got $blockId")
+    }
+  }
+
+  private def getMergedShuffleFile(filename: String, dirs: 
Option[Array[String]]): File = {
+    if (dirs.isEmpty) {

Review comment:
       IIUC, I think `dirs.get.isEmpty` is what you really need here.

##########
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -343,6 +370,51 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  override def getMergedBlockData(
+      blockId: ShuffleBlockId,
+      dirs: Option[Array[String]]): Seq[ManagedBuffer] = {
+    val indexFile =
+      getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, 
blockId.reduceId, dirs)
+    val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId, 
blockId.reduceId, dirs)
+    // Load all the indexes in order to identify all chunks in the specified 
merged shuffle file.
+    val size = indexFile.length.toInt
+    val buffer = ByteBuffer.allocate(size)
+    val offsets = buffer.asLongBuffer
+    val dis = new DataInputStream(Files.newInputStream(indexFile.toPath))
+    try {
+      dis.readFully(buffer.array)
+    } finally {
+      dis.close()
+    }
+    // Number of chunks is number of indexes - 1
+    val numChunks = size / 8 - 1
+    for (index <- 0 until numChunks) yield {
+      new FileSegmentManagedBuffer(transportConf, dataFile,
+        offsets.get(index),
+        offsets.get(index + 1) - offsets.get(index))
+    }
+  }
+
+  /**
+   * This is only used for reading local merged block meta data.
+   */
+  override def getMergedBlockMeta(
+      blockId: ShuffleBlockId,
+      dirs: Option[Array[String]]): MergedBlockMeta = {
+    val indexFile =
+      getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, 
blockId.reduceId, dirs)
+    val size = indexFile.length.toInt
+    val numChunks = (size / 8) - 1
+    val metaFile = getMergedBlockMetaFile(conf.getAppId, blockId.shuffleId, 
blockId.reduceId, dirs)
+    val chunkBitMaps = new FileSegmentManagedBuffer(transportConf, metaFile, 
0L, metaFile.length)
+    new MergedBlockMeta(numChunks, chunkBitMaps)
+  }
+
+  /**
+   * This is only used for reading local merged block data. In such cases, all 
chunks in the
+   * merged shuffle file need to be identified at once, so the 
ShuffleBlockFetcherIterator
+   * knows how to consume local merged shuffle file as multiple chunks.
+   */

Review comment:
       Does this comment belong to `getMergedBlockData`?

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -83,6 +88,33 @@ private[spark] class DiskBlockManager(conf: SparkConf, 
deleteFilesOnStop: Boolea
 
   def getFile(blockId: BlockId): File = getFile(blockId.name)
 
+  /**
+   * This should be in sync with
+   * @see [[org.apache.spark.network.shuffle.RemoteBlockPushResolver#getFile(
+   *     java.lang.String, java.lang.String)]]
+   */
+  def getMergedShuffleFile(blockId: BlockId, dirs: Option[Array[String]]): 
File = {
+    blockId match {
+      case mergedBlockId: ShuffleMergedBlockId =>
+        getMergedShuffleFile(mergedBlockId.name, dirs)
+      case mergedIndexBlockId: ShuffleMergedIndexBlockId =>
+        getMergedShuffleFile(mergedIndexBlockId.name, dirs)
+      case mergedMetaBlockId: ShuffleMergedMetaBlockId =>
+        getMergedShuffleFile(mergedMetaBlockId.name, dirs)
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Only merged block ID is supported, but got $blockId")
+    }
+  }
+
+  private def getMergedShuffleFile(filename: String, dirs: 
Option[Array[String]]): File = {
+    if (dirs.isEmpty) {

Review comment:
       Why not use `dirs: Option[Array[String]]` directly?

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -315,6 +315,41 @@ private[spark] object Utils extends Logging {
     dir.getCanonicalFile
   }
 
+  /**
+   * Create a directory that is writable by the group.
+   * Grant the customized permission so the shuffle server can
+   * create subdirs/files within the merge folder.
+   * TODO: Find out why can't we create a dir using java api with permission 
770
+   *  Files.createDirectories(mergeDir.toPath, 
PosixFilePermissions.asFileAttribute(
+   *  PosixFilePermissions.fromString("rwxrwx---")))
+   */
+  def createDirWithCustomizedPermission(dirToCreate: File, permission: 
String): Unit = {
+    var attempts = 0
+    val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
+    var created: File = null
+    while (created == null) {
+      attempts += 1
+      if (attempts > maxAttempts) {
+        throw new IOException(
+          s"Failed to create directory ${dirToCreate.getAbsolutePath} after " +
+            s"${maxAttempts} attempts!")
+      }
+      try {
+        val builder = new ProcessBuilder().command(
+          "mkdir", "-p", "-m" + permission, dirToCreate.getAbsolutePath)
+        val proc = builder.start()
+        val exitCode = proc.waitFor()
+        if (dirToCreate.exists()) {
+          created = dirToCreate
+        }
+        logDebug(
+          s"Created directory at ${dirToCreate.getAbsolutePath} and exitCode 
$exitCode")
+      } catch {
+        case e: SecurityException => created = null;

Review comment:
       Add a warning log with the exception?

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -315,6 +315,41 @@ private[spark] object Utils extends Logging {
     dir.getCanonicalFile
   }
 
+  /**
+   * Create a directory that is writable by the group.
+   * Grant the customized permission so the shuffle server can
+   * create subdirs/files within the merge folder.

Review comment:
       I think this comment is not appropriate for a util function. Shall we 
move this to the caller side in `DiskBlockManager`?

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -728,6 +729,27 @@ private[spark] class BlockManager(
     }
   }
 
+  /**
+   * Get the local merged shuffle block data for the given block ID as 
multiple chunks.
+   * A merged shuffle file is divided into multiple chunks according to the 
index file.
+   * Instead of reading the entire file as a single block, we split it into 
smaller chunks
+   * which will be memory efficient when performing certain operations.
+   */
+  def getLocalMergedBlockData(
+      blockId: ShuffleBlockId,
+      dirs: Array[String]): Seq[ManagedBuffer] = {
+    shuffleManager.shuffleBlockResolver.getMergedBlockData(blockId, Some(dirs))
+  }
+
+  /**
+   * Get the local merged shuffle block meta data for the given block ID.
+   */
+  def getLocalMergedBlockMeta(
+      blockId: ShuffleBlockId,
+      dirs: Array[String]): MergedBlockMeta = {
+    shuffleManager.shuffleBlockResolver.getMergedBlockMeta(blockId, Some(dirs))

Review comment:
       How do you plan to test `getLocalMergedBlockData` and 
`getLocalMergedBlockMeta`?

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +185,45 @@ private[spark] class DiskBlockManager(conf: SparkConf, 
deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks 
created by executors
+   * if push based shuffle is enabled. Note that the files in this directory 
will be created
+   * by the external shuffle services. We only create the merge_manager 
directories and
+   * subdirectories here because currently the shuffle service doesn't have 
permission to
+   * create directories under application local directories.
+   */
+  private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): Unit = {
+    if (Utils.isPushBasedShuffleEnabled(conf)) {
+      // Will create the merge_manager directory only if it doesn't exist 
under any local dir.
+      Utils.getConfiguredLocalDirs(conf).foreach { rootDir =>
+        try {
+          val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+          // This executor does not find merge_manager directory, it will 
start creating them.
+          // It's possible that the other executors launched at the same time 
may also reach here
+          // but we are working on the assumption that the executors launched 
around the same time
+          // will have the same set of application local directories.
+          if (!mergeDir.exists()) {
+            logDebug(
+              s"Try to create $mergeDir and its sub dirs since the merge dir 
does not exist")

Review comment:
       nit: `merge` -> `merge_manager`?

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
##########
@@ -35,7 +35,11 @@
   public final String[] localDirs;
   /** Number of subdirectories created within each localDir. */
   public final int subDirsPerLocalDir;
-  /** Shuffle manager (SortShuffleManager) that the executor is using. */
+  /** Shuffle manager (SortShuffleManager) that the executor is using.

Review comment:
       ```suggestion
     /** 
      * Shuffle manager (SortShuffleManager) that the executor is using.
   ```

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -315,6 +315,41 @@ private[spark] object Utils extends Logging {
     dir.getCanonicalFile
   }
 
+  /**
+   * Create a directory that is writable by the group.
+   * Grant the customized permission so the shuffle server can
+   * create subdirs/files within the merge folder.
+   * TODO: Find out why can't we create a dir using java api with permission 
770
+   *  Files.createDirectories(mergeDir.toPath, 
PosixFilePermissions.asFileAttribute(
+   *  PosixFilePermissions.fromString("rwxrwx---")))
+   */
+  def createDirWithCustomizedPermission(dirToCreate: File, permission: 
String): Unit = {
+    var attempts = 0
+    val maxAttempts = MAX_DIR_CREATION_ATTEMPTS
+    var created: File = null
+    while (created == null) {
+      attempts += 1
+      if (attempts > maxAttempts) {
+        throw new IOException(
+          s"Failed to create directory ${dirToCreate.getAbsolutePath} after " +

Review comment:
       nit: `Failed to create directory ${dirToCreate.getAbsolutePath} with 
$permission...`?

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -2566,11 +2601,28 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Push based shuffle can only be enabled when external shuffle service is 
enabled.
+   * Push based shuffle can only be enabled when the application is submitted
+   * to run in YARN mode, with external shuffle service enabled and
+   * spark.yarn.maxAttempts or the yarn cluster default max attempts is set to 
1.
+   * TODO: SPARK-35546 Support push based shuffle with multiple app attempts
    */
   def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = {
     conf.get(PUSH_BASED_SHUFFLE_ENABLED) &&
-      (conf.get(IS_TESTING).getOrElse(false) || 
conf.get(SHUFFLE_SERVICE_ENABLED))
+      (conf.get(IS_TESTING).getOrElse(false) ||
+        (conf.get(SHUFFLE_SERVICE_ENABLED) &&
+          conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn") &&
+          getYarnMaxAttempts(conf) == 1)
+  }
+
+  /** Returns the maximum number of attempts to register the AM in YARN mode. 
*/
+  def getYarnMaxAttempts(conf: SparkConf): Int = {
+      val sparkMaxAttempts = 
conf.getOption("spark.yarn.maxAttempts").map(_.toInt)

Review comment:
       nit: 2 indents

##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -431,7 +432,7 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
       executorInfo.subDirsPerLocalDir));
   }
   private static String generateFileName(AppShuffleId appShuffleId, int 
reduceId) {
-    return String.format("mergedShuffle_%s_%d_%d", appShuffleId.appId, 
appShuffleId.shuffleId,
+    return String.format("shuffleMerged_%s_%d_%d", appShuffleId.appId, 
appShuffleId.shuffleId,

Review comment:
       Shall we make this prefix a constant field?

##########
File path: core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
##########
@@ -1438,16 +1441,34 @@ class UtilsSuite extends SparkFunSuite with 
ResetSystemProperties with Logging {
     assert(message.contains(expected))
   }
 
-  test("isPushBasedShuffleEnabled when both PUSH_BASED_SHUFFLE_ENABLED" +
-    " and SHUFFLE_SERVICE_ENABLED are true") {
+  test("isPushBasedShuffleEnabled when PUSH_BASED_SHUFFLE_ENABLED " +
+    "and SHUFFLE_SERVICE_ENABLED are both set to true in YARN mode with 
maxAttempts set to 1") {
     val conf = new SparkConf()
     assert(Utils.isPushBasedShuffleEnabled(conf) === false)
     conf.set(PUSH_BASED_SHUFFLE_ENABLED, true)
     conf.set(IS_TESTING, false)
     assert(Utils.isPushBasedShuffleEnabled(conf) === false)
     conf.set(SHUFFLE_SERVICE_ENABLED, true)
+    conf.set(SparkLauncher.SPARK_MASTER, "yarn")
+    conf.set("spark.yarn.maxAttempts", "1")
     assert(Utils.isPushBasedShuffleEnabled(conf) === true)
+    conf.set("spark.yarn.maxAttempts", "2")
+    assert(Utils.isPushBasedShuffleEnabled(conf) === false)
+  }
+
+  test("Test create dir with 770") {
+    val testDir = new File("target/testDir");
+    FileUtils.deleteQuietly(testDir)
+    Utils.createDirWithCustomizedPermission(testDir, "770")
+    val permission = PosixFilePermissions.toString(
+      JavaFiles.getPosixFilePermissions(Paths.get("target/testDir")))
+    assert(permission.equals("rwxrwx---"))
+    val foo = new File(testDir, "foo.txt")
+    Files.touch(foo)
+    assert(testDir.exists && testDir.isDirectory)
+    FileUtils.deleteQuietly(testDir)
   }
+

Review comment:
       Revert this?

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -153,6 +204,60 @@ private[spark] class DiskBlockManager(conf: SparkConf, 
deleteFilesOnStop: Boolea
     }
   }
 
+  /**
+   * Get the list of configured local dirs storing merged shuffle blocks 
created by executors
+   * if push based shuffle is enabled. Note that the files in this directory 
will be created
+   * by the external shuffle services. We only create the merge_manager 
directories and
+   * subdirectories here because currently the shuffle service doesn't have 
permission to
+   * create directories under application local directories.
+   */
+  private def createLocalDirsForMergedShuffleBlocks(conf: SparkConf): 
Array[File] = {
+    if (Utils.isPushBasedShuffleEnabled(conf)) {
+      // Will create the merge_manager directory only if it doesn't exist 
under any local dir.

Review comment:
       Have you addressed this comment?
   
   Looking at the code, it still looks inconsistent with the comment. I assume 
the comment should look like "Will create the merge_manager directory if it 
doesn't exist under the local dir." to match the code.
   
   

##########
File path: core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
##########
@@ -17,12 +17,13 @@
 
 package org.apache.spark.util
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutput, 
DataOutputStream, File,
-  FileOutputStream, PrintStream, SequenceInputStream}
+import java.io._

Review comment:
       I don't remember ever recommending it... but yes, it's recommended to use 
wildcard imports when [there are more than 6 
entities](https://github.com/databricks/scala-style-guide#imports).
   
   

##########
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -343,6 +359,50 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  /**
+   * This is only used for reading local merged block data. In such cases, all 
chunks in the
+   * merged shuffle file need to be identified at once, so the 
ShuffleBlockFetcherIterator
+   * knows how to consume local merged shuffle file as multiple chunks.
+   */
+  override def getMergedBlockData(blockId: ShuffleBlockId): Seq[ManagedBuffer] 
= {
+    val indexFile = getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, 
blockId.reduceId)
+    val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId, 
blockId.reduceId)
+    // Load all the indexes in order to identify all chunks in the specified 
merged shuffle file.
+    val size = indexFile.length.toInt
+    val buffer = ByteBuffer.allocate(size)
+    val offsets = buffer.asLongBuffer
+    val dis = new DataInputStream(Files.newInputStream(indexFile.toPath))
+    try {
+      dis.readFully(buffer.array)
+    } finally {
+      dis.close()
+    }

Review comment:
       This is not resolved? @zhouyejoe 

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -315,6 +315,41 @@ private[spark] object Utils extends Logging {
     dir.getCanonicalFile
   }
 
+  /**
+   * Create a directory that is writable by the group.
+   * Grant the customized permission so the shuffle server can
+   * create subdirs/files within the merge folder.

Review comment:
       I'm fine with it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to