shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1295734605
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,51 @@ private[spark] class Client(
new Path(resolvedDestDir, qualifiedDestPath.getName())
}
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, the corresponding file status
+   * from the directory will be preloaded.
+   *
+   * @param jars the list of jars to upload
+   * @return a hashmap containing directories to be preloaded and all file names in that directory
+   */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): HashMap[URI, Set[String]] = {
+    val directoryToFiles = new HashMap[URI, Set[String]]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val currentUri = Utils.resolveURI(jar)
+        val parentUri = new Path(currentUri).getParent.toUri
+        directoryToFiles.update(parentUri, directoryToFiles.getOrElse(parentUri, Set.empty)
+          .union(Set(currentUri.normalize().toString)))
+      }
+    }
+    directoryToFiles.filter(_._2.size >= statCachePreloadDirectoryCountThreshold)
+  }
+
+  /**
+   * Preload the statCache with file status. List all files from each preloaded directory and
+   * add them to the statCache.
+   *
+   * @param fsLookup function for looking up an FS based on a URI; override for testing
+   * @return a preloaded statCache with fileStatus
+   */
+  private[yarn] def getPreloadedStatCache(
+      fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    val jars = sparkConf.get(SPARK_JARS)
+    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
+
+    directoryToFiles.foreach { case (dir: URI, filesInDir: Set[String]) =>
+      fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile())
+        .filter(f => filesInDir.contains(f.getPath.toString)).foreach { fileStatus =>
+          val uri = fileStatus.getPath.toUri
+          statCache.put(uri, fileStatus)
Review Comment:
We need the file status for every directory from the parent directory up to the root directory. I checked whether `listStatus` also returns the current directory, but it seems that it does not. This means we would have to use `getFileStatus` to get the file status for the parent directory itself.
We can also defer this operation: with this [PR](https://github.com/apache/spark/pull/41821), the parent directory will be cached within `ancestorsHaveExecutePermissions` if it is not yet in `statCache`.
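As a rough sketch of the non-deferred option (not part of this PR, just to illustrate the extra `getFileStatus` call), the preload loop above could also cache each directory's own status; `statCache`, `fsLookup`, and `directoryToFiles` are the same values as in the diff:

```scala
// Hypothetical sketch only: cache the parent directory's status explicitly,
// since listStatus returns only the children of the directory.
directoryToFiles.foreach { case (dir: URI, filesInDir: Set[String]) =>
  val fs = fsLookup(dir)
  val dirPath = new Path(dir)
  // listStatus does not include the directory itself, so fetch it separately.
  statCache.put(dir, fs.getFileStatus(dirPath))
  fs.listStatus(dirPath)
    .filter(_.isFile())
    .filter(f => filesInDir.contains(f.getPath.toString))
    .foreach(fileStatus => statCache.put(fileStatus.getPath.toUri, fileStatus))
}
```

With the deferred approach from the linked PR, this extra lookup would instead happen lazily inside `ancestorsHaveExecutePermissions` when the directory is first needed.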
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]