venkata91 commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1292662552
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
.stringConf
.createWithDefault("yarn.io/fpga")
+ private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+ ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+ .doc("This configuration enables statCache to be preloaded at YARN client
side. This feature" +
+ " analyzes the pattern of resources paths. If multiple resources shared
the same parent" +
+ " directory, a single <code>listStatus</code> will be invoked on the
parent directory" +
+ " instead of multiple <code>getFileStatus</code> performed on each
individual resources." +
+ " If most resources from a small set of directories, this can
substantially improve job" +
Review Comment:
nit: `most resources from a small` -> `most resources are from a small`
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
.stringConf
.createWithDefault("yarn.io/fpga")
+ private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+ ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+ .doc("This configuration enables statCache to be preloaded at YARN client
side. This feature" +
+ " analyzes the pattern of resources paths. If multiple resources shared
the same parent" +
+ " directory, a single <code>listStatus</code> will be invoked on the
parent directory" +
+ " instead of multiple <code>getFileStatus</code> performed on each
individual resources." +
+ " If most resources from a small set of directories, this can
substantially improve job" +
+ " submission time. Noticing, this could potentially increase the memory
overhead at client" +
Review Comment:
`Noticing, this could potentially increase the memory overhead at client` ->
`Note: Enabling this feature may potentially increase client memory overhead.`
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -485,7 +534,12 @@ private[spark] class Client(
val localResources = HashMap[String, LocalResource]()
FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION))
- val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
+ // If preload is enabled, preload the statCache with the files in the directories
+ val statCache = if (statCachePreloadEnabled) {
+ getPreloadedStatCache()
+ } else {
+ HashMap[URI, FileStatus]()
Review Comment:
nit: `Map.empty[URI, FileStatus]`?
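
For illustration only, a minimal, self-contained sketch of how the branch could read with this nit applied. The name `initialStatCache`, its parameters, and the standalone shape are placeholders introduced here, not the PR's actual code, which works directly with the members of `Client.scala`:

```scala
import java.net.URI

import scala.collection.mutable

import org.apache.hadoop.fs.FileStatus

// Sketch only: the preload flag and the preloading function are passed in so the
// snippet stands alone; in Client.scala they come from the new config and helper.
def initialStatCache(
    preloadEnabled: Boolean,
    preload: () => mutable.Map[URI, FileStatus]): mutable.Map[URI, FileStatus] = {
  if (preloadEnabled) {
    preload()
  } else {
    // Reviewer's nit: an empty map reads cleaner than HashMap[URI, FileStatus]().
    mutable.Map.empty[URI, FileStatus]
  }
}
```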
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,52 @@ private[spark] class Client(
new Path(resolvedDestDir, qualifiedDestPath.getName())
}
+ /**
+ * For each non-local and non-glob resource, we will count its parent directory. If its
Review Comment:
`Count the number of resources in each directory and if it is greater than
spark.yarn.client.statCache.preloaded.perDirectoryThreshold value, all the
resources in that directory will be preloaded`?
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
.stringConf
.createWithDefault("yarn.io/fpga")
+ private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+ ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+ .doc("This configuration enables statCache to be preloaded at YARN client
side. This feature" +
+ " analyzes the pattern of resources paths. If multiple resources shared
the same parent" +
+ " directory, a single <code>listStatus</code> will be invoked on the
parent directory" +
+ " instead of multiple <code>getFileStatus</code> performed on each
individual resources." +
+ " If most resources from a small set of directories, this can
substantially improve job" +
+ " submission time. Noticing, this could potentially increase the memory
overhead at client" +
+ " side.")
+ .version("4.0.0")
+ .booleanConf
+ .createWithDefault(false)
+
+ private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD =
+ ConfigBuilder("spark.yarn.client.statCache.preload.perDirectoryThreshold")
+ .doc("This configuration defines the threshold for the number of
resources in a directory" +
Review Comment:
How about ```This configuration sets the resource count threshold that
triggers statCache preloading. If the count of resources specified by
<code>spark.yarn.jars</code> within a directory is equal to or greater than
this threshold, the statCache preloading for that directory will activate. Note
that this configuration only works when
<code>spark.yarn.client.statCache.preloaded.enabled</code> is enabled.```?
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
.stringConf
.createWithDefault("yarn.io/fpga")
+ private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
+ ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
+ .doc("This configuration enables statCache to be preloaded at YARN client
side. This feature" +
Review Comment:
May be, ```Enabling this configuration preloads statCache at the YARN client
side. It analyzes resource path patterns and invokes a single
`fileSystem.listStatus` on a parent directory if multiple resources share the
same directory. This can significantly improve job performance if most
resources come from a small set of directories.```?
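
Combining this suggestion with the earlier `Note:` rewording, the enabled flag's doc could read roughly as below. This is a sketch only; the `.version`/`.booleanConf`/`.createWithDefault` lines simply mirror the diff above:

```scala
private[spark] val YARN_CLIENT_STAT_CACHE_PRELOAD_ENABLED =
  ConfigBuilder("spark.yarn.client.statCache.preload.enabled")
    .doc("Enabling this configuration preloads statCache at the YARN client side. It analyzes" +
      " resource path patterns and invokes a single <code>fileSystem.listStatus</code> on a" +
      " parent directory if multiple resources share the same directory. This can significantly" +
      " improve job performance if most resources come from a small set of directories." +
      " Note: Enabling this feature may potentially increase client memory overhead.")
    .version("4.0.0")
    .booleanConf
    .createWithDefault(false)
```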
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +461,52 @@ private[spark] class Client(
new Path(resolvedDestDir, qualifiedDestPath.getName())
}
+ /**
+ * For each non-local and non-glob resource, we will count its parent directory. If its
+ * frequency is larger than the threshold specified by
+ * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+ * directory will be preloaded.
+ *
+ * @param jars : the list of jars to upload
+ * @return a list of directories to be preloaded
+ * */
+ private[yarn] def directoriesToBePreloaded(jars: Seq[String]): List[URI] = {
+ val directoryCounter = new HashMap[URI, Int]()
+ jars.foreach { jar =>
+ if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+ val parentUri = new Path(Utils.resolveURI(jar)).getParent.toUri
+ directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+ }
+ }
+ directoryCounter.filter(_._2 >=
statCachePreloadDirectoryCountThreshold).keys.toList
+ }
+
+ /**
+ * Preload the statCache with file status. For each directory in the list, we will list all
Review Comment:
Instead of `we will list all files` -> `list all files`?
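
For readers without the full diff, here is a hypothetical, self-contained sketch of the preloading idea this docstring describes: one `listStatus` call per qualifying directory, with each returned `FileStatus` cached under its URI. The name `preloadStatCache` and its signature are illustrative and are not the PR's actual `getPreloadedStatCache`:

```scala
import java.net.URI

import scala.collection.mutable.HashMap

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

def preloadStatCache(
    directories: Seq[URI],
    hadoopConf: Configuration): HashMap[URI, FileStatus] = {
  val statCache = HashMap[URI, FileStatus]()
  directories.foreach { dir =>
    val dirPath = new Path(dir)
    val fs = dirPath.getFileSystem(hadoopConf)
    // A single listStatus per directory replaces one getFileStatus RPC per file.
    fs.listStatus(dirPath).filter(_.isFile).foreach { status =>
      statCache.put(status.getPath.toUri, status)
    }
  }
  statCache
}
```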
##########
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala:
##########
@@ -666,6 +666,42 @@ class ClientSuite extends SparkFunSuite with Matchers {
assertUserClasspathUrls(cluster = true, replacementRootPath)
}
+ test("SPARK-44306: test directoriesToBePreloaded") {
+ val sparkConf = new SparkConf()
+ .set(YARN_CLIENT_STAT_CACHE_PRELOAD_PER_DIRECTORY_THRESHOLD, 3)
+ val client = createClient(sparkConf, args = Array("--jar", USER))
+ val directories = client.directoriesToBePreloaded(Seq(
+ "hdfs:/valid/a.jar",
+ "hdfs:/valid/b.jar",
+ "hdfs:/valid/c.jar",
+ "s3:/valid/a.jar",
+ "s3:/valid/b.jar",
+ "s3:/valid/c.jar",
+ "hdfs:/glob/*",
+ "hdfs:/fewer/a.jar",
+ "hdfs:/fewer/b.jar",
+ "local:/local/a.jar",
+ "local:/local/b.jar",
+ "local:/local/c.jar"))
+ assert(directories.size == 2 && directories.contains(new
URI("hdfs:/valid"))
+ && directories.contains(new URI("s3:/valid")))
Review Comment:
nit: `!directories.contains(new URI("local:/local"))` - check local dirs are
not added to `directories`?
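
A runnable illustration of the extra check, with a hard-coded stand-in where the real test calls `client.directoriesToBePreloaded(...)`:

```scala
import java.net.URI

object PreloadedDirectoriesCheck extends App {
  // Stand-in for the result of client.directoriesToBePreloaded(...) in the test above.
  val directories: Seq[URI] = Seq(new URI("hdfs:/valid"), new URI("s3:/valid"))

  assert(directories.size == 2)
  assert(directories.contains(new URI("hdfs:/valid")))
  assert(directories.contains(new URI("s3:/valid")))
  // Reviewer's nit: also assert that local: parents were never selected for preloading.
  assert(!directories.contains(new URI("local:/local")))
}
```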
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]