xkrogen commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1290346200


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent 
directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file 
status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, 
directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, 
we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fs : the target file system
+   * @param statCache : stat cache to be preloaded with fileStatus
+   */
+  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit 
= {
+    logDebug("Preload the following directories")
+    val directory = sparkConf.get(SPARK_JARS)
+      .map(directoriesToBePreloaded)
+      .getOrElse(ArrayBuffer.empty[URI])
+
+    directory.foreach { dir =>
+      fs.listStatus(new Path(dir)).filter(_.isFile()).foreach { fileStatus =>

Review Comment:
   shouldn't we filter to only files in `sparkConf.get(SPARK_JARS)`? Envision 
we have directory `/foo` with three files `a` `b` `c`, and `spark.jars = a,b`. 
With the current code, won't we now also load `c`?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent 
directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file 
status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, 
directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }

Review Comment:
   minor nit: you can make this a little more concise like:
   ```suggestion
       directoryCounter.filter(_._2 >= perDirectoryThreshold).keys
   ```
   (feel free to ignore if you feel readability is negatively impacted)



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent 
directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file 
status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, 
directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, 
we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fs : the target file system
+   * @param statCache : stat cache to be preloaded with fileStatus
+   */
+  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit 
= {
+    logDebug("Preload the following directories")

Review Comment:
   Did you mean to log some directory names here?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent 
directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file 
status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, 
directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, 
we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fs : the target file system
+   * @param statCache : stat cache to be preloaded with fileStatus
+   */
+  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit 
= {
+    logDebug("Preload the following directories")
+    val directory = sparkConf.get(SPARK_JARS)

Review Comment:
   `directories`? this is a list, not a single dir, right?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent 
directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file 
status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, 
directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, 
we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fs : the target file system
+   * @param statCache : stat cache to be preloaded with fileStatus
+   */
+  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit 
= {
+    logDebug("Preload the following directories")
+    val directory = sparkConf.get(SPARK_JARS)
+      .map(directoriesToBePreloaded)
+      .getOrElse(ArrayBuffer.empty[URI])
+
+    directory.foreach { dir =>
+      fs.listStatus(new Path(dir)).filter(_.isFile()).foreach { fileStatus =>

Review Comment:
   You can't assume that it's the same FS for all of the directories. For 
example we could have:
   ```
   spark.jars = 
hdfs:///foo/a.jar,hdfs:///foo/b.jar,s3:///bar/c.jar,s3:///bar/d.jar
   ```
   Now our preload directories would be `hdfs:///foo` and `s3:///bar` -- two 
different filesystems. You have to load the correct one based on the URI scheme
   
   `fs` is the _destination_ FS for uploads, which will always be the same, but 
here we are loading from the _source_ FS, which can be different for each 
JAR/dir.



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent 
directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file 
status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)

Review Comment:
   Why `getQualifiedLocalPath()`? In this block we already know 
`!isLocalUri()`, so it's not local. Can't we just do `val parentUri = new 
Path(Utils.resolveURI(jar)).getParent.toUri` ?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent 
directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file 
status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, 
directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, 
we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fs : the target file system
+   * @param statCache : stat cache to be preloaded with fileStatus
+   */
+  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit 
= {

Review Comment:
   `private[yarn]` ?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOADED_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preloaded.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client 
side. This feature " +
+      "analyzes the pattern of resources paths. If multiple resources shared 
the same parent " +
+      "directory, a single <code>listStatus</code> will be invoked on the 
parent directory " +
+      "instead of multiple <code>getFileStatus</code> performed on each 
individual resources. " +
+      "If most resources are from the same directory, this feature greatly 
reduces the RPC call, " +
+      "reduce job runtime, and improve the job runtime variation due to 
network delays. " +
+      "Noticing, this could potentially increase the memory overhead at client 
side.")
+    .version("3.5.0")
+    .booleanConf
+    .createWithDefault(false)
+
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOADED_PER_DIRECTORY_THRESHOLD =
+    
ConfigBuilder("spark.yarn.client.statCache.preloaded.perDirectoryThreshold")
+      .doc("This configuration defines the threshold for the number of 
resources in a directory" +
+        " that triggers the activation of the statCache preloading. When the 
count of individual" +
+        " resources specified by <code>spark.yarn.jars</code> within a 
directory is no less than" +
+        " this threshold, the statCache preloading for that directory will be 
activated. It's" +
+        " important to note that this configuration will only take effect when 
the" +
+        " <code>spark.yarn.client.statCache.preloaded.enabled</code> option is 
enabled.")
+      .version("3.5.0")
+      .longConf
+      .createWithDefault(0L)

Review Comment:
   Let's set a sensible default here? 0 means that even if every JAR is in a 
different directory, we'll load each one using `listStatus`, which probably 
isn't what we want. I would say something like 5 or 10 is probably reasonable 
as a starting point?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOADED_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preloaded.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client 
side. This feature " +
+      "analyzes the pattern of resources paths. If multiple resources shared 
the same parent " +
+      "directory, a single <code>listStatus</code> will be invoked on the 
parent directory " +
+      "instead of multiple <code>getFileStatus</code> performed on each 
individual resources. " +
+      "If most resources are from the same directory, this feature greatly 
reduces the RPC call, " +
+      "reduce job runtime, and improve the job runtime variation due to 
network delays. " +

Review Comment:
   We won't actually decrease the runtime of the job itself, just of the 
submission/initialization of the job. Generally I think we should be careful 
about making too strong of a claim. How about "If most resources from a small 
set of directories, this can substantially improve job submission time."



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -77,6 +78,9 @@ private[spark] class Client(
   private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"
 
   private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && 
!isClusterMode
+  private val statCachePreloadedEnabled = 
sparkConf.get(YARN_CLIENT_STAT_CACHE_PRELOADED_ENABLED)
+  private val perDirectoryThreshold =

Review Comment:
   `statCachePreloadDirectoryCountThreshold` or something similar? I think we 
need ` statCachePreload` prefix of some sort



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOADED_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preloaded.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client 
side. This feature " +
+      "analyzes the pattern of resources paths. If multiple resources shared 
the same parent " +
+      "directory, a single <code>listStatus</code> will be invoked on the 
parent directory " +
+      "instead of multiple <code>getFileStatus</code> performed on each 
individual resources. " +
+      "If most resources are from the same directory, this feature greatly 
reduces the RPC call, " +
+      "reduce job runtime, and improve the job runtime variation due to 
network delays. " +
+      "Noticing, this could potentially increase the memory overhead at client 
side.")
+    .version("3.5.0")
+    .booleanConf
+    .createWithDefault(false)
+
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOADED_PER_DIRECTORY_THRESHOLD =
+    
ConfigBuilder("spark.yarn.client.statCache.preloaded.perDirectoryThreshold")
+      .doc("This configuration defines the threshold for the number of 
resources in a directory" +
+        " that triggers the activation of the statCache preloading. When the 
count of individual" +
+        " resources specified by <code>spark.yarn.jars</code> within a 
directory is no less than" +
+        " this threshold, the statCache preloading for that directory will be 
activated. It's" +
+        " important to note that this configuration will only take effect when 
the" +
+        " <code>spark.yarn.client.statCache.preloaded.enabled</code> option is 
enabled.")
+      .version("3.5.0")
+      .longConf

Review Comment:
   int is probably sufficient? long would overflow our in-memory buffers anyway 
:)



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent 
directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file 
status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {

Review Comment:
   Maybe return a `Seq` or `List`? Even your Scaladoc says you're returning a 
list :) `Iterable` has some caveats like potentially only being iterable a 
single time, I generally wouldn't recommend passing it around unless you have a 
need to (e.g. you're generating elements on the fly and don't want to 
materialize the whole thing)
   
   also, this should be `private[yarn]` ?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -583,6 +632,9 @@ private[spark] class Client(
       case _ => None
     }
 
+    // If preload is enabled, preload the statCache with the files in the 
directories
+    if (statCachePreloadedEnabled) { statCachePreload(fs, statCache) }

Review Comment:
   The control flow is a little confusing to me, where we initialize 
`statCache` up on L537, then access it while processing the Ivy settings and AM 
keytab file, _then_ populate it down here. How about we preload it when we 
initialize?
   ```scala
   val statCache = if (statCachePreloadEnabled) {
     getPreloadedStatCache(fs)
   } else {
     HashMap[URI, FileStatus]()
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to