This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fdd46f173f7 [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client
1fdd46f173f7 is described below

commit 1fdd46f173f7bc90e0523eb0a2d5e8e27e990102
Author: Shu Wang <swa...@linkedin.com>
AuthorDate: Tue Sep 19 12:50:03 2023 -0500

    [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client
    
    ### What changes were proposed in this pull request?
    
    Instead of obtaining the FileStatus for each resource [one by
    one](https://github.com/apache/spark/blob/531ec8bddc8dd22ca39486dbdd31e62e989ddc15/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala#L71C1),
    if most of the `spark.yarn.jars` entries come from the same directories, we can
    group them together and use a single `listStatus` RPC call per directory to
    obtain all of their FileStatuses at once; a simplified sketch follows the list below.
    1. `directoriesToBePreloaded`: analyzes `spark.yarn.jars` and finds the
       commonly shared non-local parent directories.
    2. `statCachePreload`: preloads the FileStatus entries for the given
       directories with `listStatus`.
    3. For resources specified with globs, we rely on the existing
       implementation, which uses `globStatus` to obtain the FileStatus.
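
    A simplified, standalone sketch of the grouping step (illustrative only:
    `groupByParent` is a hypothetical helper, and plain string checks stand in
    for `Utils.isLocalUri` and `GlobPattern`); the real implementation,
    `directoriesToBePreloaded`, is in the diff below:

    ```scala
    import java.net.URI
    import scala.collection.mutable

    // Bucket non-local, non-glob resource URIs by parent directory. A directory
    // whose bucket reaches the threshold is preloaded with a single listStatus
    // call instead of one getFileStatus call per file.
    def groupByParent(files: Seq[String], threshold: Int): Map[URI, Set[String]] = {
      val buckets = mutable.HashMap.empty[URI, mutable.HashSet[String]]
      files.filterNot(f => f.startsWith("local:") || f.contains("*")).foreach { f =>
        val uri = URI.create(f)
        val path = uri.getPath
        val slash = path.lastIndexOf('/')
        val parent = new URI(uri.getScheme, uri.getAuthority,
          path.substring(0, math.max(slash, 1)), null, null)
        buckets.getOrElseUpdate(parent, mutable.HashSet.empty) += path.substring(slash + 1)
      }
      buckets.filter(_._2.size >= threshold).map { case (d, n) => (d, n.toSet) }.toMap
    }
    ```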
    
    ### Why are the changes needed?
    
    It's inefficient to obtain the FileStatus for each resource [one by
    one](https://github.com/apache/spark/blob/531ec8bddc8dd22ca39486dbdd31e62e989ddc15/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala#L71C1).
    In our company setting, we run Spark with Hadoop YARN and HDFS. We noticed
    that the current behavior has two major drawbacks:
    
    - Since each getFileStatus call involves network delays, the overall delay
      can be large and adds uncertainty to the overall Spark job runtime. We
      quantified this overhead within our cluster: the p50 overhead is around
      10 s, the p80 is 1 minute, and the p100 is up to 15 minutes. When HDFS is
      overloaded, the delays become even more severe.
    
    - In our cluster, we issue nearly 100 million getFileStatus calls to HDFS
      daily. We noticed that most resources come from the same HDFS directory
      for each user (see our [engineering blog
      post](https://engineering.linkedin.com/blog/2023/reducing-apache-spark-application-dependencies-upload-by-99-)
      about why we took this approach). Therefore, we can reduce those nearly
      100 million getFileStatus calls to 0.1 million `listStatus` calls daily.
      This will further reduce overhead [...]
    
    All in all, a more efficient way to fetch the FileStatus for these resources
    is needed.
    
    ### Does this PR introduce _any_ user-facing change?
    
    We introduce two configurations so that users can turn this feature on (a
    usage sketch follows the list):
    
    1. `spark.yarn.client.statCache.preload.enabled`: enables the statCache to
       be preloaded at the YARN client side. This feature analyzes the pattern
       of resource paths; if multiple resources share the same parent directory,
       a single `listStatus` is invoked on the parent directory instead of
       multiple `getFileStatus` calls on each individual resource. If most
       resources come from the same directory, this feature greatly reduces RPC
       calls, reduces job runtime, and improves [...]
    2. `spark.yarn.client.statCache.preload.perDirectoryThreshold`: defines the
       threshold for the number of resources in a directory that triggers
       statCache preloading. When the count of individual resources specified by
       `spark.yarn.jars` within a directory is no less than this threshold,
       statCache preloading for that directory is activated. It's important to
       note that this configuration only takes effect when the option above is
       enabled.
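
    For example, a minimal sketch of enabling the feature when building a
    `SparkConf` (values are illustrative; the keys match the definitions in
    `config.scala` below):

    ```scala
    import org.apache.spark.SparkConf

    // Turn on statCache preloading at the YARN client and preload a directory
    // once it contains at least 5 of the resources to be distributed.
    val conf = new SparkConf()
      .set("spark.yarn.client.statCache.preload.enabled", "true")
      .set("spark.yarn.client.statCache.preload.perDirectoryThreshold", "5")
    ```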
    
    ### How was this patch tested?
    
    1. Added two related UTs in `ClientSuite`.
    
    Closes #42357 from shuwang21/SPARK-44306-listStatus.
    
    Lead-authored-by: Shu Wang <swa...@linkedin.com>
    Co-authored-by: Shu Wang <wangshu1...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../org/apache/spark/deploy/yarn/Client.scala      | 78 +++++++++++++++++++---
 .../org/apache/spark/deploy/yarn/config.scala      | 23 +++++++
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 38 ++++++++++-
 3 files changed, 129 insertions(+), 10 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index a675054b4473..deb44c3b5911 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -77,6 +77,9 @@ private[spark] class Client(
   private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"
 
   private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode
+  private val statCachePreloadEnabled = sparkConf.get(YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED)
+  private val statCachePreloadDirectoryCountThreshold: Int =
+    sparkConf.get(YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD)
   private var appMaster: ApplicationMaster = _
   private var stagingDirPath: Path = _
 
@@ -458,6 +461,49 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we count its parent directory. If a
+   * directory's frequency is no less than the threshold specified by
+   * spark.yarn.client.statCache.preload.perDirectoryThreshold, the file statuses of the
+   * resources in that directory will be preloaded.
+   *
+   * @param files the list of files to upload
+   * @return a HashMap from each directory to be preloaded to the file names in that directory
+   */
+  private[yarn] def directoriesToBePreloaded(files: Seq[String]): HashMap[URI, HashSet[String]] = {
+    val directoryToFiles = new HashMap[URI, HashSet[String]]()
+    files.foreach { file =>
+      if (!Utils.isLocalUri(file) && !new GlobPattern(file).hasWildcard) {
+        val currentPath = new Path(Utils.resolveURI(file))
+        val parentUri = currentPath.getParent.toUri
+        directoryToFiles.getOrElseUpdate(parentUri, new HashSet[String]()) += currentPath.getName
+      }
+    }
+    directoryToFiles.filter(_._2.size >= statCachePreloadDirectoryCountThreshold)
+  }
+
+  /**
+   * Preload the statCache with file statuses. List all files from each preloaded directory
+   * and add them to the statCache.
+   *
+   * @param files the list of files to upload
+   * @param fsLookup function for looking up an FS based on a URI; override for testing
+   * @return a statCache preloaded with FileStatus entries
+   */
+  private[yarn] def getPreloadedStatCache(files: Seq[String],
+      fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    directoriesToBePreloaded(files).foreach { case (dir: URI, filesInDir: HashSet[String]) =>
+      fsLookup(dir).listStatus(new Path(dir), new PathFilter() {
+        override def accept(path: Path): Boolean = filesInDir.contains(path.getName)
+      }).filter(_.isFile()).foreach { fileStatus =>
+        val uri = fileStatus.getPath.toUri
+        statCache.put(uri, fileStatus)
+      }
+    }
+    statCache
+  }
+
   /**
   * Upload any resources to the distributed cache if needed. If a resource is intended to be
   * consumed locally, set up the appropriate config for downstream code to handle it properly.
@@ -485,7 +531,17 @@ private[spark] class Client(
     val localResources = HashMap[String, LocalResource]()
     FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION))
 
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    // If preload is enabled, preload the statCache with the files in the directories
+    val statCache = if (statCachePreloadEnabled) {
+      // Consider only the following configurations, as they involve the distribution of
+      // multiple files
+      val files = sparkConf.get(SPARK_JARS).getOrElse(Nil) ++ sparkConf.get(JARS_TO_DISTRIBUTE) ++
+        sparkConf.get(FILES_TO_DISTRIBUTE) ++ sparkConf.get(ARCHIVES_TO_DISTRIBUTE) ++
+        sparkConf.get(PY_FILES) ++ pySparkArchives
+
+      getPreloadedStatCache(files)
+    } else {
+      HashMap[URI, FileStatus]()
+    }
     val symlinkCache: Map[URI, Path] = HashMap[URI, Path]()
 
     def addDistributedUri(uri: URI): Boolean = {
@@ -609,14 +665,18 @@ private[spark] class Client(
             if (!Utils.isLocalUri(jar)) {
              val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
              val pathFs = FileSystem.get(path.toUri(), hadoopConf)
-              val fss = pathFs.globStatus(path)
-              if (fss == null) {
-                throw new FileNotFoundException(s"Path ${path.toString} does not exist")
-              }
-              fss.filter(_.isFile()).foreach { entry =>
-                val uri = entry.getPath().toUri()
-                statCache.update(uri, entry)
-                distribute(uri.toString(), targetDir = Some(LOCALIZED_LIB_DIR))
+              if (statCache.contains(path.toUri)) {
+                distribute(path.toUri.toString, targetDir = Some(LOCALIZED_LIB_DIR))
+              } else {
+                val fss = pathFs.globStatus(path)
+                if (fss == null) {
+                  throw new FileNotFoundException(s"Path ${path.toString} does not exist")
+                }
+                fss.filter(_.isFile()).foreach { entry =>
+                  val uri = entry.getPath().toUri()
+                  statCache.update(uri, entry)
+                  distribute(uri.toString(), targetDir = Some(LOCALIZED_LIB_DIR))
+                }
+                }
               }
             } else {
               localJars += jar
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index 96ebc03bf163..c11961008019 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -462,6 +462,29 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+      .doc("Enables statCache to be preloaded at YARN client side. This feature analyzes the " +
+        "pattern of resource paths, and if multiple resources share the same parent directory, " +
+        "a single <code>listStatus</code> will be invoked on the parent directory instead of " +
+        "multiple <code>getFileStatus</code> on individual resources. If most resources are " +
+        "from a small set of directories, this can improve job submission time. Enabling this " +
+        "feature may potentially increase client memory overhead.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD =
+    ConfigBuilder("spark.yarn.client.statCache.preload.perDirectoryThreshold")
+      .doc("Minimum resource count in a directory to trigger statCache preloading when " +
+        "submitting an application. If the number of resources in a directory, without " +
+        "any wildcards, equals or exceeds this threshold, the statCache for that directory " +
+        "will be preloaded. This configuration will only take effect when the " +
+        "<code>spark.yarn.client.statCache.preload.enabled</code> option is enabled.")
+      .version("4.0.0")
+      .intConf
+      .createWithDefault(5)
   private[yarn] val YARN_EXECUTOR_RESOURCE_TYPES_PREFIX = "spark.yarn.executor.resource."
   private[yarn] val YARN_DRIVER_RESOURCE_TYPES_PREFIX = "spark.yarn.driver.resource."
   private[yarn] val YARN_AM_RESOURCE_TYPES_PREFIX = "spark.yarn.am.resource."
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 8802c59e78b6..ce4389fd268d 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -27,7 +27,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap => MutableHashMap}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.hadoop.mapreduce.MRJobConfig
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords.{GetNewApplicationResponse, SubmitApplicationRequest}
@@ -670,6 +670,42 @@ class ClientSuite extends SparkFunSuite
     assertUserClasspathUrls(cluster = true, replacementRootPath)
   }
 
+  test("SPARK-44306: test directoriesToBePreloaded") {
+    val sparkConf = new SparkConf()
+      .set(YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD, 3)
+    val client = createClient(sparkConf, args = Array("--jar", USER))
+    val directories = client.directoriesToBePreloaded(Seq(
+      "hdfs:/valid/a.jar",
+      "hdfs:/valid/b.jar",
+      "hdfs:/valid/c.jar",
+      "s3:/valid/a.jar",
+      "s3:/valid/b.jar",
+      "s3:/valid/c.jar",
+      "hdfs:/glob/*",
+      "hdfs:/fewer/a.jar",
+      "hdfs:/fewer/b.jar",
+      "local:/local/a.jar",
+      "local:/local/b.jar",
+      "local:/local/c.jar"))
+    assert(directories.size == 2 && directories.contains(new URI("hdfs:/valid"))
+      && directories.contains(new URI("s3:/valid")))
+  }
+
+  test("SPARK-44306: test statCachePreload") {
+    val sparkConf = new SparkConf()
+      .set(YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD, 2)
+      .set(JARS_TO_DISTRIBUTE, Seq("hdfs:/valid/a.jar", "hdfs:/valid/sub/../b.jar"))
+    val client = createClient(sparkConf, args = Array("--jar", USER))
+    val mockFileSystem = mock(classOf[FileSystem])
+    val mockFsLookup: URI => FileSystem = _ => mockFileSystem
+    when(mockFileSystem.listStatus(any[Path], any[PathFilter])).thenReturn(Seq(
+      new FileStatus(1, false, 1, 1, 1L, new Path("hdfs:/valid/a.jar")),
+      new FileStatus(1, false, 1, 1, 1L, new Path("hdfs:/valid/b.jar")),
+      new FileStatus(1, true, 1, 1, 1L, new Path("hdfs:/valid/c"))).toArray)
+    // Expect only a.jar and b.jar to be preloaded
+    assert(client.getPreloadedStatCache(sparkConf.get(JARS_TO_DISTRIBUTE), mockFsLookup).size === 2)
+  }
+
   private val matching = Seq(
     ("files URI match test1", "file:///file1", "file:///file2"),
     ("files URI match test2", "file:///c:file1", "file://c:file2"),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
