mridulm commented on a change in pull request #32007:
URL: https://github.com/apache/spark/pull/32007#discussion_r644956403



##########
File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
##########
@@ -211,63 +214,74 @@ public ManagedBuffer getMergedBlockData(String appId, int 
shuffleId, int reduceI
 
   /**
    * The logic here is consistent with
-   * org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile
+   * @see 
[[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(org.apache.spark.storage.BlockId)]]
    */
   private File getFile(String appId, String filename) {

Review comment:
       Resolving this given we are moving multiple attempt support to a 
subsequent jira

##########
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -343,6 +359,50 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  /**
+   * This is only used for reading local merged block data. In such cases, all 
chunks in the
+   * merged shuffle file need to be identified at once, so the 
ShuffleBlockFetcherIterator
+   * knows how to consume local merged shuffle file as multiple chunks.
+   */
+  override def getMergedBlockData(blockId: ShuffleBlockId): Seq[ManagedBuffer] 
= {
+    val indexFile = getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, 
blockId.reduceId)
+    val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId, 
blockId.reduceId)
+    // Load all the indexes in order to identify all chunks in the specified 
merged shuffle file.
+    val size = indexFile.length.toInt
+    val buffer = ByteBuffer.allocate(size)
+    val offsets = buffer.asLongBuffer
+    val dis = new DataInputStream(Files.newInputStream(indexFile.toPath))
+    try {
+      dis.readFully(buffer.array)
+    } finally {
+      dis.close()
+    }

Review comment:
       On second thought, given `Utils` is not available, there might not be an 
alternative location for this.
   We might need to look at a common infra module for utilities like this which 
all modules can depend on ... thoughts @Ngone51 ?

##########
File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -343,6 +359,50 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  /**
+   * This is only used for reading local merged block data. In such cases, all 
chunks in the
+   * merged shuffle file need to be identified at once, so the 
ShuffleBlockFetcherIterator
+   * knows how to consume local merged shuffle file as multiple chunks.
+   */
+  override def getMergedBlockData(blockId: ShuffleBlockId): Seq[ManagedBuffer] 
= {
+    val indexFile = getMergedBlockIndexFile(conf.getAppId, blockId.shuffleId, 
blockId.reduceId)
+    val dataFile = getMergedBlockDataFile(conf.getAppId, blockId.shuffleId, 
blockId.reduceId)
+    // Load all the indexes in order to identify all chunks in the specified 
merged shuffle file.
+    val size = indexFile.length.toInt
+    val buffer = ByteBuffer.allocate(size)
+    val offsets = buffer.asLongBuffer
+    val dis = new DataInputStream(Files.newInputStream(indexFile.toPath))
+    try {
+      dis.readFully(buffer.array)
+    } finally {
+      dis.close()
+    }

Review comment:
       On second thought, given `Utils` is not available, there might not be an 
alternative location for this.
   We might need to look at a common infra module for utilities like this which 
all modules can depend on (or is there something already ?) ... thoughts 
@Ngone51 ?

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -504,7 +504,8 @@ private[spark] class BlockManager(
 
     hostLocalDirManager = {
       if (conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) &&
-          !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) {
+          !conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL) ||
+          Utils.isPushBasedShuffleEnabled(conf)) {

Review comment:
       I agree with @otterc's comment - though I think the suggested change in 
the comment does not do what you want it to do Chandni ? :-)

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -83,6 +88,33 @@ private[spark] class DiskBlockManager(conf: SparkConf, 
deleteFilesOnStop: Boolea
 
   def getFile(blockId: BlockId): File = getFile(blockId.name)
 
+  /**
+   * This should be in sync with
+   * @see [[org.apache.spark.network.shuffle.RemoteBlockPushResolver#getFile(
+   *     java.lang.String, java.lang.String)]]
+   */
+  def getMergedShuffleFile(blockId: BlockId, dirs: Option[Array[String]]): 
File = {
+    blockId match {
+      case mergedBlockId: ShuffleMergedBlockId =>
+        getMergedShuffleFile(mergedBlockId.name, dirs)
+      case mergedIndexBlockId: ShuffleMergedIndexBlockId =>
+        getMergedShuffleFile(mergedIndexBlockId.name, dirs)
+      case mergedMetaBlockId: ShuffleMergedMetaBlockId =>
+        getMergedShuffleFile(mergedMetaBlockId.name, dirs)
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Only merged block ID is supported, but got $blockId")
+    }
+  }
+
+  private def getMergedShuffleFile(filename: String, dirs: 
Option[Array[String]]): File = {
+    if (dirs.isEmpty) {

Review comment:
       ```suggestion
       if (dirs.map(_.nonEmpty).getOrElse(false)) {
   ```

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -83,6 +88,33 @@ private[spark] class DiskBlockManager(conf: SparkConf, 
deleteFilesOnStop: Boolea
 
   def getFile(blockId: BlockId): File = getFile(blockId.name)
 
+  /**
+   * This should be in sync with
+   * @see [[org.apache.spark.network.shuffle.RemoteBlockPushResolver#getFile(
+   *     java.lang.String, java.lang.String)]]
+   */
+  def getMergedShuffleFile(blockId: BlockId, dirs: Option[Array[String]]): 
File = {
+    blockId match {
+      case mergedBlockId: ShuffleMergedBlockId =>
+        getMergedShuffleFile(mergedBlockId.name, dirs)
+      case mergedIndexBlockId: ShuffleMergedIndexBlockId =>
+        getMergedShuffleFile(mergedIndexBlockId.name, dirs)
+      case mergedMetaBlockId: ShuffleMergedMetaBlockId =>
+        getMergedShuffleFile(mergedMetaBlockId.name, dirs)
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Only merged block ID is supported, but got $blockId")
+    }
+  }
+
+  private def getMergedShuffleFile(filename: String, dirs: 
Option[Array[String]]): File = {
+    if (dirs.isEmpty) {

Review comment:
       ```suggestion
       if (dirs.exists(_.nonEmpty)) {
   ```

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
##########
@@ -83,6 +88,33 @@ private[spark] class DiskBlockManager(conf: SparkConf, 
deleteFilesOnStop: Boolea
 
   def getFile(blockId: BlockId): File = getFile(blockId.name)
 
+  /**
+   * This should be in sync with
+   * @see [[org.apache.spark.network.shuffle.RemoteBlockPushResolver#getFile(
+   *     java.lang.String, java.lang.String)]]
+   */
+  def getMergedShuffleFile(blockId: BlockId, dirs: Option[Array[String]]): 
File = {
+    blockId match {
+      case mergedBlockId: ShuffleMergedBlockId =>
+        getMergedShuffleFile(mergedBlockId.name, dirs)
+      case mergedIndexBlockId: ShuffleMergedIndexBlockId =>
+        getMergedShuffleFile(mergedIndexBlockId.name, dirs)
+      case mergedMetaBlockId: ShuffleMergedMetaBlockId =>
+        getMergedShuffleFile(mergedMetaBlockId.name, dirs)
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Only merged block ID is supported, but got $blockId")
+    }
+  }
+
+  private def getMergedShuffleFile(filename: String, dirs: 
Option[Array[String]]): File = {
+    if (dirs.isEmpty) {

Review comment:
       ```suggestion
       if (!dirs.exists(_.nonEmpty)) {
   ```

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -2566,11 +2601,28 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Push based shuffle can only be enabled when external shuffle service is 
enabled.
+   * Push based shuffle can only be enabled when the application is submitted
+   * to run in YARN mode, with external shuffle service enabled and
+   * spark.yarn.maxAttempts or the yarn cluster default max attempts is set to 
1.
+   * TODO: SPARK-35546 Support push based shuffle with multiple app attempts
    */
   def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = {
     conf.get(PUSH_BASED_SHUFFLE_ENABLED) &&
-      (conf.get(IS_TESTING).getOrElse(false) || 
conf.get(SHUFFLE_SERVICE_ENABLED))
+      (conf.get(IS_TESTING).getOrElse(false) ||
+        (conf.get(SHUFFLE_SERVICE_ENABLED) &&
+          conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn") &&
+          getYarnMaxAttempts(conf) == 1)
+  }
+
+  /** Returns the maximum number of attempts to register the AM in YARN mode. 
*/
+  def getYarnMaxAttempts(conf: SparkConf): Int = {
+      val sparkMaxAttempts = 
conf.getOption("spark.yarn.maxAttempts").map(_.toInt)
+      val yarnMaxAttempts = getSparkOrYarnConfig(conf, 
YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS.toString).toInt
+      sparkMaxAttempts match {
+        case Some(x) => if (x <= yarnMaxAttempts) x else yarnMaxAttempts
+        case None => yarnMaxAttempts
+      }

Review comment:
       This already exists in spark right ? If yes, remove duplication ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to