mridulm commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1295351248


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent 
directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, the 
corresponding file status
+   * from the directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a hashmap contains directories to be preloaded and all file names 
in that directory
+   * */

Review Comment:
   nit:
   ```suggestion
      */
   ```



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent 
directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, the 
corresponding file status
+   * from the directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a hashmap contains directories to be preloaded and all file names 
in that directory
+   * */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): HashMap[URI, 
Set[String]] = {
+    val directoryToFiles = new HashMap[URI, Set[String]]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val currentUri = Utils.resolveURI(jar)
+        val parentUri = new Path(currentUri).getParent.toUri
+        directoryToFiles.update(parentUri, 
directoryToFiles.getOrElse(parentUri, Set.empty)
+          .union(Set(currentUri.normalize().toString)))

Review Comment:
   It would be simpler to do something like:
   
   ```suggestion
           val currentPath = new Path(Utils.resolveURI(jar))
           val parentUri = currentPath.getParent.toUri
           directoryToFiles.getOrElseUpdate(parentUri, new HashSet[String]()) 
+= currentPath.getName
   ```
   
   



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,30 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client 
side. This feature" +
+      " analyzes the pattern of resources paths. If multiple resources shared 
the same parent" +

Review Comment:
   nit:
   ```suggestion
         " analyzes the pattern of resources paths, and if multiple resources 
shared the same parent" +
   ```



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent 
directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, the 
corresponding file status
+   * from the directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a hashmap contains directories to be preloaded and all file names 
in that directory
+   * */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): HashMap[URI, 
Set[String]] = {
+    val directoryToFiles = new HashMap[URI, Set[String]]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val currentUri = Utils.resolveURI(jar)
+        val parentUri = new Path(currentUri).getParent.toUri
+        directoryToFiles.update(parentUri, 
directoryToFiles.getOrElse(parentUri, Set.empty)
+          .union(Set(currentUri.normalize().toString)))
+      }
+    }
+    directoryToFiles.filter(_._2.size >= 
statCachePreloadDirectoryCountThreshold)
+  }
+
+  /**
+   * Preload the statCache with file status. List all files from that 
directory and add them to the
+   * statCache.
+   *
+   * @param fsLookup: Function for looking up an FS based on a URI; override 
for testing
+   * @return A preloaded statCache with fileStatus
+   */
+  private[yarn] def getPreloadedStatCache(
+      fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): 
HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    val jars = sparkConf.get(SPARK_JARS)
+    val directoryToFiles = 
jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
+
+    directoryToFiles.foreach { case (dir: URI, filesInDir: Set[String]) =>
+      fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile()).
+        filter(f => filesInDir.contains(f.getPath.toString)).foreach { 
fileStatus =>

Review Comment:
   This would become:
   
   ```suggestion
           filter(f => filesInDir.contains(f.getPath.getName)).foreach { 
fileStatus =>
   ```
   
   once we make the change above.



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,30 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client 
side. This feature" +
+      " analyzes the pattern of resources paths. If multiple resources shared 
the same parent" +
+      " directory, a single <code>listStatus</code> will be invoked on the 
parent directory" +
+      " instead of multiple <code>getFileStatus</code> performed on each 
individual resources." +
+      " If most resources from a small set of directories, this can 
substantially improve job" +
+      " submission time. Enabling this feature may potentially increase client 
memory overhead.")
+    .version("4.0.0")
+    .booleanConf
+    .createWithDefault(false)
+
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD =
+    ConfigBuilder("spark.yarn.client.statCache.preload.perDirectoryThreshold")
+      .doc("This configuration defines the threshold for the number of 
resources in a directory" +
+        " that triggers the activation of the statCache preloading. When the 
count of individual" +

Review Comment:
   nit: For both doc notes, let us be consistent with the convention used by the 
other doc notes in this file w.r.t. the starting space: put the separating space at the end of each line rather than at the start of the next.
   For example:
   ```suggestion
         .doc("This configuration defines the threshold for the number of 
resources in a directory " +
           "that triggers the activation of the statCache preloading. When the 
count of individual " +
           <snip>
   ```



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent 
directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, the 
corresponding file status
+   * from the directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a hashmap contains directories to be preloaded and all file names 
in that directory
+   * */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): HashMap[URI, 
Set[String]] = {
+    val directoryToFiles = new HashMap[URI, Set[String]]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val currentUri = Utils.resolveURI(jar)
+        val parentUri = new Path(currentUri).getParent.toUri
+        directoryToFiles.update(parentUri, 
directoryToFiles.getOrElse(parentUri, Set.empty)
+          .union(Set(currentUri.normalize().toString)))
+      }
+    }
+    directoryToFiles.filter(_._2.size >= 
statCachePreloadDirectoryCountThreshold)
+  }
+
+  /**
+   * Preload the statCache with file status. List all files from that 
directory and add them to the
+   * statCache.
+   *
+   * @param fsLookup: Function for looking up an FS based on a URI; override 
for testing
+   * @return A preloaded statCache with fileStatus
+   */
+  private[yarn] def getPreloadedStatCache(
+      fsLookup: URI => FileSystem = FileSystem.get(_, hadoopConf)): 
HashMap[URI, FileStatus] = {
+    val statCache = HashMap[URI, FileStatus]()
+    val jars = sparkConf.get(SPARK_JARS)
+    val directoryToFiles = 
jars.map(directoriesToBePreloaded).getOrElse(HashMap.empty)
+
+    directoryToFiles.foreach { case (dir: URI, filesInDir: Set[String]) =>
+      fsLookup(dir).listStatus(new Path(dir)).filter(_.isFile()).
+        filter(f => filesInDir.contains(f.getPath.toString)).foreach { 
fileStatus =>
+          val uri = fileStatus.getPath.toUri
+          statCache.put(uri, fileStatus)

Review Comment:
   QQ: Since we are loading the `FileStatus` for the jar, don't we also need the 
parent's `FileStatus`? Should we cache that as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to