xkrogen commented on code in PR #42357:
URL: https://github.com/apache/spark/pull/42357#discussion_r1290346200
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
new Path(resolvedDestDir, qualifiedDestPath.getName())
}
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fs : the target file system
+   * @param statCache : stat cache to be preloaded with fileStatus
+   */
+  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit = {
+    logDebug("Preload the following directories")
+    val directory = sparkConf.get(SPARK_JARS)
+      .map(directoriesToBePreloaded)
+      .getOrElse(ArrayBuffer.empty[URI])
+
+    directory.foreach { dir =>
+      fs.listStatus(new Path(dir)).filter(_.isFile()).foreach { fileStatus =>
Review Comment:
shouldn't we filter to only files in `sparkConf.get(SPARK_JARS)`? Envision
we have directory `/foo` with three files `a` `b` `c`, and `spark.jars = a,b`.
With the current code, won't we now also load `c`?
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
new Path(resolvedDestDir, qualifiedDestPath.getName())
}
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }
Review Comment:
minor nit: you can make this a little more concise like:
```suggestion
directoryCounter.filter(_._2 >= perDirectoryThreshold).keys
```
(feel free to ignore if you feel readability is negatively impacted)
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
new Path(resolvedDestDir, qualifiedDestPath.getName())
}
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fs : the target file system
+   * @param statCache : stat cache to be preloaded with fileStatus
+   */
+  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit = {
+    logDebug("Preload the following directories")
Review Comment:
Did you mean to log some directory names here?
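e.g. something like this sketch, reusing the PR's own names (just an illustration of the logging, not a prescription):
```scala
// Sketch: compute the directories first, then include them in the debug message.
val dirs = sparkConf.get(SPARK_JARS)
  .map(directoriesToBePreloaded)
  .getOrElse(Seq.empty[URI])
logDebug(s"Preloading statCache for directories: ${dirs.mkString(", ")}")
```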
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
new Path(resolvedDestDir, qualifiedDestPath.getName())
}
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fs : the target file system
+   * @param statCache : stat cache to be preloaded with fileStatus
+   */
+  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit = {
+    logDebug("Preload the following directories")
+    val directory = sparkConf.get(SPARK_JARS)
Review Comment:
`directories`? this is a list, not a single dir, right?
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
new Path(resolvedDestDir, qualifiedDestPath.getName())
}
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fs : the target file system
+   * @param statCache : stat cache to be preloaded with fileStatus
+   */
+  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit = {
+    logDebug("Preload the following directories")
+    val directory = sparkConf.get(SPARK_JARS)
+      .map(directoriesToBePreloaded)
+      .getOrElse(ArrayBuffer.empty[URI])
+
+    directory.foreach { dir =>
+      fs.listStatus(new Path(dir)).filter(_.isFile()).foreach { fileStatus =>
Review Comment:
You can't assume that it's the same FS for all of the directories. For example we could have:
```
spark.jars = hdfs:///foo/a.jar,hdfs:///foo/b.jar,s3:///bar/c.jar,s3:///bar/d.jar
```
Now our preload directories would be `hdfs:///foo` and `s3:///bar` -- two different filesystems, so you have to load from the correct one based on the URI scheme. `fs` is the _destination_ FS for uploads, which will always be the same, but here we are loading from the _source_ FS, which can be different for each JAR/dir.
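A rough sketch of what I mean; the wrapper method here is hypothetical, but `FileSystem.get(uri, conf)` is the standard Hadoop call for resolving an FS from a URI:
```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Sketch: pick the filesystem from each directory's own URI scheme instead of
// reusing the destination `fs`, so hdfs:///foo and s3:///bar each resolve correctly.
def listFromSourceFs(dirs: Iterable[URI], hadoopConf: Configuration): Seq[FileStatus] = {
  dirs.toSeq.flatMap { dir =>
    val sourceFs = FileSystem.get(dir, hadoopConf) // FS selected by the URI scheme
    sourceFs.listStatus(new Path(dir)).filter(_.isFile)
  }
}
```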
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
new Path(resolvedDestDir, qualifiedDestPath.getName())
}
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
Review Comment:
Why `getQualifiedLocalPath()`? In this block we already know
`!isLocalUri()`, so it's not local. Can't we just do `val parentUri = new
Path(Utils.resolveURI(jar)).getParent.toUri` ?
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
new Path(resolvedDestDir, qualifiedDestPath.getName())
}
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
+    val directoryCounter = new HashMap[URI, Int]()
+    jars.foreach { jar =>
+      if (!Utils.isLocalUri(jar) && !new GlobPattern(jar).hasWildcard) {
+        val path = getQualifiedLocalPath(Utils.resolveURI(jar), hadoopConf)
+        val parentUri = path.getParent.toUri
+        directoryCounter.update(parentUri, directoryCounter.getOrElse(parentUri, 0) + 1)
+      }
+    }
+    directoryCounter.collect {
+      case (uri, count) if count >= perDirectoryThreshold => uri
+    }
+  }
+
+  /**
+   * Preload the statCache with file status. For each directory in the list, we will list all
+   * files from that directory and add them to the statCache.
+   *
+   * @param fs : the target file system
+   * @param statCache : stat cache to be preloaded with fileStatus
+   */
+  def statCachePreload(fs: FileSystem, statCache: Map[URI, FileStatus]): Unit = {
Review Comment:
`private[yarn]`?
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
.stringConf
.createWithDefault("yarn.io/fpga")
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOADED_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preloaded.enabled")
+      .doc("This configuration enables statCache to be preloaded at YARN client side. This feature " +
+        "analyzes the pattern of resources paths. If multiple resources shared the same parent " +
+        "directory, a single <code>listStatus</code> will be invoked on the parent directory " +
+        "instead of multiple <code>getFileStatus</code> performed on each individual resources. " +
+        "If most resources are from the same directory, this feature greatly reduces the RPC call, " +
+        "reduce job runtime, and improve the job runtime variation due to network delays. " +
+        "Noticing, this could potentially increase the memory overhead at client side.")
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOADED_PER_DIRECTORY_THRESHOLD =
+    ConfigBuilder("spark.yarn.client.statCache.preloaded.perDirectoryThreshold")
+      .doc("This configuration defines the threshold for the number of resources in a directory" +
+        " that triggers the activation of the statCache preloading. When the count of individual" +
+        " resources specified by <code>spark.yarn.jars</code> within a directory is no less than" +
+        " this threshold, the statCache preloading for that directory will be activated. It's" +
+        " important to note that this configuration will only take effect when the" +
+        " <code>spark.yarn.client.statCache.preloaded.enabled</code> option is enabled.")
+      .version("3.5.0")
+      .longConf
+      .createWithDefault(0L)
Review Comment:
Let's set a sensible default here? 0 means that even if every JAR is in a
different directory, we'll load each one using `listStatus`, which probably
isn't what we want. I would say something like 5 or 10 is probably reasonable
as a starting point?
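i.e. roughly (5 here is just my suggested starting point, and the doc text is elided for brevity):
```scala
private[spark] val YARN_CLIENT_STAT_CACHE_PRELOADED_PER_DIRECTORY_THRESHOLD =
  ConfigBuilder("spark.yarn.client.statCache.preloaded.perDirectoryThreshold")
    .doc("...") // doc text unchanged from the PR
    .version("3.5.0")
    .intConf              // see the later comment: int is sufficient here
    .createWithDefault(5) // nonzero default so single-jar dirs still use getFileStatus
```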
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
.stringConf
.createWithDefault("yarn.io/fpga")
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOADED_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preloaded.enabled")
+      .doc("This configuration enables statCache to be preloaded at YARN client side. This feature " +
+        "analyzes the pattern of resources paths. If multiple resources shared the same parent " +
+        "directory, a single <code>listStatus</code> will be invoked on the parent directory " +
+        "instead of multiple <code>getFileStatus</code> performed on each individual resources. " +
+        "If most resources are from the same directory, this feature greatly reduces the RPC call, " +
+        "reduce job runtime, and improve the job runtime variation due to network delays. " +
Review Comment:
We won't actually decrease the runtime of the job itself, just of the submission/initialization of the job. Generally I think we should be careful about making too strong of a claim. How about "If most resources are from a small set of directories, this can substantially improve job submission time."
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -77,6 +78,9 @@ private[spark] class Client(
   private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"
   private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && !isClusterMode
+  private val statCachePreloadedEnabled = sparkConf.get(YARN_CLIENT_STAT_CACHE_PRELOADED_ENABLED)
+  private val perDirectoryThreshold =
Review Comment:
`statCachePreloadDirectoryCountThreshold` or something similar? I think we need a `statCachePreload` prefix of some sort.
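e.g. (just the rename, value unchanged):
```scala
private val statCachePreloadDirectoryCountThreshold =
  sparkConf.get(YARN_CLIENT_STAT_CACHE_PRELOADED_PER_DIRECTORY_THRESHOLD)
```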
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala:
##########
@@ -462,6 +462,31 @@ package object config extends Logging {
.stringConf
.createWithDefault("yarn.io/fpga")
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOADED_ENABLED =
+    ConfigBuilder("spark.yarn.client.statCache.preloaded.enabled")
+      .doc("This configuration enables statCache to be preloaded at YARN client side. This feature " +
+        "analyzes the pattern of resources paths. If multiple resources shared the same parent " +
+        "directory, a single <code>listStatus</code> will be invoked on the parent directory " +
+        "instead of multiple <code>getFileStatus</code> performed on each individual resources. " +
+        "If most resources are from the same directory, this feature greatly reduces the RPC call, " +
+        "reduce job runtime, and improve the job runtime variation due to network delays. " +
+        "Noticing, this could potentially increase the memory overhead at client side.")
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val YARN_CLIENT_STAT_CACHE_PRELOADED_PER_DIRECTORY_THRESHOLD =
+    ConfigBuilder("spark.yarn.client.statCache.preloaded.perDirectoryThreshold")
+      .doc("This configuration defines the threshold for the number of resources in a directory" +
+        " that triggers the activation of the statCache preloading. When the count of individual" +
+        " resources specified by <code>spark.yarn.jars</code> within a directory is no less than" +
+        " this threshold, the statCache preloading for that directory will be activated. It's" +
+        " important to note that this configuration will only take effect when the" +
+        " <code>spark.yarn.client.statCache.preloaded.enabled</code> option is enabled.")
+      .version("3.5.0")
+      .longConf
Review Comment:
int is probably sufficient? long would overflow our in-memory buffers anyway
:)
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -458,6 +462,51 @@ private[spark] class Client(
new Path(resolvedDestDir, qualifiedDestPath.getName())
}
+  /**
+   * For each non-local and non-glob resource, we will count its parent directory. If its
+   * frequency is larger than the threshold specified by
+   * spark.yarn.client.statCache.preloaded.perDirectoryThreshold, all the file status from the
+   * directory will be preloaded.
+   *
+   * @param jars : the list of jars to upload
+   * @return a list of directories to be preloaded
+   * */
+  def directoriesToBePreloaded(jars: Seq[String]): mutable.Iterable[URI] = {
Review Comment:
Maybe return a `Seq` or `List`? Even your Scaladoc says you're returning a list :) `Iterable` has some caveats, like potentially only being iterable a single time; I generally wouldn't recommend passing it around unless you have a need to (e.g. you're generating elements on the fly and don't want to materialize the whole thing).
Also, this should be `private[yarn]`?
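Rough sketch of the combined suggestions (body abridged; only the visibility, return type, and final expression differ from the PR):
```scala
private[yarn] def directoriesToBePreloaded(jars: Seq[String]): Seq[URI] = {
  val directoryCounter = new HashMap[URI, Int]()
  // ...same counting logic as in the PR...
  directoryCounter.filter(_._2 >= perDirectoryThreshold).keys.toSeq
}
```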
##########
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:
##########
@@ -583,6 +632,9 @@ private[spark] class Client(
case _ => None
}
+    // If preload is enabled, preload the statCache with the files in the directories
+    if (statCachePreloadedEnabled) { statCachePreload(fs, statCache) }
Review Comment:
The control flow is a little confusing to me, where we initialize
`statCache` up on L537, then access it while processing the Ivy settings and AM
keytab file, _then_ populate it down here. How about we preload it when we
initialize?
```scala
val statCache = if (statCachePreloadEnabled) {
getPreloadedStatCache(fs)
} else {
HashMap[URI, FileStatus]()
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]