shuwang21 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1295734605


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, the corresponding file status
+   * from the directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a hashmap containing directories to be preloaded and all file names in that directory
+   */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): HashMap[URI, Set[String]] = {
+    val directoryToFiles = new HashMap[URI, Set[String]]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val currentUri = Utils.resolveURI(jar)
+        val parentUri = new Path(currentUri).getParent.toUri
+        directoryToFiles.update(parentUri, directoryToFiles.getOrElse(parentUri, Set.empty)
+          .union(Set(currentUri.normalize().toString)))
+      }
+    }
+    directoryToFiles.filter(_._2.size >= statCachePreloadDirectoryCountThreshold)
+  }
+
+  /**
+   * Preload the statCache with file status. List all files from that directory and add them to the
+   * statCache.
+   *
+   * @param fsLookup: Function for looking up an FS based on a URI; override for testing
+   * @return A preloaded statCache with fileStatus
+   */
+  private[yarn] def getPreloadedStatCache(
+      fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    val jars = sparkConf.get(SPARK_JARS)
+    val directoryToFiles = jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
+
+    directoryToFiles.foreach { case (dir: URI, filesInDir: Set[String]) =>
+      fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile()).
+        filter(f => filesInDir.contains(f.getPath.toString)).foreach { fileStatus =>
+          val uri = fileStatus.getPath.toUri
+          statCache.put(uri, fileStatus)
Review Comment:
   We need the file status for every directory from the parent directory up to the root. I checked whether `listStatus` also returns the directory itself, but it seems that it does not. This means we have to use `getFileStatus` to get the `FileStatus` of the parent directory.
   
   We can also defer this operation: with this [PR](https://github.com/apache/spark/pull/41821), the parent directory will be cached within `ancestorsHaveExecutePermissions` if it is not yet in `statCache`.
   
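   For reference, the deferred path is essentially a memoizing lookup; a rough sketch of that shape (`getFileStatusCached` is a hypothetical name, not the actual method in that PR):

   ```scala
   // Look the status up lazily and memoize it in statCache on first access,
   // so ancestor directories are only stat'ed when the permission check
   // actually walks up to them.
   def getFileStatusCached(
       fs: FileSystem,
       uri: URI,
       statCache: HashMap[URI, FileStatus]): FileStatus =
     statCache.getOrElseUpdate(uri, fs.getFileStatus(new Path(uri)))
   ```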


