This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 1fdd46f173f7 [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client 1fdd46f173f7 is described below commit 1fdd46f173f7bc90e0523eb0a2d5e8e27e990102 Author: Shu Wang <swa...@linkedin.com> AuthorDate: Tue Sep 19 12:50:03 2023 -0500 [SPARK-44306][YARN] Group FileStatus with few RPC calls within Yarn Client ### What changes were proposed in this pull request? Instead of obtaining FileStatus for each resource [one by one](https://github.com/apache/spark/blob/531ec8bddc8dd22ca39486dbdd31e62e989ddc15/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala#L71C1), if we find that most `spark.yarn.jars` come from the same directories, then we can group them together and use a single `listStatus` RPC call to obtain all file statuses at once. 1. `directoriesToBePreloaded`: Used to analyze the `spark.yarn.jars`, and find out the commonly shared non-local paths 2. `statCachePreload`: Preload the fileStatus for the given directories with `listStatus` 3. For resources specified with glob, we will rely on existing implementation using `globStatus` to obtain `fileStatus` ### Why are the changes needed? It's inefficient to obtain FileStatus for each resource [one by one](https://github.com/apache/spark/blob/531ec8bddc8dd22ca39486dbdd31e62e989ddc15/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala#L71C1). In our company setting, we are running Spark with Hadoop Yarn and HDFS. We noticed the current behavior has two major drawbacks: - Since each getFileStatus call involves network delays, the overall delay can be large and add uncertainty to the overall Spark job runtime. Specifically, we quantify this overhead within our cluster. We see the p50 overhead is around 10s, p80 is 1 min, and p100 is up to 15 mins. When HDFS is overloaded, the delays become more severe. 
- In our cluster, we have nearly 100 million getFileStatus calls to HDFS daily. We noticed that in our cluster, most resources come from the same HDFS directory for each user (See our [engineer blog post](https://engineering.linkedin.com/blog/2023/reducing-apache-spark-application-dependencies-upload-by-99-) about why we took this approach). Therefore, we can greatly reduce nearly 100 million getFileStatus calls to 0.1 million `listStatus` calls daily. This will further reduce overhead [...] All in all, a more efficient way to fetch the FileStatus for each resource is highly needed. ### Does this PR introduce _any_ user-facing change? We introduce two configurations so that the user can turn on this feature 1. `spark.yarn.client.statCache.preload.enabled`: This configuration enables statCache to be preloaded at YARN client side. This feature analyzes the pattern of resource paths. If multiple resources share the same parent directory, a single `listStatus` will be invoked on the parent directory instead of multiple `getFileStatus` performed on each individual resource. If most resources are from the same directory, this feature greatly reduces RPC calls, reduces job runtime, and im [...] 2. `spark.yarn.client.statCache.preload.perDirectoryThreshold`: This configuration defines the threshold for the number of resources in a directory that triggers the activation of the statCache preloading. When the count of individual resources specified by `spark.yarn.jars` within a directory is no less than this threshold, the statCache preloading for that directory will be activated. It's important to note that this configuration will only take effect when the above option is enabled. ### How was this patch tested? 1. Add two related UTs Closes #42357 from shuwang21/SPARK-44306-listStatus. 
Lead-authored-by: Shu Wang <swa...@linkedin.com> Co-authored-by: Shu Wang <wangshu1...@gmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> --- .../org/apache/spark/deploy/yarn/Client.scala | 78 +++++++++++++++++++--- .../org/apache/spark/deploy/yarn/config.scala | 23 +++++++ .../org/apache/spark/deploy/yarn/ClientSuite.scala | 38 ++++++++++- 3 files changed, 129 insertions(+), 10 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index a675054b4473..deb44c3b5911 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -77,6 +77,9 @@ private[spark] class Client( private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster" private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode + private val statCachePreloadEnabled = sparkConf.get(YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED) + private val statCachePreloadDirectoryCountThreshold: Int = + sparkConf.get(YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD) private var appMaster: ApplicationMaster = _ private var stagingDirPath: Path = _ @@ -458,6 +461,49 @@ private[spark] class Client( new Path(resolvedDestDir, qualifiedDestPath.getName()) } + /** + * For each non-local and non-glob resource, we will count its parent directory. If its + * frequency is larger than the threshold specified by + * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, the corresponding file status + * from the directory will be preloaded. 
+ * + * @param files : the list of files to upload + * @return a hashmap contains directories to be preloaded and all file names in that directory + */ + private[yarn] def directoriesToBePreloaded(files: Seq[String]): HashMap[URI, HashSet[String]] = { + val directoryToFiles = new HashMap[URI, HashSet[String]]() + files.foreach { file => + if (!Utils.isLocalUri(file) && !new GlobPattern(file).hasWildcard) { + val currentPath = new Path(Utils.resolveURI(file)) + val parentUri = currentPath.getParent.toUri + directoryToFiles.getOrElseUpdate(parentUri, new HashSet[String]()) += currentPath.getName + } + } + directoryToFiles.filter(_._2.size >= statCachePreloadDirectoryCountThreshold) + } + + /** + * Preload the statCache with file status. List all files from that directory and add them to the + * statCache. + * + * @param files: the list of files to upload + * @param fsLookup: Function for looking up an FS based on a URI; override for testing + * @return A preloaded statCache with fileStatus + */ + private[yarn] def getPreloadedStatCache(files: Seq[String], + fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = { + val statCache = HashMap[URI, FileStatus]() + directoriesToBePreloaded(files).foreach { case (dir: URI, filesInDir: HashSet[String]) => + fsLookup(dir).listStatus(new Path(dir), new PathFilter() { + override def accept(path: Path): Boolean = filesInDir.contains(path.getName) + }).filter(_.isFile()).foreach { fileStatus => + val uri = fileStatus.getPath.toUri + statCache.put(uri, fileStatus) + } + } + statCache + } + /** * Upload any resources to the distributed cache if needed. If a resource is intended to be * consumed locally, set up the appropriate config for downstream code to handle it properly. 
@@ -485,7 +531,17 @@ private[spark] class Client( val localResources = HashMap[String, LocalResource]() FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION)) - val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() + // If preload is enabled, preload the statCache with the files in the directories + val statCache = if (statCachePreloadEnabled) { + // Consider only following configurations, as they involve the distribution of multiple files + var files = sparkConf.get(SPARK_JARS).getOrElse(Nil) ++ sparkConf.get(JARS_TO_DISTRIBUTE) ++ + sparkConf.get(FILES_TO_DISTRIBUTE) ++ sparkConf.get(ARCHIVES_TO_DISTRIBUTE) ++ + sparkConf.get(PY_FILES) ++ pySparkArchives + + getPreloadedStatCache(files) + } else { + HashMap[URI, FileStatus]() + } val symlinkCache: Map[URI, Path] = HashMap[URI, Path]() def addDistributedUri(uri: URI): Boolean = { @@ -609,14 +665,18 @@ private[spark] class Client( if (!Utils.isLocalUri(jar)) { val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf) val pathFs = FileSystem.get(path.toUri(), hadoopConf) - val fss = pathFs.globStatus(path) - if (fss == null) { - throw new FileNotFoundException(s"Path ${path.toString} does not exist") - } - fss.filter(_.isFile()).foreach { entry => - val uri = entry.getPath().toUri() - statCache.update(uri, entry) - distribute(uri.toString(), targetDir = Some(LOCALIZED_LIB_DIR)) + if (statCache.contains(path.toUri)) { + distribute(path.toUri.toString, targetDir = Some(LOCALIZED_LIB_DIR)) + } else { + val fss = pathFs.globStatus(path) + if (fss == null) { + throw new FileNotFoundException(s"Path ${path.toString} does not exist") + } + fss.filter(_.isFile()).foreach { entry => + val uri = entry.getPath().toUri() + statCache.update(uri, entry) + distribute(uri.toString(), targetDir = Some(LOCALIZED_LIB_DIR)) + } } } else { localJars += jar diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 96ebc03bf163..c11961008019 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -462,6 +462,29 @@ package object config extends Logging { .stringConf .createWithDefault("yarn.io/fpga") + private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED = + ConfigBuilder("spark.yarn.client.statCache.preload.enabled") + .doc("Enables statCache to be preloaded at YARN client side. This feature analyzes the " + + "pattern of resources paths, and if multiple resources shared the same parent directory, " + + "a single <code>listStatus</code> will be invoked on the parent directory instead of " + + "multiple <code>getFileStatus</code> on individual resources. If most resources are from " + + "a small set of directories, this can improve job submission time. Enabling this feature " + + "may potentially increase client memory overhead.") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + + private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD = + ConfigBuilder("spark.yarn.client.statCache.preload.perDirectoryThreshold") + .doc("Minimum resource count in a directory to trigger statCache preloading when " + + "submitting an application. If the number of resources in a directory, without " + + "any wildcards, equals or exceeds this threshold, the statCache for that directory " + + "will be preloaded. This configuration will only take effect when " + + "<code>spark.yarn.client.statCache.preloaded.enabled</code> option is enabled.") + .version("4.0.0") + .intConf + .createWithDefault(5) + private[yarn] val YARN_EXECUTOR_RESOURCE_TYPES_PREFIX = "spark.yarn.executor.resource." private[yarn] val YARN_DRIVER_RESOURCE_TYPES_PREFIX = "spark.yarn.driver.resource." private[yarn] val YARN_AM_RESOURCE_TYPES_PREFIX = "spark.yarn.am.resource." 
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 8802c59e78b6..ce4389fd268d 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -27,7 +27,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap => MutableHashMap} import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter} import org.apache.hadoop.mapreduce.MRJobConfig import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.protocolrecords.{GetNewApplicationResponse, SubmitApplicationRequest} @@ -670,6 +670,42 @@ class ClientSuite extends SparkFunSuite assertUserClasspathUrls(cluster = true, replacementRootPath) } + test("SPARK-44306: test directoriesToBePreloaded") { + val sparkConf = new SparkConf() + .set(YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD, 3) + val client = createClient(sparkConf, args = Array("--jar", USER)) + val directories = client.directoriesToBePreloaded(Seq( + "hdfs:/valid/a.jar", + "hdfs:/valid/b.jar", + "hdfs:/valid/c.jar", + "s3:/valid/a.jar", + "s3:/valid/b.jar", + "s3:/valid/c.jar", + "hdfs:/glob/*", + "hdfs:/fewer/a.jar", + "hdfs:/fewer/b.jar", + "local:/local/a.jar", + "local:/local/b.jar", + "local:/local/c.jar")) + assert(directories.size == 2 && directories.contains(new URI("hdfs:/valid")) + && directories.contains(new URI("s3:/valid"))) + } + + test("SPARK-44306: test statCachePreload") { + val sparkConf = new SparkConf() + .set(YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD, 2) + .set(JARS_TO_DISTRIBUTE, Seq("hdfs:/valid/a.jar", "hdfs:/valid/sub/../b.jar")) + val client = createClient(sparkConf, args = Array("--jar", USER)) + 
val mockFileSystem = mock(classOf[FileSystem]) + val mockFsLookup: URI => FileSystem = _ => mockFileSystem + when(mockFileSystem.listStatus(any[Path], any[PathFilter])).thenReturn(Seq( + new FileStatus(1, false, 1, 1, 1L, new Path("hdfs:/valid/a.jar")), + new FileStatus(1, false, 1, 1, 1L, new Path("hdfs:/valid/b.jar")), + new FileStatus(1, true, 1, 1, 1L, new Path("hdfs:/valid/c"))).toArray) + // Expect only a.jar and b.jar to be preloaded + assert(client.getPreloadedStatCache(sparkConf.get(JARS_TO_DISTRIBUTE), mockFsLookup).size === 2) + } + private val matching = Seq( ("files URI match test1", "file:///file1", "file:///file2"), ("files URI match test2", "file:///c:file1", "file://c:file2"), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org