venkata91 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1292662552


##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature" +
+      " analyzes the pattern of resources paths. If multiple resources shared the same parent" +
+      " directory, a single <code>listStatus</code> will be invoked on the parent directory" +
+      " instead of multiple <code>getFileStatus</code> performed on each individual resources." +
+      " If most resources from a small set of directories, this can substantially improve job" +

Review Comment:
   nit: `most resources from a small` -> `most resources are from a small`



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature" +
+      " analyzes the pattern of resources paths. If multiple resources shared the same parent" +
+      " directory, a single <code>listStatus</code> will be invoked on the parent directory" +
+      " instead of multiple <code>getFileStatus</code> performed on each individual resources." +
+      " If most resources from a small set of directories, this can substantially improve job" +
+      " submission time. Noticing, this could potentially increase the memory overhead at client" +

Review Comment:
   `Noticing, this could potentially increase the memory overhead at client` -> 
`Note: Enabling this feature may increase client memory overhead.`



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -485,7 +534,12 @@ private[spark] class Client(
     val localResources = HashMap[String, LocalResource]()
     FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION))
 
-    val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+    // If preload is enabled, preload the statCache with the files in the directories
+    val statCache = if (statCachePreloadEnabled) {
+      getPreloadedStatCache()
+    } else {
+      HashMap[URI, FileStatus]()

Review Comment:
   nit: `Map.empty[URI, FileStatus]`?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,52 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its

Review Comment:
   `Count the number of resources in each directory; if the count is greater 
than or equal to the value of 
spark.yarn.client.statCache.preload.perDirectoryThreshold, all the resources in 
that directory will be preloaded`?
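
   For reference, the counting rule described above can be sketched without 
any Hadoop or Spark dependencies. This is a simplified illustration, not the 
PR's code: `PreloadSketch`, `dirsToPreload`, `isLocal`, `hasWildcard` and 
`parentOf` are hypothetical names, the first two helpers are rough stand-ins 
for `Utils.isLocalUri` and `GlobPattern.hasWildcard`, and the parent directory 
is derived by plain string handling instead of `Path.getParent`.

```scala
import java.net.URI

object PreloadSketch {
  // Hypothetical stand-in for Utils.isLocalUri: treats the "local:" scheme as local.
  private def isLocal(uri: String): Boolean = uri.startsWith("local:")

  // Hypothetical stand-in for GlobPattern.hasWildcard: checks a few glob characters only.
  private def hasWildcard(uri: String): Boolean = uri.exists(c => "*?[{".indexOf(c) >= 0)

  // Parent directory by string handling. Assumes the resource sits at least one
  // directory deep (e.g. "hdfs:/dir/a.jar"), so the prefix is still a valid URI.
  private def parentOf(uri: String): URI = new URI(uri.substring(0, uri.lastIndexOf('/')))

  // Returns the directories that hold at least `threshold` non-local, non-glob resources.
  def dirsToPreload(jars: Seq[String], threshold: Int): Set[URI] =
    jars
      .filter(jar => !isLocal(jar) && !hasWildcard(jar))
      .groupBy(parentOf)
      .collect { case (dir, members) if members.size >= threshold => dir }
      .toSet
}
```

   The comparison is `>=`, matching the filter in `directoriesToBePreloaded`, 
so a directory with exactly `threshold` resources is selected.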



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature" +
+      " analyzes the pattern of resources paths. If multiple resources shared the same parent" +
+      " directory, a single <code>listStatus</code> will be invoked on the parent directory" +
+      " instead of multiple <code>getFileStatus</code> performed on each individual resources." +
+      " If most resources from a small set of directories, this can substantially improve job" +
+      " submission time. Noticing, this could potentially increase the memory overhead at client" +
+      " side.")
+    .version("4.0.0")
+    .booleanConf
+    .createWithDefault(false)
+
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD =
+    ConfigBuilder("spark.yarn.client.statCache.preload.perDirectoryThreshold")
+      .doc("This configuration defines the threshold for the number of resources in a directory" +

Review Comment:
   How about ```This configuration sets the resource count threshold that 
triggers statCache preloading. If the number of resources specified by 
<code>spark.yarn.jars</code> within a directory is greater than or equal to 
this threshold, statCache preloading is activated for that directory. Note 
that this configuration only takes effect when 
<code>spark.yarn.client.statCache.preload.enabled</code> is enabled.```?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
     .stringConf
     .createWithDefault("yarn.io/fpga")
 
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+    .doc("This configuration enables statCache to be preloaded at YARN client side. This feature" +

Review Comment:
   Maybe ```Enabling this configuration preloads statCache at the YARN client 
side. It analyzes resource path patterns and invokes a single 
`fileSystem.listStatus` on a parent directory if multiple resources share the 
same directory. This can significantly reduce job submission time if most 
resources come from a small set of directories.```?



##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,52 @@ private[spark] class Client(
     new Path(resolvedDestDir, qualifiedDestPath.getName())
   }
 
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  private[yarn] def directoriesToBePreloaded(jars: Seq[String]): List[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val parentUri = new Path(Utils.resolveURI(jar)).getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.filter(_._2 >= statCachePreloadDirectoryCountThreshold).keys.toList
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all

Review Comment:
   nit: `we will list all files` -> `list all files`?



##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala:
##########
@@ -666,6 +666,42 @@ class ClientSuite extends SparkFunSuite with Matchers {
     assertUserClasspathUrls(cluster = true, replacementRootPath)
   }
 
+  test("SPARK-44306: test directoriesToBePreloaded") {
+    val sparkConf = new SparkConf()
+      .set(YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD, 3)
+    val client = createClient(sparkConf, args = Array("--jar", USER))
+    val directories = client.directoriesToBePreloaded(Seq(
+      "hdfs:/valid/a.jar",
+      "hdfs:/valid/b.jar",
+      "hdfs:/valid/c.jar",
+      "s3:/valid/a.jar",
+      "s3:/valid/b.jar",
+      "s3:/valid/c.jar",
+      "hdfs:/glob/*",
+      "hdfs:/fewer/a.jar",
+      "hdfs:/fewer/b.jar",
+      "local:/local/a.jar",
+      "local:/local/b.jar",
+      "local:/local/c.jar"))
+    assert(directories.size == 2 && directories.contains(new URI("hdfs:/valid"))
+      && directories.contains(new URI("s3:/valid")))

Review Comment:
   nit: `!directories.contains(new URI("local:/local"))` - check local dirs are 
not added to `directories`?
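
   One possible shape for the stricter check, sketched outside the suite. 
`AssertionSketch` and `check` are hypothetical names, and `directories` here is 
a hand-built stand-in for the value returned by 
`client.directoriesToBePreloaded(...)`, so this only illustrates the 
assertions themselves.

```scala
import java.net.URI

object AssertionSketch {
  // Stand-in for the result of client.directoriesToBePreloaded(...) in the test above.
  val directories: List[URI] = List(new URI("hdfs:/valid"), new URI("s3:/valid"))

  def check(): Unit = {
    // Comparing as a set covers both the size and the membership checks at once.
    assert(directories.toSet == Set(new URI("hdfs:/valid"), new URI("s3:/valid")))
    // Explicit negative check: local directories must not be selected.
    assert(!directories.contains(new URI("local:/local")))
  }
}
```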



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

