mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1318073365


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, the corresponding file status
+   * from the directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a hashmap contains directories to be preloaded and all file names in that directory
+   * */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): HashMap[URI, 
Set[String]] = {
+    val directoryToFiles = new HashMap[URI, Set[String]]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val currentUri = Utils.resolveURI(jar)
+        val parentUri = new Path(currentUri).getParent.toUri
+        directoryToFiles.update(parentUri, 
directoryToFiles.getOrElse(parentUri, Set.empty)
+          .union(Set(currentUri.normalize().toString)))
+      }
+    }
+    directoryToFiles.filter(_._2.size >= 
statCachePreloadDirectoryCountThreshold)
+  }
+
+  /**
+   * Preload the statCache with file status. List all files from that directory and add them to the
+   * statCache.
+   *
+   * @param fsLookup: Function for looking up an FS based on a URI; override for testing
+   * @return A preloaded statCache with fileStatus
+   */
+  private[yarn] def getPreloadedStatCache(
+      fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): 
HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    val jars = sparkConf.get(SPARK_JARS)
+    val directoryToFiles = 
jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
+
+    directoryToFiles.foreach { case (dir: URI, filesInDir: Set[String]) =>
+      fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile()).
+        filter(f => filesInDir.contains(f.getPath.toString)).foreach { 
fileStatus =>
+          val uri = fileStatus.getPath.toUri
+          statCache.put(uri, fileStatus)

Review Comment:
   Agreed, this will be handled below as well. I was trying to see whether we could
   short-circuit it here, since we already know the parent directory will need to be queried.
   It is fine to defer it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to